Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-messages / src / main / java / org / onap / policy / drools / pooling / PoolingFeature.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling;
23
24 import java.util.List;
25 import java.util.Properties;
26 import java.util.UUID;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.CountDownLatch;
29 import lombok.AccessLevel;
30 import lombok.Getter;
31 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
32 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
33 import org.onap.policy.common.endpoints.event.comm.TopicSink;
34 import org.onap.policy.common.endpoints.event.comm.TopicSource;
35 import org.onap.policy.common.utils.properties.SpecProperties;
36 import org.onap.policy.common.utils.properties.exception.PropertyException;
37 import org.onap.policy.drools.controller.DroolsController;
38 import org.onap.policy.drools.features.DroolsControllerFeatureApi;
39 import org.onap.policy.drools.features.PolicyControllerFeatureApi;
40 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
41 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
42 import org.onap.policy.drools.system.PolicyController;
43 import org.onap.policy.drools.system.PolicyControllerConstants;
44 import org.onap.policy.drools.system.PolicyEngine;
45 import org.onap.policy.drools.util.FeatureEnabledChecker;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Controller/session pooling. Multiple hosts may be launched, all servicing the same
51  * controllers/sessions. When this feature is enabled, the requests are divided across the different
52  * hosts, instead of all running on a single, active host.
53  *
54  * <p>With each controller, there is an
55  * associated DMaaP topic that is used for internal communication between the different hosts
56  * serving the controller.
57  */
58 public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerFeatureApi, DroolsControllerFeatureApi {
59
60     private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
61
62     /**
63      * ID of this host.
64      */
65     @Getter
66     private final String host;
67
68     /**
69      * Entire set of feature properties, including those specific to various controllers.
70      */
71     private Properties featProps = null;
72
73     /**
74      * Maps a controller name to its associated manager.
75      */
76     private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
77
78     /**
79      * Decremented each time a manager enters the Active state. Used by junit tests.
80      */
81     @Getter(AccessLevel.PROTECTED)
82     private final CountDownLatch activeLatch = new CountDownLatch(1);
83
84     /**
85      * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is
86      * called later. As multiple threads can be active within the methods at the same
87      * time, we must keep this in thread local storage.
88      */
89     private ThreadLocal<String> offerTopics = new ThreadLocal<>();
90
91     /**
92      * Constructor.
93      */
94     public PoolingFeature() {
95         super();
96
97         this.host = UUID.randomUUID().toString();
98     }
99
100     @Override
101     public int getSequenceNumber() {
102         return 0;
103     }
104
105     @Override
106     public boolean beforeStart(PolicyEngine engine) {
107         logger.info("initializing {}", PoolingProperties.FEATURE_NAME);
108         featProps = getProperties(PoolingProperties.FEATURE_NAME);
109
110         // remove any generic pooling topic - always use controller-specific property
111         featProps.remove(PoolingProperties.POOLING_TOPIC);
112
113         initTopicSources(featProps);
114         initTopicSinks(featProps);
115
116         return false;
117     }
118
119     @Override
120     public boolean beforeStart(PolicyController controller) {
121         return doManager(controller, mgr -> {
122             mgr.beforeStart();
123             return false;
124         });
125     }
126
127     /**
128      * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
129      *
130      * @throws PoolingFeatureRtException if an error occurs
131      */
132     @Override
133     public boolean afterCreate(PolicyController controller) {
134
135         if (featProps == null) {
136             logger.error("pooling feature properties have not been loaded");
137             throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
138         }
139
140         String name = controller.getName();
141
142         var specProps = new SpecProperties(PoolingProperties.PREFIX, name, featProps);
143
144         if (FeatureEnabledChecker.isFeatureEnabled(specProps, PoolingProperties.FEATURE_ENABLED)) {
145             try {
146                 // get & validate the properties
147                 var props = new PoolingProperties(name, featProps);
148
149                 logger.info("pooling enabled for {}", name);
150                 ctlr2pool.computeIfAbsent(name, xxx -> makeManager(host, controller, props, activeLatch));
151
152             } catch (PropertyException e) {
153                 logger.error("pooling disabled due to exception for {}", name);
154                 throw new PoolingFeatureRtException(e);
155             }
156
157         } else {
158             logger.info("pooling disabled for {}", name);
159         }
160
161
162         return false;
163     }
164
165     @Override
166     public boolean afterStart(PolicyController controller) {
167         return doManager(controller, mgr -> {
168             mgr.afterStart();
169             return false;
170         });
171     }
172
173     @Override
174     public boolean beforeStop(PolicyController controller) {
175         return doManager(controller, mgr -> {
176             mgr.beforeStop();
177             return false;
178         });
179     }
180
181     @Override
182     public boolean afterStop(PolicyController controller) {
183         return doManager(controller, mgr -> {
184             mgr.afterStop();
185             return false;
186         });
187     }
188
189     @Override
190     public boolean afterShutdown(PolicyController controller) {
191         return commonShutdown(controller);
192     }
193
194     @Override
195     public boolean afterHalt(PolicyController controller) {
196         return commonShutdown(controller);
197     }
198
199     private boolean commonShutdown(PolicyController controller) {
200         deleteManager(controller);
201         return false;
202     }
203
204     @Override
205     public boolean beforeLock(PolicyController controller) {
206         return doManager(controller, mgr -> {
207             mgr.beforeLock();
208             return false;
209         });
210     }
211
212     @Override
213     public boolean afterUnlock(PolicyController controller) {
214         return doManager(controller, mgr -> {
215             mgr.afterUnlock();
216             return false;
217         });
218     }
219
220     @Override
221     public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
222         /*
223          * As this is invoked a lot, we'll directly call the manager's method instead of using the
224          * functional interface via doManager().
225          */
226         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
227         if (mgr == null) {
228             return false;
229         }
230
231         if (mgr.beforeOffer(topic2, event)) {
232             return true;
233         }
234
235         offerTopics.set(topic2);
236         return false;
237     }
238
239     @Override
240     public boolean beforeInsert(DroolsController droolsController, Object fact) {
241
242         String topic = offerTopics.get();
243         if (topic == null) {
244             logger.warn("missing arguments for feature-pooling-messages in beforeInsert");
245             return false;
246         }
247
248         PolicyController controller;
249         try {
250             controller = getController(droolsController);
251
252         } catch (IllegalArgumentException | IllegalStateException e) {
253             logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
254                     droolsController.getArtifactId(), e);
255             return false;
256         }
257
258
259         if (controller == null) {
260             logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
261                     droolsController.getArtifactId());
262             return false;
263         }
264
265         /*
266          * As this is invoked a lot, we'll directly call the manager's method instead of using the
267          * functional interface via doManager().
268          */
269         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
270         if (mgr == null) {
271             return false;
272         }
273
274         return mgr.beforeInsert(topic, fact);
275     }
276
277     @Override
278     public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
279             boolean success) {
280
281         // clear any stored arguments
282         offerTopics.remove();
283
284         return false;
285     }
286
287     /**
288      * Executes a function using the manager associated with the controller. Catches any exceptions
289      * from the function and re-throws it as a runtime exception.
290      *
291      * @param controller controller
292      * @param func function to be executed
293      * @return {@code true} if the function handled the request, {@code false} otherwise
294      * @throws PoolingFeatureRtException if an error occurs
295      */
296     private boolean doManager(PolicyController controller, MgrFunc func) {
297         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
298         if (mgr == null) {
299             return false;
300         }
301
302         try {
303             return func.apply(mgr);
304
305         } catch (PoolingFeatureException e) {
306             throw new PoolingFeatureRtException(e);
307         }
308     }
309
310     /**
311      * Deletes the manager associated with a controller.
312      *
313      * @param controller controller
314      * @throws PoolingFeatureRtException if an error occurs
315      */
316     private void deleteManager(PolicyController controller) {
317
318         String name = controller.getName();
319         logger.info("remove feature-pooling-messages manager for {}", name);
320
321         ctlr2pool.remove(name);
322     }
323
324     /**
325      * Function that operates on a manager.
326      */
327     @FunctionalInterface
328     private static interface MgrFunc {
329
330         /**
331          * Apply.
332          *
333          * @param mgr manager
334          * @return {@code true} if the request was handled by the manager, {@code false} otherwise
335          * @throws PoolingFeatureException feature exception
336          */
337         boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
338     }
339
340     /*
341      * The remaining methods may be overridden by junit tests.
342      */
343
344     /**
345      * Get properties.
346      *
347      * @param featName feature name
348      * @return the properties for the specified feature
349      */
350     protected Properties getProperties(String featName) {
351         return SystemPersistenceConstants.getManager().getProperties(featName);
352     }
353
354     /**
355      * Makes a pooling manager for a controller.
356      *
357      * @param host name/uuid of this host
358      * @param controller controller
359      * @param props properties to use to configure the manager
360      * @param activeLatch decremented when the manager goes Active
361      * @return a new pooling manager
362      */
363     protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
364             CountDownLatch activeLatch) {
365         return new PoolingManagerImpl(host, controller, props, activeLatch);
366     }
367
368     /**
369      * Gets the policy controller associated with a drools controller.
370      *
371      * @param droolsController drools controller
372      * @return the policy controller associated with a drools controller
373      */
374     protected PolicyController getController(DroolsController droolsController) {
375         return PolicyControllerConstants.getFactory().get(droolsController);
376     }
377
378     /**
379      * Initializes the topic sources.
380      *
381      * @param props properties used to configure the topics
382      * @return the topic sources
383      */
384     protected List<TopicSource> initTopicSources(Properties props) {
385         return TopicEndpointManager.getManager().addTopicSources(props);
386     }
387
388     /**
389      * Initializes the topic sinks.
390      *
391      * @param props properties used to configure the topics
392      * @return the topic sinks
393      */
394     protected List<TopicSink> initTopicSinks(Properties props) {
395         return TopicEndpointManager.getManager().addTopicSinks(props);
396     }
397 }