Sonar fixes to pooling
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / PoolingFeature.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import java.util.Properties;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.function.Function;
26 import org.onap.policy.common.utils.properties.exception.PropertyException;
27 import org.onap.policy.drools.controller.DroolsController;
28 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
29 import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
30 import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
31 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
32 import org.onap.policy.drools.persistence.SystemPersistence;
33 import org.onap.policy.drools.system.PolicyController;
34 import org.onap.policy.drools.system.PolicyEngine;
35 import org.onap.policy.drools.util.FeatureEnabledChecker;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Controller/session pooling. Multiple hosts may be launched, all servicing the same
41  * controllers/sessions. When this feature is enabled, the requests are divided across the
42  * different hosts, instead of all running on a single, active host.
43  * <p>
44  * With each controller, there is an associated DMaaP topic that is used for internal
45  * communication between the different hosts serving the controller.
46  */
47 public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerFeatureAPI, DroolsControllerFeatureAPI {
48
49     private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
50
51     /**
52      * Factory used to create objects.
53      */
54     private static Factory factory;
55
56     /**
57      * Entire set of feature properties, including those specific to various controllers.
58      */
59     private Properties featProps = null;
60
61     /**
62      * Maps a controller name to its associated manager.
63      */
64     private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
65
66     /**
67      * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is
68      * called later. As multiple threads can be active within the methods at the same
69      * time, we must keep this in thread local storage.
70      */
71     private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
72
73     /**
74      * 
75      */
76     public PoolingFeature() {
77         super();
78     }
79
80     protected static Factory getFactory() {
81         return factory;
82     }
83
84     /**
85      * Sets the factory to be used to create objects. Used by junit tests.
86      * 
87      * @param factory the new factory to be used to create objects
88      */
89     protected static void setFactory(Factory factory) {
90         PoolingFeature.factory = factory;
91     }
92
93     @Override
94     public int getSequenceNumber() {
95         return 0;
96     }
97
98     @Override
99     public boolean beforeStart(PolicyEngine engine) {
100         logger.info("initializing " + PoolingProperties.FEATURE_NAME);
101         featProps = factory.getProperties(PoolingProperties.FEATURE_NAME);
102         return false;
103     }
104
105     /**
106      * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
107      * 
108      * @throws PoolingFeatureRtException if an error occurs
109      */
110     @Override
111     public boolean afterCreate(PolicyController controller) {
112
113         if (featProps == null) {
114             logger.error("pooling feature properties have not been loaded");
115             throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
116         }
117
118         String name = controller.getName();
119
120         if (FeatureEnabledChecker.isFeatureEnabled(featProps, name, PoolingProperties.FEATURE_ENABLED)) {
121             try {
122                 // get & validate the properties
123                 PoolingProperties props = new PoolingProperties(name, featProps);
124
125                 logger.info("pooling enabled for {}", name);
126                 ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(controller, props));
127
128             } catch (PropertyException e) {
129                 logger.error("pooling disabled due to exception for {}", name, e);
130                 throw new PoolingFeatureRtException(e);
131             }
132
133         } else {
134             logger.info("pooling disabled for {}", name);
135         }
136
137
138         return false;
139     }
140
141     @Override
142     public boolean beforeStart(PolicyController controller) {
143         return doManager(controller, mgr -> {
144             mgr.beforeStart();
145             return false;
146         });
147     }
148
149     @Override
150     public boolean afterStart(PolicyController controller) {
151         return doManager(controller, mgr -> {
152             mgr.afterStart();
153             return false;
154         });
155     }
156
157     @Override
158     public boolean beforeStop(PolicyController controller) {
159         return doManager(controller, mgr -> {
160             mgr.beforeStop();
161             return false;
162         });
163     }
164
165     @Override
166     public boolean afterStop(PolicyController controller) {
167
168         // NOTE: using doDeleteManager() instead of doManager()
169
170         return doDeleteManager(controller, mgr -> {
171
172             mgr.afterStop();
173             return false;
174         });
175     }
176
177     @Override
178     public boolean beforeLock(PolicyController controller) {
179         return doManager(controller, mgr -> {
180             mgr.beforeLock();
181             return false;
182         });
183     }
184
185     @Override
186     public boolean afterUnlock(PolicyController controller) {
187         return doManager(controller, mgr -> {
188             mgr.afterUnlock();
189             return false;
190         });
191     }
192
193     @Override
194     public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
195         /*
196          * As this is invoked a lot, we'll directly call the manager's method instead of
197          * using the functional interface via doManager().
198          */
199         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
200         if (mgr == null) {
201             return false;
202         }
203
204         if (mgr.beforeOffer(protocol, topic2, event)) {
205             return true;
206         }
207
208         offerArgs.set(new OfferArgs(protocol, topic2, event));
209         return false;
210     }
211
212     @Override
213     public boolean beforeInsert(DroolsController droolsController, Object fact) {
214
215         OfferArgs args = offerArgs.get();
216         if (args == null) {
217             logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
218             return false;
219         }
220
221         PolicyController controller;
222         try {
223             controller = factory.getController(droolsController);
224
225         } catch (IllegalArgumentException | IllegalStateException e) {
226             logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
227                             droolsController.getArtifactId(), e);
228             return false;
229         }
230
231
232         if (controller == null) {
233             logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
234                             droolsController.getArtifactId());
235             return false;
236         }
237
238         /*
239          * As this is invoked a lot, we'll directly call the manager's method instead of
240          * using the functional interface via doManager().
241          */
242         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
243         if (mgr == null) {
244             return false;
245         }
246
247         return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
248     }
249
250     @Override
251     public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
252                     boolean success) {
253
254         // clear any stored arguments
255         offerArgs.set(null);
256
257         return false;
258     }
259
260     /**
261      * Executes a function using the manager associated with the controller. Catches any
262      * exceptions from the function and re-throws it as a runtime exception.
263      * 
264      * @param controller
265      * @param func function to be executed
266      * @return {@code true} if the function handled the request, {@code false} otherwise
267      * @throws PoolingFeatureRtException if an error occurs
268      */
269     private boolean doManager(PolicyController controller, MgrFunc func) {
270         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
271         if (mgr == null) {
272             return false;
273         }
274
275         try {
276             return func.apply(mgr);
277
278         } catch (PoolingFeatureException e) {
279             throw new PoolingFeatureRtException(e);
280         }
281     }
282
283     /**
284      * Executes a function using the manager associated with the controller and then
285      * deletes the manager. Catches any exceptions from the function and re-throws it as a
286      * runtime exception.
287      * 
288      * @param controller
289      * @param func function to be executed
290      * @return {@code true} if the function handled the request, {@code false} otherwise
291      * @throws PoolingFeatureRtException if an error occurs
292      */
293     private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) {
294
295         String name = controller.getName();
296         logger.info("remove feature-pool-dmaap manager for {}", name);
297
298         // NOTE: using "remove()" instead of "get()"
299
300         PoolingManagerImpl mgr = ctlr2pool.remove(name);
301
302         if (mgr == null) {
303             return false;
304         }
305
306         return func.apply(mgr);
307     }
308
309     /**
310      * Function that operates on a manager.
311      */
312     @FunctionalInterface
313     private static interface MgrFunc {
314
315         /**
316          * 
317          * @param mgr
318          * @return {@code true} if the request was handled by the manager, {@code false}
319          *         otherwise
320          * @throws PoolingFeatureException
321          */
322         public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
323     }
324
325     /**
326      * Arguments captured from beforeOffer().
327      */
328     private static class OfferArgs {
329
330         /**
331          * Protocol of the receiving topic.
332          */
333         private CommInfrastructure protocol;
334
335         /**
336          * Topic on which the event was received.
337          */
338         private String topic;
339
340         /**
341          * The event text that was received on the topic.
342          */
343         private String event;
344
345         /**
346          * 
347          * @param protocol
348          * @param topic
349          * @param event the actual event data received on the topic
350          */
351         public OfferArgs(CommInfrastructure protocol, String topic, String event) {
352             this.protocol = protocol;
353             this.topic = topic;
354             this.event = event;
355         }
356     }
357
358     /**
359      * Used to create objects.
360      */
361     public static class Factory {
362
363         /**
364          * @param featName feature name
365          * @return the properties for the specified feature
366          */
367         public Properties getProperties(String featName) {
368             return SystemPersistence.manager.getProperties(featName);
369         }
370
371         /**
372          * Makes a pooling manager for a controller.
373          * 
374          * @param controller
375          * @param props properties to use to configure the manager
376          * @return a new pooling manager
377          */
378         public PoolingManagerImpl makeManager(PolicyController controller, PoolingProperties props) {
379             return new PoolingManagerImpl(controller, props);
380         }
381
382         /**
383          * Gets the policy controller associated with a drools controller.
384          * 
385          * @param droolsController
386          * @return the policy controller associated with a drools controller
387          */
388         public PolicyController getController(DroolsController droolsController) {
389             return PolicyController.factory.get(droolsController);
390         }
391     }
392 }