Make feature-pooling-dmaap work without filtering
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / PoolingFeature.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import java.util.List;
24 import java.util.Properties;
25 import java.util.UUID;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CountDownLatch;
28 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
29 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
30 import org.onap.policy.common.endpoints.event.comm.TopicSink;
31 import org.onap.policy.common.endpoints.event.comm.TopicSource;
32 import org.onap.policy.common.utils.properties.SpecProperties;
33 import org.onap.policy.common.utils.properties.exception.PropertyException;
34 import org.onap.policy.drools.controller.DroolsController;
35 import org.onap.policy.drools.features.DroolsControllerFeatureApi;
36 import org.onap.policy.drools.features.PolicyControllerFeatureApi;
37 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
38 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
39 import org.onap.policy.drools.system.PolicyController;
40 import org.onap.policy.drools.system.PolicyControllerConstants;
41 import org.onap.policy.drools.system.PolicyEngine;
42 import org.onap.policy.drools.util.FeatureEnabledChecker;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Controller/session pooling. Multiple hosts may be launched, all servicing the same
48  * controllers/sessions. When this feature is enabled, the requests are divided across the different
49  * hosts, instead of all running on a single, active host.
50  *
51  * <p>With each controller, there is an
52  * associated DMaaP topic that is used for internal communication between the different hosts
53  * serving the controller.
54  */
55 public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerFeatureApi, DroolsControllerFeatureApi {
56
57     private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
58
59     /**
60      * ID of this host.
61      */
62     private final String host;
63
64     /**
65      * Entire set of feature properties, including those specific to various controllers.
66      */
67     private Properties featProps = null;
68
69     /**
70      * Maps a controller name to its associated manager.
71      */
72     private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
73
74     /**
75      * Decremented each time a manager enters the Active state. Used by junit tests.
76      */
77     private final CountDownLatch activeLatch = new CountDownLatch(1);
78
79     /**
80      * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is
81      * called later. As multiple threads can be active within the methods at the same
82      * time, we must keep this in thread local storage.
83      */
84     private ThreadLocal<String> offerTopics = new ThreadLocal<>();
85
86     /**
87      * Constructor.
88      */
89     public PoolingFeature() {
90         super();
91
92         this.host = UUID.randomUUID().toString();
93     }
94
95     public String getHost() {
96         return host;
97     }
98
99     /**
100      * Get active latch.
101      *
102      * @return a latch that will be decremented when a manager enters the active state
103      */
104     protected CountDownLatch getActiveLatch() {
105         return activeLatch;
106     }
107
108     @Override
109     public int getSequenceNumber() {
110         return 0;
111     }
112
113     @Override
114     public boolean beforeStart(PolicyEngine engine) {
115         logger.info("initializing {}", PoolingProperties.FEATURE_NAME);
116         featProps = getProperties(PoolingProperties.FEATURE_NAME);
117
118         // remove any generic pooling topic - always use controller-specific property
119         featProps.remove(PoolingProperties.POOLING_TOPIC);
120
121         initTopicSources(featProps);
122         initTopicSinks(featProps);
123
124         return false;
125     }
126
127     @Override
128     public boolean beforeStart(PolicyController controller) {
129         return doManager(controller, mgr -> {
130             mgr.beforeStart();
131             return false;
132         });
133     }
134
135     /**
136      * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
137      *
138      * @throws PoolingFeatureRtException if an error occurs
139      */
140     @Override
141     public boolean afterCreate(PolicyController controller) {
142
143         if (featProps == null) {
144             logger.error("pooling feature properties have not been loaded");
145             throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
146         }
147
148         String name = controller.getName();
149
150         SpecProperties specProps = new SpecProperties(PoolingProperties.PREFIX, name, featProps);
151
152         if (FeatureEnabledChecker.isFeatureEnabled(specProps, PoolingProperties.FEATURE_ENABLED)) {
153             try {
154                 // get & validate the properties
155                 PoolingProperties props = new PoolingProperties(name, featProps);
156
157                 logger.info("pooling enabled for {}", name);
158                 ctlr2pool.computeIfAbsent(name, xxx -> makeManager(host, controller, props, activeLatch));
159
160             } catch (PropertyException e) {
161                 logger.error("pooling disabled due to exception for {}", name);
162                 throw new PoolingFeatureRtException(e);
163             }
164
165         } else {
166             logger.info("pooling disabled for {}", name);
167         }
168
169
170         return false;
171     }
172
173     @Override
174     public boolean afterStart(PolicyController controller) {
175         return doManager(controller, mgr -> {
176             mgr.afterStart();
177             return false;
178         });
179     }
180
181     @Override
182     public boolean beforeStop(PolicyController controller) {
183         return doManager(controller, mgr -> {
184             mgr.beforeStop();
185             return false;
186         });
187     }
188
189     @Override
190     public boolean afterStop(PolicyController controller) {
191         return doManager(controller, mgr -> {
192             mgr.afterStop();
193             return false;
194         });
195     }
196
197     @Override
198     public boolean afterShutdown(PolicyController controller) {
199         return commonShutdown(controller);
200     }
201
202     @Override
203     public boolean afterHalt(PolicyController controller) {
204         return commonShutdown(controller);
205     }
206
207     private boolean commonShutdown(PolicyController controller) {
208         deleteManager(controller);
209         return false;
210     }
211
212     @Override
213     public boolean beforeLock(PolicyController controller) {
214         return doManager(controller, mgr -> {
215             mgr.beforeLock();
216             return false;
217         });
218     }
219
220     @Override
221     public boolean afterUnlock(PolicyController controller) {
222         return doManager(controller, mgr -> {
223             mgr.afterUnlock();
224             return false;
225         });
226     }
227
228     @Override
229     public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
230         /*
231          * As this is invoked a lot, we'll directly call the manager's method instead of using the
232          * functional interface via doManager().
233          */
234         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
235         if (mgr == null) {
236             return false;
237         }
238
239         if (mgr.beforeOffer(topic2, event)) {
240             return true;
241         }
242
243         offerTopics.set(topic2);
244         return false;
245     }
246
247     @Override
248     public boolean beforeInsert(DroolsController droolsController, Object fact) {
249
250         String topic = offerTopics.get();
251         if (topic == null) {
252             logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
253             return false;
254         }
255
256         PolicyController controller;
257         try {
258             controller = getController(droolsController);
259
260         } catch (IllegalArgumentException | IllegalStateException e) {
261             logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
262                     droolsController.getArtifactId(), e);
263             return false;
264         }
265
266
267         if (controller == null) {
268             logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
269                     droolsController.getArtifactId());
270             return false;
271         }
272
273         /*
274          * As this is invoked a lot, we'll directly call the manager's method instead of using the
275          * functional interface via doManager().
276          */
277         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
278         if (mgr == null) {
279             return false;
280         }
281
282         return mgr.beforeInsert(topic, fact);
283     }
284
285     @Override
286     public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
287             boolean success) {
288
289         // clear any stored arguments
290         offerTopics.remove();
291
292         return false;
293     }
294
295     /**
296      * Executes a function using the manager associated with the controller. Catches any exceptions
297      * from the function and re-throws it as a runtime exception.
298      *
299      * @param controller controller
300      * @param func function to be executed
301      * @return {@code true} if the function handled the request, {@code false} otherwise
302      * @throws PoolingFeatureRtException if an error occurs
303      */
304     private boolean doManager(PolicyController controller, MgrFunc func) {
305         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
306         if (mgr == null) {
307             return false;
308         }
309
310         try {
311             return func.apply(mgr);
312
313         } catch (PoolingFeatureException e) {
314             throw new PoolingFeatureRtException(e);
315         }
316     }
317
318     /**
319      * Deletes the manager associated with a controller.
320      *
321      * @param controller controller
322      * @throws PoolingFeatureRtException if an error occurs
323      */
324     private void deleteManager(PolicyController controller) {
325
326         String name = controller.getName();
327         logger.info("remove feature-pool-dmaap manager for {}", name);
328
329         ctlr2pool.remove(name);
330     }
331
332     /**
333      * Function that operates on a manager.
334      */
335     @FunctionalInterface
336     private static interface MgrFunc {
337
338         /**
339          * Apply.
340          *
341          * @param mgr manager
342          * @return {@code true} if the request was handled by the manager, {@code false} otherwise
343          * @throws PoolingFeatureException feature exception
344          */
345         boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
346     }
347
348     /*
349      * The remaining methods may be overridden by junit tests.
350      */
351
352     /**
353      * Get properties.
354      *
355      * @param featName feature name
356      * @return the properties for the specified feature
357      */
358     protected Properties getProperties(String featName) {
359         return SystemPersistenceConstants.getManager().getProperties(featName);
360     }
361
362     /**
363      * Makes a pooling manager for a controller.
364      *
365      * @param host name/uuid of this host
366      * @param controller controller
367      * @param props properties to use to configure the manager
368      * @param activeLatch decremented when the manager goes Active
369      * @return a new pooling manager
370      */
371     protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
372             CountDownLatch activeLatch) {
373         return new PoolingManagerImpl(host, controller, props, activeLatch);
374     }
375
376     /**
377      * Gets the policy controller associated with a drools controller.
378      *
379      * @param droolsController drools controller
380      * @return the policy controller associated with a drools controller
381      */
382     protected PolicyController getController(DroolsController droolsController) {
383         return PolicyControllerConstants.getFactory().get(droolsController);
384     }
385
386     /**
387      * Initializes the topic sources.
388      *
389      * @param props properties used to configure the topics
390      * @return the topic sources
391      */
392     protected List<TopicSource> initTopicSources(Properties props) {
393         return TopicEndpointManager.getManager().addTopicSources(props);
394     }
395
396     /**
397      * Initializes the topic sinks.
398      *
399      * @param props properties used to configure the topics
400      * @return the topic sinks
401      */
402     protected List<TopicSink> initTopicSinks(Properties props) {
403         return TopicEndpointManager.getManager().addTopicSinks(props);
404     }
405 }