2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import java.io.IOException;
24 import java.util.Properties;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.function.Function;
27 import org.onap.policy.common.utils.properties.exception.PropertyException;
28 import org.onap.policy.drools.controller.DroolsController;
29 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
30 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
31 import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
32 import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
33 import org.onap.policy.drools.system.PolicyController;
34 import org.onap.policy.drools.utils.PropertyUtil;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * Controller/session pooling. Multiple hosts may be launched, all servicing the
40 * same controllers/sessions. When this feature is enabled, the requests are
41 * divided across the different hosts, instead of all running on a single,
44 * With each controller, there is an associated DMaaP topic that is used for
45 * internal communication between the different hosts serving the controller.
47 public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControllerFeatureAPI, PolicySessionFeatureAPI {
49 private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
51 // TODO state-management doesn't allow more than one active host at a time
54 * Factory used to create objects.
56 private static Factory factory;
59 * Entire set of feature properties, including those specific to various
62 private Properties featProps = null;
65 * Maps a controller name to its associated manager.
67 private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
70 * Arguments passed to beforeOffer(), which are saved for when the
71 * beforeInsert() is called later. As multiple threads can be active within
72 * the methods at the same time, we must keep this in thread local storage.
74 private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
79 public PoolingFeature() {
83 protected static Factory getFactory() {
88 * Sets the factory to be used to create objects. Used by junit tests.
90 * @param factory the new factory to be used to create objects
92 protected static void setFactory(Factory factory) {
93 PoolingFeature.factory = factory;
97 public int getSequenceNumber() {
102 * @throws PoolingFeatureRtException if the properties cannot be read or are
106 public void globalInit(String[] args, String configDir) {
107 logger.info("initializing pooling feature");
110 featProps = PropertyUtil.getProperties(configDir + "/feature-pooling-dmaap.properties");
112 } catch (IOException ex) {
113 throw new PoolingFeatureRtException(ex);
118 * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
120 * @throws PoolingFeatureRtException if an error occurs
123 public boolean afterCreate(PolicyController controller) {
125 if (featProps == null) {
126 logger.error("pooling feature properties have not been loaded");
127 throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
130 String name = controller.getName();
132 if (FeatureEnabledChecker.isFeatureEnabled(featProps, name, PoolingProperties.FEATURE_ENABLED)) {
134 // get & validate the properties
135 PoolingProperties props = new PoolingProperties(name, featProps);
137 logger.info("pooling enabled for {}", name);
138 ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(controller, props));
140 } catch (PropertyException e) {
141 logger.error("pooling disabled due to exception for {}", name, e);
142 throw new PoolingFeatureRtException(e);
146 logger.info("pooling disabled for {}", name);
154 public boolean beforeStart(PolicyController controller) {
155 return doManager(controller, mgr -> {
162 public boolean afterStart(PolicyController controller) {
163 return doManager(controller, mgr -> {
170 public boolean beforeStop(PolicyController controller) {
171 return doManager(controller, mgr -> {
178 public boolean afterStop(PolicyController controller) {
180 // NOTE: using doDeleteManager() instead of doManager()
182 return doDeleteManager(controller, mgr -> {
190 public boolean beforeLock(PolicyController controller) {
191 return doManager(controller, mgr -> {
198 public boolean afterUnlock(PolicyController controller) {
199 return doManager(controller, mgr -> {
206 public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
208 * As this is invoked a lot, we'll directly call the manager's method
209 * instead of using the functional interface via doManager().
211 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
216 if (mgr.beforeOffer(protocol, topic2, event)) {
220 offerArgs.set(new OfferArgs(protocol, topic2, event));
225 public boolean beforeInsert(DroolsController droolsController, Object fact) {
227 OfferArgs args = offerArgs.get();
232 PolicyController controller;
234 controller = factory.getController(droolsController);
236 } catch (IllegalArgumentException | IllegalStateException e) {
240 if (controller == null) {
245 * As this is invoked a lot, we'll directly call the manager's method
246 * instead of using the functional interface via doManager().
248 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
253 return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
257 public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
260 // clear any stored arguments
267 * Executes a function using the manager associated with the controller.
268 * Catches any exceptions from the function and re-throws it as a runtime
272 * @param func function to be executed
273 * @return {@code true} if the function handled the request, {@code false}
275 * @throws PoolingFeatureRtException if an error occurs
277 private boolean doManager(PolicyController controller, MgrFunc func) {
278 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
284 return func.apply(mgr);
286 } catch (PoolingFeatureException e) {
287 throw e.toRuntimeException();
292 * Executes a function using the manager associated with the controller and
293 * then deletes the manager. Catches any exceptions from the function and
294 * re-throws it as a runtime exception.
297 * @param func function to be executed
298 * @return {@code true} if the function handled the request, {@code false}
300 * @throws PoolingFeatureRtException if an error occurs
302 private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) {
304 // NOTE: using "remove()" instead of "get()"
306 PoolingManagerImpl mgr = ctlr2pool.remove(controller.getName());
312 return func.apply(mgr);
316 * Function that operates on a manager.
319 private static interface MgrFunc {
324 * @return {@code true} if the request was handled by the manager,
325 * {@code false} otherwise
326 * @throws PoolingFeatureException
328 public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
332 * Arguments captured from beforeOffer().
334 private static class OfferArgs {
337 * Protocol of the receiving topic.
339 private CommInfrastructure protocol;
342 * Topic on which the event was received.
344 private String topic;
347 * The event text that was received on the topic.
349 private String event;
355 * @param event the actual event data received on the topic
357 public OfferArgs(CommInfrastructure protocol, String topic, String event) {
358 this.protocol = protocol;
365 * Used to create objects.
367 public static class Factory {
370 * Makes a pooling manager for a controller.
373 * @param props properties to use to configure the manager
374 * @return a new pooling manager
376 public PoolingManagerImpl makeManager(PolicyController controller, PoolingProperties props) {
377 return new PoolingManagerImpl(controller, props);
381 * Gets the policy controller associated with a drools controller.
383 * @param droolsController
384 * @return the policy controller associated with a drools controller
386 public PolicyController getController(DroolsController droolsController) {
387 return PolicyController.factory.get(droolsController);