2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import java.util.List;
24 import java.util.Properties;
25 import java.util.UUID;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CountDownLatch;
28 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
29 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
30 import org.onap.policy.common.endpoints.event.comm.TopicSink;
31 import org.onap.policy.common.endpoints.event.comm.TopicSource;
32 import org.onap.policy.common.utils.properties.SpecProperties;
33 import org.onap.policy.common.utils.properties.exception.PropertyException;
34 import org.onap.policy.drools.controller.DroolsController;
35 import org.onap.policy.drools.features.DroolsControllerFeatureApi;
36 import org.onap.policy.drools.features.PolicyControllerFeatureApi;
37 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
38 import org.onap.policy.drools.persistence.SystemPersistence;
39 import org.onap.policy.drools.system.PolicyController;
40 import org.onap.policy.drools.system.PolicyEngine;
41 import org.onap.policy.drools.util.FeatureEnabledChecker;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Controller/session pooling. Multiple hosts may be launched, all servicing the same
47 * controllers/sessions. When this feature is enabled, the requests are divided across the different
48 * hosts, instead of all running on a single, active host.
50 * <p>With each controller, there is an
51 * associated DMaaP topic that is used for internal communication between the different hosts
52 * serving the controller.
54 public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerFeatureApi, DroolsControllerFeatureApi {
56 private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
61 private final String host;
64 * Entire set of feature properties, including those specific to various controllers.
66 private Properties featProps = null;
69 * Maps a controller name to its associated manager.
71 private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
74 * Decremented each time a manager enters the Active state. Used by junit tests.
76 private final CountDownLatch activeLatch = new CountDownLatch(1);
79 * Arguments passed to beforeOffer(), saved so that they are available when beforeInsert() is
80 * called later. As multiple threads can be active within these methods at the same time, the
81 * arguments are kept in thread-local storage.
83 private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
88 public PoolingFeature() {
91 this.host = UUID.randomUUID().toString();
94 public String getHost() {
101 * @return a latch that will be decremented when a manager enters the active state
103 protected CountDownLatch getActiveLatch() {
108 public int getSequenceNumber() {
113 public boolean beforeStart(PolicyEngine engine) {
114 logger.info("initializing {}", PoolingProperties.FEATURE_NAME);
115 featProps = getProperties(PoolingProperties.FEATURE_NAME);
117 // remove any generic pooling topic - always use controller-specific property
118 featProps.remove(PoolingProperties.POOLING_TOPIC);
120 initTopicSources(featProps);
121 initTopicSinks(featProps);
127 public boolean beforeStart(PolicyController controller) {
128 return doManager(controller, mgr -> {
135 * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
137 * @throws PoolingFeatureRtException if an error occurs
140 public boolean afterCreate(PolicyController controller) {
142 if (featProps == null) {
143 logger.error("pooling feature properties have not been loaded");
144 throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
147 String name = controller.getName();
149 SpecProperties specProps = new SpecProperties(PoolingProperties.PREFIX, name, featProps);
151 if (FeatureEnabledChecker.isFeatureEnabled(specProps, PoolingProperties.FEATURE_ENABLED)) {
153 // get & validate the properties
154 PoolingProperties props = new PoolingProperties(name, featProps);
156 logger.info("pooling enabled for {}", name);
157 ctlr2pool.computeIfAbsent(name, xxx -> makeManager(host, controller, props, activeLatch));
159 } catch (PropertyException e) {
160 logger.error("pooling disabled due to exception for {}", name, e);
161 throw new PoolingFeatureRtException(e);
165 logger.info("pooling disabled for {}", name);
173 public boolean afterStart(PolicyController controller) {
174 return doManager(controller, mgr -> {
181 public boolean beforeStop(PolicyController controller) {
182 return doManager(controller, mgr -> {
189 public boolean afterStop(PolicyController controller) {
190 return doManager(controller, mgr -> {
197 public boolean afterShutdown(PolicyController controller) {
198 deleteManager(controller);
203 public boolean afterHalt(PolicyController controller) {
204 deleteManager(controller);
209 public boolean beforeLock(PolicyController controller) {
210 return doManager(controller, mgr -> {
217 public boolean afterUnlock(PolicyController controller) {
218 return doManager(controller, mgr -> {
225 public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
227 * As this is invoked a lot, we'll directly call the manager's method instead of using the
228 * functional interface via doManager().
230 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
235 if (mgr.beforeOffer(protocol, topic2, event)) {
239 offerArgs.set(new OfferArgs(protocol, topic2, event));
244 public boolean beforeInsert(DroolsController droolsController, Object fact) {
246 OfferArgs args = offerArgs.get();
248 logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
252 PolicyController controller;
254 controller = getController(droolsController);
256 } catch (IllegalArgumentException | IllegalStateException e) {
257 logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
258 droolsController.getArtifactId(), e);
263 if (controller == null) {
264 logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
265 droolsController.getArtifactId());
270 * As this is invoked a lot, we'll directly call the manager's method instead of using the
271 * functional interface via doManager().
273 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
278 return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
282 public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
285 // clear any stored arguments
292 * Executes a function using the manager associated with the controller. Catches any
293 * {@link PoolingFeatureException} thrown by the function and re-throws it wrapped in a {@link PoolingFeatureRtException}.
295 * @param controller controller
296 * @param func function to be executed
297 * @return {@code true} if the function handled the request, {@code false} otherwise
298 * @throws PoolingFeatureRtException if an error occurs
300 private boolean doManager(PolicyController controller, MgrFunc func) {
301 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
307 return func.apply(mgr);
309 } catch (PoolingFeatureException e) {
310 throw new PoolingFeatureRtException(e);
315 * Deletes the manager associated with a controller.
317 * @param controller controller
318 * @throws PoolingFeatureRtException if an error occurs
320 private void deleteManager(PolicyController controller) {
322 String name = controller.getName();
323 logger.info("remove feature-pool-dmaap manager for {}", name);
325 ctlr2pool.remove(name);
329 * Function that operates on a manager.
332 private static interface MgrFunc {
338 * @return {@code true} if the request was handled by the manager, {@code false} otherwise
339 * @throws PoolingFeatureException feature exception
341 public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
345 * Arguments captured from beforeOffer().
347 private static class OfferArgs {
350 * Protocol of the receiving topic.
352 private CommInfrastructure protocol;
355 * Topic on which the event was received.
357 private String topic;
360 * The event text that was received on the topic.
362 private String event;
367 * @param protocol protocol
369 * @param event the actual event data received on the topic
371 public OfferArgs(CommInfrastructure protocol, String topic, String event) {
372 this.protocol = protocol;
379 * The remaining methods may be overridden by junit tests.
385 * @param featName feature name
386 * @return the properties for the specified feature
388 protected Properties getProperties(String featName) {
389 return SystemPersistence.manager.getProperties(featName);
393 * Makes a pooling manager for a controller.
395 * @param host name/uuid of this host
396 * @param controller controller
397 * @param props properties to use to configure the manager
398 * @param activeLatch decremented when the manager goes Active
399 * @return a new pooling manager
401 protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
402 CountDownLatch activeLatch) {
403 return new PoolingManagerImpl(host, controller, props, activeLatch);
407 * Gets the policy controller associated with a drools controller.
409 * @param droolsController drools controller
410 * @return the policy controller associated with a drools controller
412 protected PolicyController getController(DroolsController droolsController) {
413 return PolicyController.factory.get(droolsController);
417 * Initializes the topic sources.
419 * @param props properties used to configure the topics
420 * @return the topic sources
422 protected List<TopicSource> initTopicSources(Properties props) {
423 return TopicEndpointManager.getManager().addTopicSources(props);
427 * Initializes the topic sinks.
429 * @param props properties used to configure the topics
430 * @return the topic sinks
432 protected List<TopicSink> initTopicSinks(Properties props) {
433 return TopicEndpointManager.getManager().addTopicSinks(props);