Removing deprecated DMAAP library
[policy/drools-pdp.git] / policy-core / src / main / java / org / onap / policy / drools / core / PolicySession.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2024 Nordix Foundation.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.drools.core;
24
25 import java.util.concurrent.ConcurrentHashMap;
26 import lombok.Getter;
27 import org.kie.api.event.rule.AfterMatchFiredEvent;
28 import org.kie.api.event.rule.AgendaEventListener;
29 import org.kie.api.event.rule.AgendaGroupPoppedEvent;
30 import org.kie.api.event.rule.AgendaGroupPushedEvent;
31 import org.kie.api.event.rule.BeforeMatchFiredEvent;
32 import org.kie.api.event.rule.MatchCancelledEvent;
33 import org.kie.api.event.rule.MatchCreatedEvent;
34 import org.kie.api.event.rule.ObjectDeletedEvent;
35 import org.kie.api.event.rule.ObjectInsertedEvent;
36 import org.kie.api.event.rule.ObjectUpdatedEvent;
37 import org.kie.api.event.rule.RuleFlowGroupActivatedEvent;
38 import org.kie.api.event.rule.RuleFlowGroupDeactivatedEvent;
39 import org.kie.api.event.rule.RuleRuntimeEventListener;
40 import org.kie.api.runtime.KieSession;
41 import org.onap.policy.drools.core.jmx.PdpJmx;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45
46 /**
47  * This class is a wrapper around 'KieSession', which adds the following:
48  *
49  * <p>1) A thread running 'KieSession.fireUntilHalt()'
50  * 2) Access to UEB
51  * 3) Logging of events
52  */
53 public class PolicySession implements AgendaEventListener, RuleRuntimeEventListener {
54     // get an instance of logger
55     private static final Logger logger = LoggerFactory.getLogger(PolicySession.class);
56
57     // supports 'getCurrentSession()' method
58     private static ThreadLocal<PolicySession> policySess = new ThreadLocal<>();
59
60     // name of the 'PolicySession' and associated 'KieSession'
61     @Getter
62     private String name;
63
64     // the associated 'PolicyContainer', which may have additional
65     // 'PolicySession' instances in addition to this one
66     @Getter
67     private PolicyContainer container;
68
69     // maps feature objects to per-PolicyContainer data
70     private ConcurrentHashMap<Object, Object> adjuncts =
71             new ConcurrentHashMap<>();
72
73     // associated 'KieSession' instance
74     @Getter
75     private KieSession kieSession;
76
77     // if not 'null', this is the thread model processing the 'KieSession'
78     private ThreadModel threadModel = null;
79
80     /**
81      * Internal constructor - create a 'PolicySession' instance.
82      *
83      * @param name       the name of this 'PolicySession' (and 'kieSession')
84      * @param container  the 'PolicyContainer' instance containing this session
85      * @param kieSession the associated 'KieSession' instance
86      */
87     protected PolicySession(String name,
88                             PolicyContainer container, KieSession kieSession) {
89         this.name = name;
90         this.container = container;
91         this.kieSession = kieSession;
92         kieSession.addEventListener((AgendaEventListener) this);
93         kieSession.addEventListener((RuleRuntimeEventListener) this);
94     }
95
96     /**
97      * Get full name.
98      *
99      * @return the 'PolicyContainer' name, followed by ':', followed by the
100      *     local name of the session. It should be useful in log messages.
101      */
102     public String getFullName() {
103         return container.getName() + ":" + name;
104     }
105
106     /**
107      * If no 'ThreadModel' is currently running, this method will create one,
108      * and invoke it's 'start()' method. Features implementing
109      * 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create
110      * the ThreadModel instance.
111      */
112     public synchronized void startThread() {
113         if (threadModel != null) {
114             return;
115         }
116
117         // loop through all the features, and give each one
118         // a chance to create the 'ThreadModel'
119         for (PolicySessionFeatureApi feature :
120                 PolicySessionFeatureApiConstants.getImpl().getList()) {
121             try {
122                 if ((threadModel = feature.selectThreadModel(this)) != null) {
123                     break;
124                 }
125             } catch (Exception e) {
126                 logger.error("ERROR: Feature API: {}", feature.getClass().getName(), e);
127             }
128         }
129         if (threadModel == null) {
130             // no feature created a ThreadModel -- select the default
131             threadModel = new DefaultThreadModel(this);
132         }
133         logger.info("starting ThreadModel for session {}", getFullName());
134         threadModel.start();
135     }
136
137     /**
138      * If a 'ThreadModel' is currently running, this calls the 'stop()' method,
139      * and sets the 'threadModel' reference to 'null'.
140      */
141     public synchronized void stopThread() {
142         if (threadModel != null) {
143             threadModel.stop();
144             threadModel = null;
145         }
146     }
147
148     /**
149      * Notification that 'updateToVersion' was called on the container.
150      */
151     void updated() {
152         if (threadModel != null) {
153             // notify the 'ThreadModel', which may change one or more Thread names
154             threadModel.updated();
155         }
156     }
157
158     /**
159      * Set this 'PolicySession' instance as the one associated with the
160      * currently-running thread.
161      */
162     public void setPolicySession() {
163         // this sets a 'ThreadLocal' variable
164         policySess.set(this);
165     }
166
167     /**
168      * Unset this 'PolicySession' instance as the one associated with the
169      * currently-running thread.
170      */
171     public void removePolicySession() {
172         if (policySess.get() == this) {
173             policySess.remove();
174         }
175     }
176
177     /**
178      * Get current session.
179      *
180      * @return the 'PolicySession' instance associated with the current thread
181      *     (Note that this only works if the current thread is the one running
182      *     'kieSession.fireUntilHalt()'.)
183      */
184     public static PolicySession getCurrentSession() {
185         return policySess.get();
186     }
187
188     /**
189      * Fetch the adjunct object associated with a given feature.
190      *
191      * @param object this is typically the singleton feature object that is
192      *               used as a key, but it might also be useful to use nested objects
193      *               within the feature as keys.
194      * @return a feature-specific object associated with the key, or 'null'
195      *     if it is not found.
196      */
197     public Object getAdjunct(Object object) {
198         return adjuncts.get(object);
199     }
200
201     /**
202      * Store the adjunct object associated with a given feature.
203      *
204      * @param object this is typically the singleton feature object that is
205      *               used as a key, but it might also be useful to use nested objects
206      *               within the feature as keys.
207      * @param value  a feature-specific object associated with the key, or 'null'
208      *               if the feature-specific object should be removed
209      */
210     public void setAdjunct(Object object, Object value) {
211         if (value == null) {
212             adjuncts.remove(object);
213         } else {
214             adjuncts.put(object, value);
215         }
216     }
217
218     /**
219      * This method will insert an object into the Drools memory associated
220      * with this 'PolicySession' instance. Features are given the opportunity
221      * to handle the insert, and a distributed host feature could use this to
222      * send the object to another host, and insert it in the corresponding
223      * Drools session.
224      *
225      * @param object the object to insert in Drools memory
226      */
227     public void insertDrools(Object object) {
228         for (PolicySessionFeatureApi feature :
229                 PolicySessionFeatureApiConstants.getImpl().getList()) {
230             if (feature.insertDrools(this, object)) {
231                 // feature is performing the insert
232                 return;
233             }
234         }
235         // no feature has intervened -- do the insert locally
236         if (kieSession != null) {
237             kieSession.insert(object);
238         }
239     }
240
241     /*=================================*/
242     /* 'AgendaEventListener' interface */
243     /*=================================*/
244
245     /**
246      * {@inheritDoc}.
247      */
248     @Override
249     public void afterMatchFired(AfterMatchFiredEvent event) {
250         logger.debug("afterMatchFired: {}: AgendaEventListener.afterMatchFired({})", getFullName(), event);
251         PdpJmx.getInstance().ruleFired();
252     }
253
254     /**
255      * {@inheritDoc}.
256      */
257     @Override
258     public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event) {
259         logger.debug("afterRuleFlowGroupActivated: {}: AgendaEventListener.afterRuleFlowGroupActivated({})",
260                         getFullName(), event);
261     }
262
263     /**
264      * {@inheritDoc}.
265      */
266     @Override
267     public void afterRuleFlowGroupDeactivated(RuleFlowGroupDeactivatedEvent event) {
268         logger.debug("afterRuleFlowGroupDeactivated: {}: AgendaEventListener.afterRuleFlowGroupDeactivated({})",
269                         getFullName(), event);
270     }
271
272     /**
273      * {@inheritDoc}.
274      */
275     @Override
276     public void agendaGroupPopped(AgendaGroupPoppedEvent event) {
277         logger.debug("agendaGroupPopped: {}: AgendaEventListener.agendaGroupPopped({})", getFullName(), event);
278     }
279
280     /**
281      * {@inheritDoc}.
282      */
283     @Override
284     public void agendaGroupPushed(AgendaGroupPushedEvent event) {
285         logger.debug("agendaGroupPushed: {}: AgendaEventListener.agendaGroupPushed({})", getFullName(), event);
286     }
287
288     /**
289      * {@inheritDoc}.
290      */
291     @Override
292     public void beforeMatchFired(BeforeMatchFiredEvent event) {
293         logger.debug("beforeMatchFired: {}: AgendaEventListener.beforeMatchFired({})", getFullName(), event);
294     }
295
296     /**
297      * {@inheritDoc}.
298      */
299     @Override
300     public void beforeRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event) {
301         logger.debug("beforeRuleFlowGroupActivated: {}: AgendaEventListener.beforeRuleFlowGroupActivated({})",
302                         getFullName(), event);
303     }
304
305     /**
306      * {@inheritDoc}.
307      */
308     @Override
309     public void beforeRuleFlowGroupDeactivated(RuleFlowGroupDeactivatedEvent event) {
310         logger.debug("beforeRuleFlowGroupDeactivated: {}: AgendaEventListener.beforeRuleFlowGroupDeactivated({})",
311                         getFullName(), event);
312     }
313
314     /**
315      * {@inheritDoc}.
316      */
317     @Override
318     public void matchCancelled(MatchCancelledEvent event) {
319         logger.debug("matchCancelled: {}: AgendaEventListener.matchCancelled({})", getFullName(), event);
320     }
321
322     /**
323      * {@inheritDoc}.
324      */
325     @Override
326     public void matchCreated(MatchCreatedEvent event) {
327         logger.debug("matchCreated: {}: AgendaEventListener.matchCreated({})", getFullName(), event);
328     }
329
330     /* ====================================== */
331     /* 'RuleRuntimeEventListener' interface */
332     /* ====================================== */
333
334     /**
335      * {@inheritDoc}.
336      */
337     @Override
338     public void objectDeleted(ObjectDeletedEvent event) {
339         logger.debug("objectDeleted: {}: AgendaEventListener.objectDeleted({})", getFullName(), event);
340     }
341
342     /**
343      * {@inheritDoc}.
344      */
345     @Override
346     public void objectInserted(ObjectInsertedEvent event) {
347         logger.debug("objectInserted: {}: AgendaEventListener.objectInserted({})", getFullName(), event);
348     }
349
350     /**
351      * {@inheritDoc}.
352      */
353     @Override
354     public void objectUpdated(ObjectUpdatedEvent event) {
355         logger.debug("objectUpdated: {}: AgendaEventListener.objectUpdated({})", getFullName(), event);
356     }
357
358     /* ============================================================ */
359
360     /**
361      * This interface helps support the ability for features to choose the
362      * thread or threads that processes the 'KieSession'.
363      */
364     public interface ThreadModel {
365         /**
366          * Start the thread or threads that do the 'KieSession' processing.
367          */
368         void start();
369
370         /**
371          * Stop the thread or threads that do the 'KieSession' processing.
372          */
373         void stop();
374
375         /**
376          * This method is called to notify the running session that
377          * 'KieContainer.updateToVersion(...)' has been called (meaning the
378          * full name of this session has changed).
379          */
380         default void updated() {
381         }
382     }
383
384     /* ============================================================ */
385
386     /**
387      * This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'.
388      */
389     public static class DefaultThreadModel implements Runnable, ThreadModel {
390         // session associated with this persistent thread
391         PolicySession session;
392
393         // the session thread
394         Thread thread;
395
396         // controls whether the thread loops or terminates
397         volatile boolean repeat = true;
398
399         /**
400          * Constructor - initialize 'session' and create thread.
401          *
402          * @param session the 'PolicySession' instance
403          */
404         public DefaultThreadModel(PolicySession session) {
405             this.session = session;
406             thread = new Thread(this, getThreadName());
407         }
408
409         /**
410          * Get thread name.
411          *
412          * @return the String to use as the thread name
413          */
414         private String getThreadName() {
415             return "Session " + session.getFullName();
416         }
417
418         /*=========================*/
419         /* 'ThreadModel' interface */
420         /*=========================*/
421
422         /**
423          * {@inheritDoc}.
424          */
425         @Override
426         public void start() {
427             repeat = true;
428             thread.start();
429         }
430
431         /**
432          * {@inheritDoc}.
433          */
434         @Override
435         public void stop() {
436             repeat = false;
437
438             // this should cause the thread to exit
439             session.getKieSession().halt();
440             try {
441                 // wait up to 10 seconds for the thread to stop
442                 thread.join(10000);
443
444                 // one more interrupt, just in case the 'kieSession.halt()'
445                 // didn't work for some reason
446                 thread.interrupt();
447             } catch (InterruptedException e) {
448                 logger.error("stopThread in thread.join error", e);
449                 Thread.currentThread().interrupt();
450             }
451         }
452
453         /**
454          * {@inheritDoc}.
455          */
456         @Override
457         public void updated() {
458             // the container artifact has been updated -- adjust the thread name
459             thread.setName(getThreadName());
460         }
461
462         /*======================*/
463         /* 'Runnable' interface */
464         /*======================*/
465
466         /**
467          * {@inheritDoc}.
468          */
469         @Override
470         public void run() {
471             // set thread local variable
472             session.setPolicySession();
473
474             // We want to continue looping, despite any exceptions that occur
475             // while rules are fired.
476             var kieSession1 = session.getKieSession();
477             while (repeat) {
478                 try {
479                     kieSession1.fireUntilHalt();
480
481                     // if we fall through, it means 'kieSession1.halt()' was called,
482                     // but this may be a result of 'KieScanner' doing an update
483                 } catch (Exception | LinkageError e) {
484                     logger.error("startThread error in kieSession1.fireUntilHalt", e);
485                 }
486             }
487
488             session.removePolicySession();
489             logger.info("fireUntilHalt() returned");
490         }
491     }
492 }