2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling;
24 import java.util.List;
25 import java.util.Properties;
26 import java.util.UUID;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.CountDownLatch;
29 import lombok.AccessLevel;
31 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
32 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
33 import org.onap.policy.common.endpoints.event.comm.TopicSink;
34 import org.onap.policy.common.endpoints.event.comm.TopicSource;
35 import org.onap.policy.common.utils.properties.SpecProperties;
36 import org.onap.policy.common.utils.properties.exception.PropertyException;
37 import org.onap.policy.drools.controller.DroolsController;
38 import org.onap.policy.drools.features.DroolsControllerFeatureApi;
39 import org.onap.policy.drools.features.PolicyControllerFeatureApi;
40 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
41 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
42 import org.onap.policy.drools.system.PolicyController;
43 import org.onap.policy.drools.system.PolicyControllerConstants;
44 import org.onap.policy.drools.system.PolicyEngine;
45 import org.onap.policy.drools.util.FeatureEnabledChecker;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * Controller/session pooling. Multiple hosts may be launched, all servicing the same
51 * controllers/sessions. When this feature is enabled, the requests are divided across the different
52 * hosts, instead of all running on a single, active host.
54 * <p>With each controller, there is an
55 * associated DMaaP topic that is used for internal communication between the different hosts
56 * serving the controller.
58 public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerFeatureApi, DroolsControllerFeatureApi {
60 private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
66 private final String host;
69 * Entire set of feature properties, including those specific to various controllers.
71 private Properties featProps = null;
74 * Maps a controller name to its associated manager.
76 private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
79 * Decremented each time a manager enters the Active state. Used by junit tests.
81 @Getter(AccessLevel.PROTECTED)
82 private final CountDownLatch activeLatch = new CountDownLatch(1);
85 * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is
86 * called later. As multiple threads can be active within the methods at the same
87 * time, we must keep this in thread local storage.
89 private ThreadLocal<String> offerTopics = new ThreadLocal<>();
94 public PoolingFeature() {
97 this.host = UUID.randomUUID().toString();
101 public int getSequenceNumber() {
106 public boolean beforeStart(PolicyEngine engine) {
107 logger.info("initializing {}", PoolingProperties.FEATURE_NAME);
108 featProps = getProperties(PoolingProperties.FEATURE_NAME);
110 // remove any generic pooling topic - always use controller-specific property
111 featProps.remove(PoolingProperties.POOLING_TOPIC);
113 initTopicSources(featProps);
114 initTopicSinks(featProps);
120 public boolean beforeStart(PolicyController controller) {
121 return doManager(controller, mgr -> {
128 * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
130 * @throws PoolingFeatureRtException if an error occurs
133 public boolean afterCreate(PolicyController controller) {
135 if (featProps == null) {
136 logger.error("pooling feature properties have not been loaded");
137 throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
140 String name = controller.getName();
142 var specProps = new SpecProperties(PoolingProperties.PREFIX, name, featProps);
144 if (FeatureEnabledChecker.isFeatureEnabled(specProps, PoolingProperties.FEATURE_ENABLED)) {
146 // get & validate the properties
147 var props = new PoolingProperties(name, featProps);
149 logger.info("pooling enabled for {}", name);
150 ctlr2pool.computeIfAbsent(name, xxx -> makeManager(host, controller, props, activeLatch));
152 } catch (PropertyException e) {
153 logger.error("pooling disabled due to exception for {}", name);
154 throw new PoolingFeatureRtException(e);
158 logger.info("pooling disabled for {}", name);
166 public boolean afterStart(PolicyController controller) {
167 return doManager(controller, mgr -> {
174 public boolean beforeStop(PolicyController controller) {
175 return doManager(controller, mgr -> {
182 public boolean afterStop(PolicyController controller) {
183 return doManager(controller, mgr -> {
190 public boolean afterShutdown(PolicyController controller) {
191 return commonShutdown(controller);
195 public boolean afterHalt(PolicyController controller) {
196 return commonShutdown(controller);
199 private boolean commonShutdown(PolicyController controller) {
200 deleteManager(controller);
205 public boolean beforeLock(PolicyController controller) {
206 return doManager(controller, mgr -> {
213 public boolean afterUnlock(PolicyController controller) {
214 return doManager(controller, mgr -> {
221 public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
223 * As this is invoked a lot, we'll directly call the manager's method instead of using the
224 * functional interface via doManager().
226 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
231 if (mgr.beforeOffer(topic2, event)) {
235 offerTopics.set(topic2);
240 public boolean beforeInsert(DroolsController droolsController, Object fact) {
242 String topic = offerTopics.get();
244 logger.warn("missing arguments for feature-pooling-messages in beforeInsert");
248 PolicyController controller;
250 controller = getController(droolsController);
252 } catch (IllegalArgumentException | IllegalStateException e) {
253 logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
254 droolsController.getArtifactId(), e);
259 if (controller == null) {
260 logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
261 droolsController.getArtifactId());
266 * As this is invoked a lot, we'll directly call the manager's method instead of using the
267 * functional interface via doManager().
269 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
274 return mgr.beforeInsert(topic, fact);
278 public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
281 // clear any stored arguments
282 offerTopics.remove();
288 * Executes a function using the manager associated with the controller. Catches any
289 * PoolingFeatureException thrown by the function and re-throws it wrapped in a PoolingFeatureRtException.
291 * @param controller controller
292 * @param func function to be executed
293 * @return {@code true} if the function handled the request, {@code false} otherwise
294 * @throws PoolingFeatureRtException if an error occurs
296 private boolean doManager(PolicyController controller, MgrFunc func) {
297 PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
303 return func.apply(mgr);
305 } catch (PoolingFeatureException e) {
306 throw new PoolingFeatureRtException(e);
311 * Deletes the manager associated with a controller.
313 * @param controller controller
314 * @throws PoolingFeatureRtException if an error occurs
316 private void deleteManager(PolicyController controller) {
318 String name = controller.getName();
319 logger.info("remove feature-pooling-messages manager for {}", name);
321 ctlr2pool.remove(name);
325 * Function that operates on a manager.
328 private static interface MgrFunc {
334 * @return {@code true} if the request was handled by the manager, {@code false} otherwise
335 * @throws PoolingFeatureException feature exception
337 boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
341 * The remaining methods may be overridden by junit tests.
347 * @param featName feature name
348 * @return the properties for the specified feature
350 protected Properties getProperties(String featName) {
351 return SystemPersistenceConstants.getManager().getProperties(featName);
355 * Makes a pooling manager for a controller.
357 * @param host name/uuid of this host
358 * @param controller controller
359 * @param props properties to use to configure the manager
360 * @param activeLatch decremented when the manager goes Active
361 * @return a new pooling manager
363 protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
364 CountDownLatch activeLatch) {
365 return new PoolingManagerImpl(host, controller, props, activeLatch);
369 * Gets the policy controller associated with a drools controller.
371 * @param droolsController drools controller
372 * @return the policy controller associated with a drools controller
374 protected PolicyController getController(DroolsController droolsController) {
375 return PolicyControllerConstants.getFactory().get(droolsController);
379 * Initializes the topic sources.
381 * @param props properties used to configure the topics
382 * @return the topic sources
384 protected List<TopicSource> initTopicSources(Properties props) {
385 return TopicEndpointManager.getManager().addTopicSources(props);
389 * Initializes the topic sinks.
391 * @param props properties used to configure the topics
392 * @return the topic sinks
394 protected List<TopicSink> initTopicSinks(Properties props) {
395 return TopicEndpointManager.getManager().addTopicSinks(props);