/* * ============LICENSE_START======================================================= * ONAP * ================================================================================ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= */ package org.onap.policy.drools.pooling; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.function.Function; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.features.DroolsControllerFeatureAPI; import org.onap.policy.drools.features.PolicyControllerFeatureAPI; import org.onap.policy.drools.features.PolicyEngineFeatureAPI; import org.onap.policy.drools.persistence.SystemPersistence; import org.onap.policy.drools.system.PolicyController; import org.onap.policy.drools.system.PolicyEngine; import org.onap.policy.drools.util.FeatureEnabledChecker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Controller/session pooling. Multiple hosts may be launched, all servicing the same * controllers/sessions. 
When this feature is enabled, the requests are divided across the
 * different hosts, instead of all running on a single, active host.
 * <p>
* With each controller, there is an associated DMaaP topic that is used for internal * communication between the different hosts serving the controller. */ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerFeatureAPI, DroolsControllerFeatureAPI { private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class); /** * Factory used to create objects. */ private static Factory factory; /** * ID of this host. */ private final String host; /** * Entire set of feature properties, including those specific to various controllers. */ private Properties featProps = null; /** * Maps a controller name to its associated manager. */ private final ConcurrentHashMap ctlr2pool = new ConcurrentHashMap<>(107); /** * Decremented each time a manager enters the Active state. Used by junit tests. */ private final CountDownLatch activeLatch = new CountDownLatch(1); /** * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is * called later. As multiple threads can be active within the methods at the same * time, we must keep this in thread local storage. */ private ThreadLocal offerArgs = new ThreadLocal<>(); /** * */ public PoolingFeature() { super(); this.host = UUID.randomUUID().toString(); } protected static Factory getFactory() { return factory; } /** * Sets the factory to be used to create objects. Used by junit tests. 
* * @param factory the new factory to be used to create objects */ protected static void setFactory(Factory factory) { PoolingFeature.factory = factory; } public String getHost() { return host; } /** * @return a latch that will be decremented when a manager enters the active state */ protected CountDownLatch getActiveLatch() { return activeLatch; } @Override public int getSequenceNumber() { return 0; } @Override public boolean beforeStart(PolicyEngine engine) { logger.info("initializing " + PoolingProperties.FEATURE_NAME); featProps = factory.getProperties(PoolingProperties.FEATURE_NAME); return false; } /** * Adds the controller and a new pooling manager to {@link #ctlr2pool}. * * @throws PoolingFeatureRtException if an error occurs */ @Override public boolean afterCreate(PolicyController controller) { if (featProps == null) { logger.error("pooling feature properties have not been loaded"); throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties")); } String name = controller.getName(); if (FeatureEnabledChecker.isFeatureEnabled(featProps, name, PoolingProperties.FEATURE_ENABLED)) { try { // get & validate the properties PoolingProperties props = new PoolingProperties(name, featProps); logger.info("pooling enabled for {}", name); ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props, activeLatch)); } catch (PropertyException e) { logger.error("pooling disabled due to exception for {}", name, e); throw new PoolingFeatureRtException(e); } } else { logger.info("pooling disabled for {}", name); } return false; } @Override public boolean beforeStart(PolicyController controller) { return doManager(controller, mgr -> { mgr.beforeStart(); return false; }); } @Override public boolean afterStart(PolicyController controller) { return doManager(controller, mgr -> { mgr.afterStart(); return false; }); } @Override public boolean beforeStop(PolicyController controller) { return doManager(controller, mgr -> { 
mgr.beforeStop(); return false; }); } @Override public boolean afterStop(PolicyController controller) { // NOTE: using doDeleteManager() instead of doManager() return doDeleteManager(controller, mgr -> { mgr.afterStop(); return false; }); } @Override public boolean beforeLock(PolicyController controller) { return doManager(controller, mgr -> { mgr.beforeLock(); return false; }); } @Override public boolean afterUnlock(PolicyController controller) { return doManager(controller, mgr -> { mgr.afterUnlock(); return false; }); } @Override public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) { /* * As this is invoked a lot, we'll directly call the manager's method instead of * using the functional interface via doManager(). */ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); if (mgr == null) { return false; } if (mgr.beforeOffer(protocol, topic2, event)) { return true; } offerArgs.set(new OfferArgs(protocol, topic2, event)); return false; } @Override public boolean beforeInsert(DroolsController droolsController, Object fact) { OfferArgs args = offerArgs.get(); if (args == null) { logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert"); return false; } PolicyController controller; try { controller = factory.getController(droolsController); } catch (IllegalArgumentException | IllegalStateException e) { logger.warn("cannot get controller for {} {}", droolsController.getGroupId(), droolsController.getArtifactId(), e); return false; } if (controller == null) { logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(), droolsController.getArtifactId()); return false; } /* * As this is invoked a lot, we'll directly call the manager's method instead of * using the functional interface via doManager(). 
*/ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); if (mgr == null) { return false; } return mgr.beforeInsert(args.protocol, args.topic, args.event, fact); } @Override public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event, boolean success) { // clear any stored arguments offerArgs.set(null); return false; } /** * Executes a function using the manager associated with the controller. Catches any * exceptions from the function and re-throws it as a runtime exception. * * @param controller * @param func function to be executed * @return {@code true} if the function handled the request, {@code false} otherwise * @throws PoolingFeatureRtException if an error occurs */ private boolean doManager(PolicyController controller, MgrFunc func) { PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); if (mgr == null) { return false; } try { return func.apply(mgr); } catch (PoolingFeatureException e) { throw new PoolingFeatureRtException(e); } } /** * Executes a function using the manager associated with the controller and then * deletes the manager. Catches any exceptions from the function and re-throws it as a * runtime exception. * * @param controller * @param func function to be executed * @return {@code true} if the function handled the request, {@code false} otherwise * @throws PoolingFeatureRtException if an error occurs */ private boolean doDeleteManager(PolicyController controller, Function func) { String name = controller.getName(); logger.info("remove feature-pool-dmaap manager for {}", name); // NOTE: using "remove()" instead of "get()" PoolingManagerImpl mgr = ctlr2pool.remove(name); if (mgr == null) { return false; } return func.apply(mgr); } /** * Function that operates on a manager. 
*/ @FunctionalInterface private static interface MgrFunc { /** * * @param mgr * @return {@code true} if the request was handled by the manager, {@code false} * otherwise * @throws PoolingFeatureException */ public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException; } /** * Arguments captured from beforeOffer(). */ private static class OfferArgs { /** * Protocol of the receiving topic. */ private CommInfrastructure protocol; /** * Topic on which the event was received. */ private String topic; /** * The event text that was received on the topic. */ private String event; /** * * @param protocol * @param topic * @param event the actual event data received on the topic */ public OfferArgs(CommInfrastructure protocol, String topic, String event) { this.protocol = protocol; this.topic = topic; this.event = event; } } /** * Used to create objects. */ public static class Factory { /** * @param featName feature name * @return the properties for the specified feature */ public Properties getProperties(String featName) { return SystemPersistence.manager.getProperties(featName); } /** * Makes a pooling manager for a controller. * * @param host name/uuid of this host * @param controller * @param props properties to use to configure the manager * @param activeLatch decremented when the manager goes Active * @return a new pooling manager */ public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props, CountDownLatch activeLatch) { return new PoolingManagerImpl(host, controller, props, activeLatch); } /** * Gets the policy controller associated with a drools controller. * * @param droolsController * @return the policy controller associated with a drools controller */ public PolicyController getController(DroolsController droolsController) { return PolicyController.factory.get(droolsController); } } }