Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / PoolingFeature.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import java.util.List;
24 import java.util.Properties;
25 import java.util.UUID;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CountDownLatch;
28 import lombok.AccessLevel;
29 import lombok.Getter;
30 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
31 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
32 import org.onap.policy.common.endpoints.event.comm.TopicSink;
33 import org.onap.policy.common.endpoints.event.comm.TopicSource;
34 import org.onap.policy.common.utils.properties.SpecProperties;
35 import org.onap.policy.common.utils.properties.exception.PropertyException;
36 import org.onap.policy.drools.controller.DroolsController;
37 import org.onap.policy.drools.features.DroolsControllerFeatureApi;
38 import org.onap.policy.drools.features.PolicyControllerFeatureApi;
39 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
40 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
41 import org.onap.policy.drools.system.PolicyController;
42 import org.onap.policy.drools.system.PolicyControllerConstants;
43 import org.onap.policy.drools.system.PolicyEngine;
44 import org.onap.policy.drools.util.FeatureEnabledChecker;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * Controller/session pooling. Multiple hosts may be launched, all servicing the same
50  * controllers/sessions. When this feature is enabled, the requests are divided across the different
51  * hosts, instead of all running on a single, active host.
52  *
53  * <p>With each controller, there is an
54  * associated DMaaP topic that is used for internal communication between the different hosts
55  * serving the controller.
56  */
57 public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerFeatureApi, DroolsControllerFeatureApi {
58
59     private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
60
61     /**
62      * ID of this host.
63      */
64     @Getter
65     private final String host;
66
67     /**
68      * Entire set of feature properties, including those specific to various controllers.
69      */
70     private Properties featProps = null;
71
72     /**
73      * Maps a controller name to its associated manager.
74      */
75     private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
76
77     /**
78      * Decremented each time a manager enters the Active state. Used by junit tests.
79      */
80     @Getter(AccessLevel.PROTECTED)
81     private final CountDownLatch activeLatch = new CountDownLatch(1);
82
83     /**
84      * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is
85      * called later. As multiple threads can be active within the methods at the same
86      * time, we must keep this in thread local storage.
87      */
88     private ThreadLocal<String> offerTopics = new ThreadLocal<>();
89
90     /**
91      * Constructor.
92      */
93     public PoolingFeature() {
94         super();
95
96         this.host = UUID.randomUUID().toString();
97     }
98
99     @Override
100     public int getSequenceNumber() {
101         return 0;
102     }
103
104     @Override
105     public boolean beforeStart(PolicyEngine engine) {
106         logger.info("initializing {}", PoolingProperties.FEATURE_NAME);
107         featProps = getProperties(PoolingProperties.FEATURE_NAME);
108
109         // remove any generic pooling topic - always use controller-specific property
110         featProps.remove(PoolingProperties.POOLING_TOPIC);
111
112         initTopicSources(featProps);
113         initTopicSinks(featProps);
114
115         return false;
116     }
117
118     @Override
119     public boolean beforeStart(PolicyController controller) {
120         return doManager(controller, mgr -> {
121             mgr.beforeStart();
122             return false;
123         });
124     }
125
126     /**
127      * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
128      *
129      * @throws PoolingFeatureRtException if an error occurs
130      */
131     @Override
132     public boolean afterCreate(PolicyController controller) {
133
134         if (featProps == null) {
135             logger.error("pooling feature properties have not been loaded");
136             throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
137         }
138
139         String name = controller.getName();
140
141         var specProps = new SpecProperties(PoolingProperties.PREFIX, name, featProps);
142
143         if (FeatureEnabledChecker.isFeatureEnabled(specProps, PoolingProperties.FEATURE_ENABLED)) {
144             try {
145                 // get & validate the properties
146                 var props = new PoolingProperties(name, featProps);
147
148                 logger.info("pooling enabled for {}", name);
149                 ctlr2pool.computeIfAbsent(name, xxx -> makeManager(host, controller, props, activeLatch));
150
151             } catch (PropertyException e) {
152                 logger.error("pooling disabled due to exception for {}", name);
153                 throw new PoolingFeatureRtException(e);
154             }
155
156         } else {
157             logger.info("pooling disabled for {}", name);
158         }
159
160
161         return false;
162     }
163
164     @Override
165     public boolean afterStart(PolicyController controller) {
166         return doManager(controller, mgr -> {
167             mgr.afterStart();
168             return false;
169         });
170     }
171
172     @Override
173     public boolean beforeStop(PolicyController controller) {
174         return doManager(controller, mgr -> {
175             mgr.beforeStop();
176             return false;
177         });
178     }
179
180     @Override
181     public boolean afterStop(PolicyController controller) {
182         return doManager(controller, mgr -> {
183             mgr.afterStop();
184             return false;
185         });
186     }
187
188     @Override
189     public boolean afterShutdown(PolicyController controller) {
190         return commonShutdown(controller);
191     }
192
193     @Override
194     public boolean afterHalt(PolicyController controller) {
195         return commonShutdown(controller);
196     }
197
198     private boolean commonShutdown(PolicyController controller) {
199         deleteManager(controller);
200         return false;
201     }
202
203     @Override
204     public boolean beforeLock(PolicyController controller) {
205         return doManager(controller, mgr -> {
206             mgr.beforeLock();
207             return false;
208         });
209     }
210
211     @Override
212     public boolean afterUnlock(PolicyController controller) {
213         return doManager(controller, mgr -> {
214             mgr.afterUnlock();
215             return false;
216         });
217     }
218
219     @Override
220     public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
221         /*
222          * As this is invoked a lot, we'll directly call the manager's method instead of using the
223          * functional interface via doManager().
224          */
225         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
226         if (mgr == null) {
227             return false;
228         }
229
230         if (mgr.beforeOffer(topic2, event)) {
231             return true;
232         }
233
234         offerTopics.set(topic2);
235         return false;
236     }
237
238     @Override
239     public boolean beforeInsert(DroolsController droolsController, Object fact) {
240
241         String topic = offerTopics.get();
242         if (topic == null) {
243             logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
244             return false;
245         }
246
247         PolicyController controller;
248         try {
249             controller = getController(droolsController);
250
251         } catch (IllegalArgumentException | IllegalStateException e) {
252             logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
253                     droolsController.getArtifactId(), e);
254             return false;
255         }
256
257
258         if (controller == null) {
259             logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
260                     droolsController.getArtifactId());
261             return false;
262         }
263
264         /*
265          * As this is invoked a lot, we'll directly call the manager's method instead of using the
266          * functional interface via doManager().
267          */
268         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
269         if (mgr == null) {
270             return false;
271         }
272
273         return mgr.beforeInsert(topic, fact);
274     }
275
276     @Override
277     public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
278             boolean success) {
279
280         // clear any stored arguments
281         offerTopics.remove();
282
283         return false;
284     }
285
286     /**
287      * Executes a function using the manager associated with the controller. Catches any exceptions
288      * from the function and re-throws it as a runtime exception.
289      *
290      * @param controller controller
291      * @param func function to be executed
292      * @return {@code true} if the function handled the request, {@code false} otherwise
293      * @throws PoolingFeatureRtException if an error occurs
294      */
295     private boolean doManager(PolicyController controller, MgrFunc func) {
296         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
297         if (mgr == null) {
298             return false;
299         }
300
301         try {
302             return func.apply(mgr);
303
304         } catch (PoolingFeatureException e) {
305             throw new PoolingFeatureRtException(e);
306         }
307     }
308
309     /**
310      * Deletes the manager associated with a controller.
311      *
312      * @param controller controller
313      * @throws PoolingFeatureRtException if an error occurs
314      */
315     private void deleteManager(PolicyController controller) {
316
317         String name = controller.getName();
318         logger.info("remove feature-pool-dmaap manager for {}", name);
319
320         ctlr2pool.remove(name);
321     }
322
323     /**
324      * Function that operates on a manager.
325      */
326     @FunctionalInterface
327     private static interface MgrFunc {
328
329         /**
330          * Apply.
331          *
332          * @param mgr manager
333          * @return {@code true} if the request was handled by the manager, {@code false} otherwise
334          * @throws PoolingFeatureException feature exception
335          */
336         boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
337     }
338
339     /*
340      * The remaining methods may be overridden by junit tests.
341      */
342
343     /**
344      * Get properties.
345      *
346      * @param featName feature name
347      * @return the properties for the specified feature
348      */
349     protected Properties getProperties(String featName) {
350         return SystemPersistenceConstants.getManager().getProperties(featName);
351     }
352
353     /**
354      * Makes a pooling manager for a controller.
355      *
356      * @param host name/uuid of this host
357      * @param controller controller
358      * @param props properties to use to configure the manager
359      * @param activeLatch decremented when the manager goes Active
360      * @return a new pooling manager
361      */
362     protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
363             CountDownLatch activeLatch) {
364         return new PoolingManagerImpl(host, controller, props, activeLatch);
365     }
366
367     /**
368      * Gets the policy controller associated with a drools controller.
369      *
370      * @param droolsController drools controller
371      * @return the policy controller associated with a drools controller
372      */
373     protected PolicyController getController(DroolsController droolsController) {
374         return PolicyControllerConstants.getFactory().get(droolsController);
375     }
376
377     /**
378      * Initializes the topic sources.
379      *
380      * @param props properties used to configure the topics
381      * @return the topic sources
382      */
383     protected List<TopicSource> initTopicSources(Properties props) {
384         return TopicEndpointManager.getManager().addTopicSources(props);
385     }
386
387     /**
388      * Initializes the topic sinks.
389      *
390      * @param props properties used to configure the topics
391      * @return the topic sinks
392      */
393     protected List<TopicSink> initTopicSinks(Properties props) {
394         return TopicEndpointManager.getManager().addTopicSinks(props);
395     }
396 }