c52a8a97a2b53546937c4f3a55ae233a9f81a057
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / PoolingFeature.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import java.util.List;
24 import java.util.Properties;
25 import java.util.UUID;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CountDownLatch;
28 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
29 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
30 import org.onap.policy.common.endpoints.event.comm.TopicSink;
31 import org.onap.policy.common.endpoints.event.comm.TopicSource;
32 import org.onap.policy.common.utils.properties.SpecProperties;
33 import org.onap.policy.common.utils.properties.exception.PropertyException;
34 import org.onap.policy.drools.controller.DroolsController;
35 import org.onap.policy.drools.features.DroolsControllerFeatureApi;
36 import org.onap.policy.drools.features.PolicyControllerFeatureApi;
37 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
38 import org.onap.policy.drools.persistence.SystemPersistence;
39 import org.onap.policy.drools.system.PolicyController;
40 import org.onap.policy.drools.system.PolicyEngine;
41 import org.onap.policy.drools.util.FeatureEnabledChecker;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Controller/session pooling. Multiple hosts may be launched, all servicing the same
47  * controllers/sessions. When this feature is enabled, the requests are divided across the different
48  * hosts, instead of all running on a single, active host.
49  *
50  * <p>With each controller, there is an
51  * associated DMaaP topic that is used for internal communication between the different hosts
52  * serving the controller.
53  */
54 public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerFeatureApi, DroolsControllerFeatureApi {
55
56     private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
57
58     /**
59      * ID of this host.
60      */
61     private final String host;
62
63     /**
64      * Entire set of feature properties, including those specific to various controllers.
65      */
66     private Properties featProps = null;
67
68     /**
69      * Maps a controller name to its associated manager.
70      */
71     private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
72
73     /**
74      * Decremented each time a manager enters the Active state. Used by junit tests.
75      */
76     private final CountDownLatch activeLatch = new CountDownLatch(1);
77
78     /**
79      * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is called
80      * later. As multiple threads can be active within the methods at the same time, we must keep
81      * this in thread local storage.
82      */
83     private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
84
85     /**
86      * Constructor.
87      */
88     public PoolingFeature() {
89         super();
90
91         this.host = UUID.randomUUID().toString();
92     }
93
94     public String getHost() {
95         return host;
96     }
97
98     /**
99      * Get active latch.
100      *
101      * @return a latch that will be decremented when a manager enters the active state
102      */
103     protected CountDownLatch getActiveLatch() {
104         return activeLatch;
105     }
106
107     @Override
108     public int getSequenceNumber() {
109         return 0;
110     }
111
112     @Override
113     public boolean beforeStart(PolicyEngine engine) {
114         logger.info("initializing {}", PoolingProperties.FEATURE_NAME);
115         featProps = getProperties(PoolingProperties.FEATURE_NAME);
116
117         // remove any generic pooling topic - always use controller-specific property
118         featProps.remove(PoolingProperties.POOLING_TOPIC);
119
120         initTopicSources(featProps);
121         initTopicSinks(featProps);
122
123         return false;
124     }
125
126     @Override
127     public boolean beforeStart(PolicyController controller) {
128         return doManager(controller, mgr -> {
129             mgr.beforeStart();
130             return false;
131         });
132     }
133
134     /**
135      * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
136      *
137      * @throws PoolingFeatureRtException if an error occurs
138      */
139     @Override
140     public boolean afterCreate(PolicyController controller) {
141
142         if (featProps == null) {
143             logger.error("pooling feature properties have not been loaded");
144             throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
145         }
146
147         String name = controller.getName();
148
149         SpecProperties specProps = new SpecProperties(PoolingProperties.PREFIX, name, featProps);
150
151         if (FeatureEnabledChecker.isFeatureEnabled(specProps, PoolingProperties.FEATURE_ENABLED)) {
152             try {
153                 // get & validate the properties
154                 PoolingProperties props = new PoolingProperties(name, featProps);
155
156                 logger.info("pooling enabled for {}", name);
157                 ctlr2pool.computeIfAbsent(name, xxx -> makeManager(host, controller, props, activeLatch));
158
159             } catch (PropertyException e) {
160                 logger.error("pooling disabled due to exception for {}", name, e);
161                 throw new PoolingFeatureRtException(e);
162             }
163
164         } else {
165             logger.info("pooling disabled for {}", name);
166         }
167
168
169         return false;
170     }
171
172     @Override
173     public boolean afterStart(PolicyController controller) {
174         return doManager(controller, mgr -> {
175             mgr.afterStart();
176             return false;
177         });
178     }
179
180     @Override
181     public boolean beforeStop(PolicyController controller) {
182         return doManager(controller, mgr -> {
183             mgr.beforeStop();
184             return false;
185         });
186     }
187
188     @Override
189     public boolean afterStop(PolicyController controller) {
190         return doManager(controller, mgr -> {
191             mgr.afterStop();
192             return false;
193         });
194     }
195
196     @Override
197     public boolean afterShutdown(PolicyController controller) {
198         deleteManager(controller);
199         return false;
200     }
201
202     @Override
203     public boolean afterHalt(PolicyController controller) {
204         deleteManager(controller);
205         return false;
206     }
207
208     @Override
209     public boolean beforeLock(PolicyController controller) {
210         return doManager(controller, mgr -> {
211             mgr.beforeLock();
212             return false;
213         });
214     }
215
216     @Override
217     public boolean afterUnlock(PolicyController controller) {
218         return doManager(controller, mgr -> {
219             mgr.afterUnlock();
220             return false;
221         });
222     }
223
224     @Override
225     public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
226         /*
227          * As this is invoked a lot, we'll directly call the manager's method instead of using the
228          * functional interface via doManager().
229          */
230         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
231         if (mgr == null) {
232             return false;
233         }
234
235         if (mgr.beforeOffer(protocol, topic2, event)) {
236             return true;
237         }
238
239         offerArgs.set(new OfferArgs(protocol, topic2, event));
240         return false;
241     }
242
243     @Override
244     public boolean beforeInsert(DroolsController droolsController, Object fact) {
245
246         OfferArgs args = offerArgs.get();
247         if (args == null) {
248             logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
249             return false;
250         }
251
252         PolicyController controller;
253         try {
254             controller = getController(droolsController);
255
256         } catch (IllegalArgumentException | IllegalStateException e) {
257             logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
258                     droolsController.getArtifactId(), e);
259             return false;
260         }
261
262
263         if (controller == null) {
264             logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
265                     droolsController.getArtifactId());
266             return false;
267         }
268
269         /*
270          * As this is invoked a lot, we'll directly call the manager's method instead of using the
271          * functional interface via doManager().
272          */
273         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
274         if (mgr == null) {
275             return false;
276         }
277
278         return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
279     }
280
281     @Override
282     public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
283             boolean success) {
284
285         // clear any stored arguments
286         offerArgs.set(null);
287
288         return false;
289     }
290
291     /**
292      * Executes a function using the manager associated with the controller. Catches any exceptions
293      * from the function and re-throws it as a runtime exception.
294      *
295      * @param controller controller
296      * @param func function to be executed
297      * @return {@code true} if the function handled the request, {@code false} otherwise
298      * @throws PoolingFeatureRtException if an error occurs
299      */
300     private boolean doManager(PolicyController controller, MgrFunc func) {
301         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
302         if (mgr == null) {
303             return false;
304         }
305
306         try {
307             return func.apply(mgr);
308
309         } catch (PoolingFeatureException e) {
310             throw new PoolingFeatureRtException(e);
311         }
312     }
313
314     /**
315      * Deletes the manager associated with a controller.
316      *
317      * @param controller controller
318      * @throws PoolingFeatureRtException if an error occurs
319      */
320     private void deleteManager(PolicyController controller) {
321
322         String name = controller.getName();
323         logger.info("remove feature-pool-dmaap manager for {}", name);
324
325         ctlr2pool.remove(name);
326     }
327
328     /**
329      * Function that operates on a manager.
330      */
331     @FunctionalInterface
332     private static interface MgrFunc {
333
334         /**
335          * Apply.
336          *
337          * @param mgr manager
338          * @return {@code true} if the request was handled by the manager, {@code false} otherwise
339          * @throws PoolingFeatureException feature exception
340          */
341         public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
342     }
343
344     /**
345      * Arguments captured from beforeOffer().
346      */
347     private static class OfferArgs {
348
349         /**
350          * Protocol of the receiving topic.
351          */
352         private CommInfrastructure protocol;
353
354         /**
355          * Topic on which the event was received.
356          */
357         private String topic;
358
359         /**
360          * The event text that was received on the topic.
361          */
362         private String event;
363
364         /**
365          * Constructor.
366          *
367          * @param protocol protocol
368          * @param topic topic
369          * @param event the actual event data received on the topic
370          */
371         public OfferArgs(CommInfrastructure protocol, String topic, String event) {
372             this.protocol = protocol;
373             this.topic = topic;
374             this.event = event;
375         }
376     }
377
378     /*
379      * The remaining methods may be overridden by junit tests.
380      */
381
382     /**
383      * Get properties.
384      *
385      * @param featName feature name
386      * @return the properties for the specified feature
387      */
388     protected Properties getProperties(String featName) {
389         return SystemPersistence.manager.getProperties(featName);
390     }
391
392     /**
393      * Makes a pooling manager for a controller.
394      *
395      * @param host name/uuid of this host
396      * @param controller controller
397      * @param props properties to use to configure the manager
398      * @param activeLatch decremented when the manager goes Active
399      * @return a new pooling manager
400      */
401     protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
402             CountDownLatch activeLatch) {
403         return new PoolingManagerImpl(host, controller, props, activeLatch);
404     }
405
406     /**
407      * Gets the policy controller associated with a drools controller.
408      *
409      * @param droolsController drools controller
410      * @return the policy controller associated with a drools controller
411      */
412     protected PolicyController getController(DroolsController droolsController) {
413         return PolicyController.factory.get(droolsController);
414     }
415
416     /**
417      * Initializes the topic sources.
418      *
419      * @param props properties used to configure the topics
420      * @return the topic sources
421      */
422     protected List<TopicSource> initTopicSources(Properties props) {
423         return TopicEndpointManager.getManager().addTopicSources(props);
424     }
425
426     /**
427      * Initializes the topic sinks.
428      *
429      * @param props properties used to configure the topics
430      * @return the topic sinks
431      */
432     protected List<TopicSink> initTopicSinks(Properties props) {
433         return TopicEndpointManager.getManager().addTopicSinks(props);
434     }
435 }