2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import java.util.Properties;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.function.Function;
26 import org.onap.policy.common.utils.properties.exception.PropertyException;
27 import org.onap.policy.drools.controller.DroolsController;
28 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
29 import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
30 import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
31 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
32 import org.onap.policy.drools.persistence.SystemPersistence;
33 import org.onap.policy.drools.system.PolicyController;
34 import org.onap.policy.drools.system.PolicyEngine;
35 import org.onap.policy.drools.util.FeatureEnabledChecker;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * Controller/session pooling. Multiple hosts may be launched, all servicing the same
41 * controllers/sessions. When this feature is enabled, the requests are divided across the
42 * different hosts, instead of all running on a single, active host.
44 * With each controller, there is an associated DMaaP topic that is used for internal
45 * communication between the different hosts serving the controller.
47 public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerFeatureAPI, DroolsControllerFeatureAPI {
49 private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
52 * Factory used to create objects.
54 private static Factory factory;
57 * Entire set of feature properties, including those specific to various controllers.
59 private Properties featProps = null;
62 * Maps a controller name to its associated manager.
64 private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
67 * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is
68 * called later. As multiple threads can be active within the methods at the same
69 * time, we must keep this in thread local storage.
71 private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
76 public PoolingFeature() {
80 protected static Factory getFactory() {
85 * Sets the factory to be used to create objects. Used by junit tests.
87 * @param factory the new factory to be used to create objects
89 protected static void setFactory(Factory factory) {
90 PoolingFeature.factory = factory;
94 public int getSequenceNumber() {
99 public boolean beforeStart(PolicyEngine engine) {
100 logger.info("initializing " + PoolingProperties.FEATURE_NAME);
101 featProps = factory.getProperties(PoolingProperties.FEATURE_NAME);
106 * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
108 * @throws PoolingFeatureRtException if an error occurs
111 public boolean afterCreate(PolicyController controller) {
113 if (featProps == null) {
114 logger.error("pooling feature properties have not been loaded");
115 throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
118 String name = controller.getName();
120 if (FeatureEnabledChecker.isFeatureEnabled(featProps, name, PoolingProperties.FEATURE_ENABLED)) {
122 // get & validate the properties
123 PoolingProperties props = new PoolingProperties(name, featProps);
125 logger.info("pooling enabled for {}", name);
126 ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(controller, props));
128 } catch (PropertyException e) {
129 logger.error("pooling disabled due to exception for {}", name, e);
130 throw new PoolingFeatureRtException(e);
134 logger.info("pooling disabled for {}", name);
142 public boolean beforeStart(PolicyController controller) {
143 return doManager(controller, mgr -> {
150 public boolean afterStart(PolicyController controller) {
151 return doManager(controller, mgr -> {
158 public boolean beforeStop(PolicyController controller) {
159 return doManager(controller, mgr -> {
166 public boolean afterStop(PolicyController controller) {
168 // NOTE: using doDeleteManager() instead of doManager()
170 return doDeleteManager(controller, mgr -> {
178 public boolean beforeLock(PolicyController controller) {
179 return doManager(controller, mgr -> {
186 public boolean afterUnlock(PolicyController controller) {
187 return doManager(controller, mgr -> {
194 public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
196 * As this is invoked a lot, we'll directly call the manager's method instead of
197 * using the functional interface via doManager().
199 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
204 if (mgr.beforeOffer(protocol, topic2, event)) {
208 offerArgs.set(new OfferArgs(protocol, topic2, event));
213 public boolean beforeInsert(DroolsController droolsController, Object fact) {
215 OfferArgs args = offerArgs.get();
217 logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
221 PolicyController controller;
223 controller = factory.getController(droolsController);
225 } catch (IllegalArgumentException | IllegalStateException e) {
226 logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
227 droolsController.getArtifactId(), e);
232 if (controller == null) {
233 logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
234 droolsController.getArtifactId());
239 * As this is invoked a lot, we'll directly call the manager's method instead of
240 * using the functional interface via doManager().
242 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
247 return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
251 public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
254 // clear any stored arguments
261 * Executes a function using the manager associated with the controller. Catches any
262 * exceptions from the function and re-throws it as a runtime exception.
265 * @param func function to be executed
266 * @return {@code true} if the function handled the request, {@code false} otherwise
267 * @throws PoolingFeatureRtException if an error occurs
269 private boolean doManager(PolicyController controller, MgrFunc func) {
270 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
276 return func.apply(mgr);
278 } catch (PoolingFeatureException e) {
279 throw new PoolingFeatureRtException(e);
284 * Executes a function using the manager associated with the controller and then
285 * deletes the manager. Catches any exceptions from the function and re-throws it as a
289 * @param func function to be executed
290 * @return {@code true} if the function handled the request, {@code false} otherwise
291 * @throws PoolingFeatureRtException if an error occurs
293 private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) {
295 String name = controller.getName();
296 logger.info("remove feature-pool-dmaap manager for {}", name);
298 // NOTE: using "remove()" instead of "get()"
300 PoolingManagerImpl mgr = ctlr2pool.remove(name);
306 return func.apply(mgr);
310 * Function that operates on a manager.
313 private static interface MgrFunc {
318 * @return {@code true} if the request was handled by the manager, {@code false}
320 * @throws PoolingFeatureException
322 public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
326 * Arguments captured from beforeOffer().
328 private static class OfferArgs {
331 * Protocol of the receiving topic.
333 private CommInfrastructure protocol;
336 * Topic on which the event was received.
338 private String topic;
341 * The event text that was received on the topic.
343 private String event;
349 * @param event the actual event data received on the topic
351 public OfferArgs(CommInfrastructure protocol, String topic, String event) {
352 this.protocol = protocol;
359 * Used to create objects.
361 public static class Factory {
364 * @param featName feature name
365 * @return the properties for the specified feature
367 public Properties getProperties(String featName) {
368 return SystemPersistence.manager.getProperties(featName);
372 * Makes a pooling manager for a controller.
375 * @param props properties to use to configure the manager
376 * @return a new pooling manager
378 public PoolingManagerImpl makeManager(PolicyController controller, PoolingProperties props) {
379 return new PoolingManagerImpl(controller, props);
383 * Gets the policy controller associated with a drools controller.
385 * @param droolsController
386 * @return the policy controller associated with a drools controller
388 public PolicyController getController(DroolsController droolsController) {
389 return PolicyController.factory.get(droolsController);