Merge "Add api-resource-locks feature"
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / PoolingFeature.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import java.io.IOException;
24 import java.util.Properties;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.function.Function;
27 import org.onap.policy.common.utils.properties.exception.PropertyException;
28 import org.onap.policy.drools.controller.DroolsController;
29 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
30 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
31 import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
32 import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
33 import org.onap.policy.drools.system.PolicyController;
34 import org.onap.policy.drools.utils.PropertyUtil;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * Controller/session pooling. Multiple hosts may be launched, all servicing the
40  * same controllers/sessions. When this feature is enabled, the requests are
41  * divided across the different hosts, instead of all running on a single,
42  * active host.
43  * <p>
44  * With each controller, there is an associated DMaaP topic that is used for
45  * internal communication between the different hosts serving the controller.
46  */
47 public class PoolingFeature implements PolicyControllerFeatureAPI, DroolsControllerFeatureAPI, PolicySessionFeatureAPI {
48
49     private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class);
50
51     // TODO state-management doesn't allow more than one active host at a time
52
53     /**
54      * Factory used to create objects.
55      */
56     private static Factory factory;
57
58     /**
59      * Entire set of feature properties, including those specific to various
60      * controllers.
61      */
62     private Properties featProps = null;
63
64     /**
65      * Maps a controller name to its associated manager.
66      */
67     private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
68
69     /**
70      * Arguments passed to beforeOffer(), which are saved for when the
71      * beforeInsert() is called later. As multiple threads can be active within
72      * the methods at the same time, we must keep this in thread local storage.
73      */
74     private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
75
76     /**
77      * 
78      */
79     public PoolingFeature() {
80         super();
81     }
82
83     protected static Factory getFactory() {
84         return factory;
85     }
86
87     /**
88      * Sets the factory to be used to create objects. Used by junit tests.
89      * 
90      * @param factory the new factory to be used to create objects
91      */
92     protected static void setFactory(Factory factory) {
93         PoolingFeature.factory = factory;
94     }
95
96     @Override
97     public int getSequenceNumber() {
98         return 0;
99     }
100
101     /**
102      * @throws PoolingFeatureRtException if the properties cannot be read or are
103      *         invalid
104      */
105     @Override
106     public void globalInit(String[] args, String configDir) {
107         logger.info("initializing pooling feature");
108
109         try {
110             featProps = PropertyUtil.getProperties(configDir + "/feature-pooling-dmaap.properties");
111
112         } catch (IOException ex) {
113             throw new PoolingFeatureRtException(ex);
114         }
115     }
116
117     /**
118      * Adds the controller and a new pooling manager to {@link #ctlr2pool}.
119      * 
120      * @throws PoolingFeatureRtException if an error occurs
121      */
122     @Override
123     public boolean afterCreate(PolicyController controller) {
124
125         if (featProps == null) {
126             logger.error("pooling feature properties have not been loaded");
127             throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties"));
128         }
129
130         String name = controller.getName();
131
132         if (FeatureEnabledChecker.isFeatureEnabled(featProps, name, PoolingProperties.FEATURE_ENABLED)) {
133             try {
134                 // get & validate the properties
135                 PoolingProperties props = new PoolingProperties(name, featProps);
136
137                 logger.info("pooling enabled for {}", name);
138                 ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(controller, props));
139
140             } catch (PropertyException e) {
141                 logger.error("pooling disabled due to exception for {}", name, e);
142                 throw new PoolingFeatureRtException(e);
143             }
144
145         } else {
146             logger.info("pooling disabled for {}", name);
147         }
148
149
150         return false;
151     }
152
153     @Override
154     public boolean beforeStart(PolicyController controller) {
155         return doManager(controller, mgr -> {
156             mgr.beforeStart();
157             return false;
158         });
159     }
160
161     @Override
162     public boolean afterStart(PolicyController controller) {
163         return doManager(controller, mgr -> {
164             mgr.afterStart();
165             return false;
166         });
167     }
168
169     @Override
170     public boolean beforeStop(PolicyController controller) {
171         return doManager(controller, mgr -> {
172             mgr.beforeStop();
173             return false;
174         });
175     }
176
177     @Override
178     public boolean afterStop(PolicyController controller) {
179
180         // NOTE: using doDeleteManager() instead of doManager()
181
182         return doDeleteManager(controller, mgr -> {
183
184             mgr.afterStop();
185             return false;
186         });
187     }
188
189     @Override
190     public boolean beforeLock(PolicyController controller) {
191         return doManager(controller, mgr -> {
192             mgr.beforeLock();
193             return false;
194         });
195     }
196
197     @Override
198     public boolean afterUnlock(PolicyController controller) {
199         return doManager(controller, mgr -> {
200             mgr.afterUnlock();
201             return false;
202         });
203     }
204
205     @Override
206     public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
207         /*
208          * As this is invoked a lot, we'll directly call the manager's method
209          * instead of using the functional interface via doManager().
210          */
211         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
212         if (mgr == null) {
213             return false;
214         }
215
216         if (mgr.beforeOffer(protocol, topic2, event)) {
217             return true;
218         }
219
220         offerArgs.set(new OfferArgs(protocol, topic2, event));
221         return false;
222     }
223
224     @Override
225     public boolean beforeInsert(DroolsController droolsController, Object fact) {
226
227         OfferArgs args = offerArgs.get();
228         if (args == null) {
229             return false;
230         }
231
232         PolicyController controller;
233         try {
234             controller = factory.getController(droolsController);
235
236         } catch (IllegalArgumentException | IllegalStateException e) {
237             return false;
238         }
239
240         if (controller == null) {
241             return false;
242         }
243
244         /*
245          * As this is invoked a lot, we'll directly call the manager's method
246          * instead of using the functional interface via doManager().
247          */
248         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
249         if (mgr == null) {
250             return false;
251         }
252
253         return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
254     }
255
256     @Override
257     public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
258                     boolean success) {
259
260         // clear any stored arguments
261         offerArgs.set(null);
262
263         return false;
264     }
265
266     /**
267      * Executes a function using the manager associated with the controller.
268      * Catches any exceptions from the function and re-throws it as a runtime
269      * exception.
270      * 
271      * @param controller
272      * @param func function to be executed
273      * @return {@code true} if the function handled the request, {@code false}
274      *         otherwise
275      * @throws PoolingFeatureRtException if an error occurs
276      */
277     private boolean doManager(PolicyController controller, MgrFunc func) {
278         PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
279         if (mgr == null) {
280             return false;
281         }
282
283         try {
284             return func.apply(mgr);
285
286         } catch (PoolingFeatureException e) {
287             throw e.toRuntimeException();
288         }
289     }
290
291     /**
292      * Executes a function using the manager associated with the controller and
293      * then deletes the manager. Catches any exceptions from the function and
294      * re-throws it as a runtime exception.
295      * 
296      * @param controller
297      * @param func function to be executed
298      * @return {@code true} if the function handled the request, {@code false}
299      *         otherwise
300      * @throws PoolingFeatureRtException if an error occurs
301      */
302     private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) {
303
304         // NOTE: using "remove()" instead of "get()"
305
306         PoolingManagerImpl mgr = ctlr2pool.remove(controller.getName());
307
308         if (mgr == null) {
309             return false;
310         }
311
312         return func.apply(mgr);
313     }
314
315     /**
316      * Function that operates on a manager.
317      */
318     @FunctionalInterface
319     private static interface MgrFunc {
320
321         /**
322          * 
323          * @param mgr
324          * @return {@code true} if the request was handled by the manager,
325          *         {@code false} otherwise
326          * @throws PoolingFeatureException
327          */
328         public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
329     }
330
331     /**
332      * Arguments captured from beforeOffer().
333      */
334     private static class OfferArgs {
335
336         /**
337          * Protocol of the receiving topic.
338          */
339         private CommInfrastructure protocol;
340
341         /**
342          * Topic on which the event was received.
343          */
344         private String topic;
345
346         /**
347          * The event text that was received on the topic.
348          */
349         private String event;
350
351         /**
352          * 
353          * @param protocol
354          * @param topic
355          * @param event the actual event data received on the topic
356          */
357         public OfferArgs(CommInfrastructure protocol, String topic, String event) {
358             this.protocol = protocol;
359             this.topic = topic;
360             this.event = event;
361         }
362     }
363
364     /**
365      * Used to create objects.
366      */
367     public static class Factory {
368
369         /**
370          * Makes a pooling manager for a controller.
371          * 
372          * @param controller
373          * @param props properties to use to configure the manager
374          * @return a new pooling manager
375          */
376         public PoolingManagerImpl makeManager(PolicyController controller, PoolingProperties props) {
377             return new PoolingManagerImpl(controller, props);
378         }
379
380         /**
381          * Gets the policy controller associated with a drools controller.
382          * 
383          * @param droolsController
384          * @return the policy controller associated with a drools controller
385          */
386         public PolicyController getController(DroolsController droolsController) {
387             return PolicyController.factory.get(droolsController);
388         }
389     }
390 }