[POLICY-75] Problems with KieScanner
[policy/drools-pdp.git] / policy-core / src / main / java / org / openecomp / policy / drools / core / PolicySession.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-core
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.core;
22
23 import java.util.concurrent.ConcurrentHashMap;
24
25 import org.kie.api.event.rule.AfterMatchFiredEvent;
26 import org.kie.api.event.rule.AgendaEventListener;
27 import org.kie.api.event.rule.AgendaGroupPoppedEvent;
28 import org.kie.api.event.rule.AgendaGroupPushedEvent;
29 import org.kie.api.event.rule.BeforeMatchFiredEvent;
30 import org.kie.api.event.rule.MatchCancelledEvent;
31 import org.kie.api.event.rule.MatchCreatedEvent;
32 import org.kie.api.event.rule.ObjectDeletedEvent;
33 import org.kie.api.event.rule.ObjectInsertedEvent;
34 import org.kie.api.event.rule.ObjectUpdatedEvent;
35 import org.kie.api.event.rule.RuleFlowGroupActivatedEvent;
36 import org.kie.api.event.rule.RuleFlowGroupDeactivatedEvent;
37 import org.kie.api.event.rule.RuleRuntimeEventListener;
38 import org.kie.api.runtime.KieSession;
39 import org.openecomp.policy.drools.core.jmx.PdpJmx;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43
44 /**
45  * This class is a wrapper around 'KieSession', which adds the following:
46  *
47  * 1) A thread running 'KieSession.fireUntilHalt()'
48  * 2) Access to UEB
49  * 3) Logging of events
50  */
51 public class PolicySession
52   implements AgendaEventListener, RuleRuntimeEventListener
53 {
54         // get an instance of logger 
55   private static Logger  logger = LoggerFactory.getLogger(PolicySession.class);         
56   // name of the 'PolicySession' and associated 'KieSession'
57   private String name;
58
59   // the associated 'PolicyContainer', which may have additional
60   // 'PolicySession' instances in addition to this one
61   private PolicyContainer container;
62
63   // maps feature objects to per-PolicyContainer data
64   private ConcurrentHashMap<Object, Object> adjuncts =
65         new ConcurrentHashMap<Object, Object>();
66
67   // associated 'KieSession' instance
68   private KieSession kieSession;
69
70   // if not 'null', this is the thread model processing the 'KieSession'
71   private ThreadModel threadModel = null;
72
73   // supports 'getCurrentSession()' method
74   static private ThreadLocal<PolicySession> policySession =
75         new ThreadLocal<PolicySession>();
76
77   /**
78    * Internal constructor - create a 'PolicySession' instance
79    *
80    * @param name the name of this 'PolicySession' (and 'kieSession')
81    * @param container the 'PolicyContainer' instance containing this session
82    * @param kieSession the associated 'KieSession' instance
83    */
84   protected PolicySession(String name,
85                                                   PolicyContainer container, KieSession kieSession)
86   {
87         this.name = name;
88         this.container = container;
89         this.kieSession = kieSession;
90         kieSession.addEventListener((AgendaEventListener)this);
91         kieSession.addEventListener((RuleRuntimeEventListener)this);
92   }
93
94   /**
95    * @return the 'PolicyContainer' object containing this session
96    */
97   public PolicyContainer getPolicyContainer()
98   {
99         return(container);
100   }
101
102   /**
103    * @return the associated 'KieSession' instance
104    */
105   public KieSession getKieSession()
106   {
107         return(kieSession);
108   }
109
110   /**
111    * @return the local name of this session, which should either match the
112    * name specified in 'kmodule.xml' file associated with this session, or the
113    * name passed on the 'PolicyContainer.adoptKieSession' method.
114    */
115   public String getName()
116   {
117         return(name);
118   }
119
120   /**
121    * @return the 'PolicyContainer' name, followed by ':', followed by the
122    * local name of the session. It should be useful in log messages.
123    */
124   public String getFullName()
125   {
126         return(container.getName() + ":" + name);
127   }
128
129   /**
130    * If no 'ThreadModel' is currently running, this method will create one,
131    * and invoke it's 'start()' method. Features implementing
132    * 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create
133    * the ThreadModel instance.
134    */
135   public synchronized void startThread()
136   {
137         if (threadModel == null)
138           {
139                 // loop through all of the features, and give each one
140                 // a chance to create the 'ThreadModel'
141                 for (PolicySessionFeatureAPI feature :
142                            PolicySessionFeatureAPI.impl.getList())
143                   {
144                         try
145                           {
146                                 if ((threadModel = feature.selectThreadModel(this)) != null)
147                                   break;
148                           }
149                         catch (Exception e)
150                           {
151                                 logger.error("ERROR: Feature API: "
152                                                          + feature.getClass().getName(), e);
153                           }
154                   }             
155                 if (threadModel == null)
156                   {
157                         // no feature created a ThreadModel -- select the default
158                         threadModel = new DefaultThreadModel(this);
159                   }
160                 logger.info("starting ThreadModel for session " + getFullName());
161                 threadModel.start();
162           }
163   }
164
165   /**
166    * If a 'ThreadModel' is currently running, this calls the 'stop()' method,
167    * and sets the 'threadModel' reference to 'null'.
168    */
169   public synchronized void stopThread()
170   {
171         if (threadModel != null)
172           {
173                 threadModel.stop();
174                 threadModel = null;
175           }
176   }
177
178   /**
179    * Notification that 'updateToVersion' was called on the container
180    */
181   void updated()
182   {
183         if (threadModel != null)
184           {
185                 // notify the 'ThreadModel', which may change one or more Thread names
186                 threadModel.updated();
187           }
188   }
189
190   /**
191    * Set this 'PolicySession' instance as the one associated with the
192    * currently-running thread.
193    */
194   public void setPolicySession()
195   {
196         // this sets a 'ThreadLocal' variable
197         policySession.set(this);
198   }
199
200   /**
201    * @return the 'PolicySession' instance associated with the current thread
202    *    (Note that this only works if the current thread is the one running
203    *    'kieSession.fireUntilHalt()'.)
204    */
205   public static PolicySession getCurrentSession()
206   {
207         return(policySession.get());
208   }
209         
210   /**
211    * Fetch the adjunct object associated with a given feature
212    *
213    * @param object this is typically the singleton feature object that is
214    *    used as a key, but it might also be useful to use nested objects
215    *    within the feature as keys.
216    * @return a feature-specific object associated with the key, or 'null'
217    *    if it is not found.
218    */
219   public Object getAdjunct(Object object)
220   {
221         return(adjuncts.get(object));
222   }
223
224   /**
225    * Store the adjunct object associated with a given feature
226    *
227    * @param object this is typically the singleton feature object that is
228    *    used as a key, but it might also be useful to use nested objects
229    *    within the feature as keys.
230    * @param value a feature-specific object associated with the key, or 'null'
231    *    if the feature-specific object should be removed
232    */
233   public void setAdjunct(Object object, Object value)
234   {
235         if (value == null)
236           {
237                 adjuncts.remove(object);
238           }
239         else
240           {
241                 adjuncts.put(object, value);
242           }
243   }
244
245   /***********************************/
246   /* 'AgendaEventListener' interface */
247   /***********************************/
248
249   /**
250    * {@inheritDoc}
251    */
252   @Override
253         public void afterMatchFired(AfterMatchFiredEvent event)
254   {
255         if (logger.isDebugEnabled())
256           {
257                 logger.debug("afterMatchFired: " + getFullName()
258                                          + ": AgendaEventListener.afterMatchFired(" + event + ")");
259           }
260         PdpJmx.getInstance().ruleFired();
261  }
262
263   /**
264    * {@inheritDoc}
265    */
266   @Override
267         public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event)
268   {
269         if (logger.isDebugEnabled())
270           {
271         logger.debug("afterRuleFlowGroupActivated: " + getFullName()
272                                  + ": AgendaEventListener.afterRuleFlowGroupActivated("
273                                  + event + ")");
274           }
275   }
276
277   /**
278    * {@inheritDoc}
279    */
280   @Override
281         public void afterRuleFlowGroupDeactivated
282         (RuleFlowGroupDeactivatedEvent event)
283   {
284         if (logger.isDebugEnabled())
285           {
286                 logger.debug("afterRuleFlowGroupDeactivated: " + getFullName()
287                                          + ": AgendaEventListener.afterRuleFlowGroupDeactivated("
288                                          + event + ")");
289           }
290   }
291
292   /**
293    * {@inheritDoc}
294    */
295   @Override
296         public void agendaGroupPopped(AgendaGroupPoppedEvent event)
297   {
298         if (logger.isDebugEnabled())
299           {
300                 logger.debug("agendaGroupPopped: " + getFullName()
301                                          + ": AgendaEventListener.agendaGroupPopped("
302                                          + event + ")");
303           }
304   }
305
306   /**
307    * {@inheritDoc}
308    */
309   @Override
310         public void agendaGroupPushed(AgendaGroupPushedEvent event)
311   {
312         if (logger.isDebugEnabled())
313           {
314                 logger.debug("agendaGroupPushed: " + getFullName()
315                                          + ": AgendaEventListener.agendaGroupPushed("
316                                          + event + ")");
317           }
318   }
319
320   /**
321    * {@inheritDoc}
322    */
323   @Override
324         public void beforeMatchFired(BeforeMatchFiredEvent event)
325   {
326         if (logger.isDebugEnabled())
327           {
328                 logger.debug("beforeMatchFired: " + getFullName()
329                                          + ": AgendaEventListener.beforeMatchFired("
330                                          + event + ")");
331           }
332   }
333
334   /**
335    * {@inheritDoc}
336    */
337   @Override
338         public void beforeRuleFlowGroupActivated
339         (RuleFlowGroupActivatedEvent event)
340   {
341         if (logger.isDebugEnabled())
342           {
343                 logger.debug("beforeRuleFlowGroupActivated: " + getFullName()
344                                          + ": AgendaEventListener.beforeRuleFlowGroupActivated("
345                                          + event + ")");
346           }
347   }
348
349   /**
350    * {@inheritDoc}
351    */
352   @Override
353         public void beforeRuleFlowGroupDeactivated
354         (RuleFlowGroupDeactivatedEvent event)
355   {
356         if (logger.isDebugEnabled())
357           {
358                 logger.debug("beforeRuleFlowGroupDeactivated: " + getFullName()
359                                          + ": AgendaEventListener.beforeRuleFlowGroupDeactivated("
360                                          + event + ")");
361           }
362   }
363
364   /**
365    * {@inheritDoc}
366    */
367   @Override
368         public void matchCancelled(MatchCancelledEvent event)
369   {
370         if (logger.isDebugEnabled())
371           {
372                 logger.debug("matchCancelled: " + getFullName()
373                                          + ": AgendaEventListener.matchCancelled(" + event + ")");
374           }
375   }
376
377   /**
378    * {@inheritDoc}
379    */
380   @Override
381         public void matchCreated(MatchCreatedEvent event)
382   {
383         if (logger.isDebugEnabled())
384           {
385                 logger.debug("matchCreated: " + getFullName()
386                                          + ": AgendaEventListener.matchCreated(" + event + ")");
387           }
388   }
389
390   /****************************************/
391   /* 'RuleRuntimeEventListener' interface */
392   /****************************************/
393
394   /**
395    * {@inheritDoc}
396    */
397   @Override
398         public void objectDeleted(ObjectDeletedEvent event)
399   {
400         if (logger.isDebugEnabled())
401           {
402                 logger.debug("objectDeleted: " + getFullName()
403                                          + ": AgendaEventListener.objectDeleted(" + event + ")");
404           }
405   }
406
407   /**
408    * {@inheritDoc}
409    */
410   @Override
411         public void objectInserted(ObjectInsertedEvent event)
412   {
413         if (logger.isDebugEnabled())
414           {
415                 logger.debug("objectInserted: " + getFullName()
416                                          + ": AgendaEventListener.objectInserted(" + event + ")");
417           }
418   }
419
420   /**
421    * {@inheritDoc}
422    */
423   @Override
424         public void objectUpdated(ObjectUpdatedEvent event)
425   {
426         if (logger.isDebugEnabled())
427           {
428                 logger.debug("objectUpdated: " + getFullName()
429                                          + ": AgendaEventListener.objectUpdated(" + event + ")");
430           }
431   }
432
433   /* ============================================================ */
434
435   /**
436    * This interface helps support the ability for features to choose the
437    * thread or threads that processes the 'KieSession'.
438    */
439   public interface ThreadModel
440   {
441         /**
442          * Start the thread or threads that do the 'KieSession' processing
443          */
444         public void start();
445
446         /**
447          * Stop the thread or threads that do the 'KieSession' processing
448          */
449         public void stop();
450
451         /**
452          * This method is called to notify the running session that
453          * 'KieContainer.updateToVersion(...)' has been called (meaning the
454          * full name of this session has changed).
455          */
456         default public void updated() {}
457   }
458
459   /* ============================================================ */
460
461   /**
462    * This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'.
463    */
464   public static class DefaultThreadModel implements Runnable,ThreadModel
465   {
466         // session associated with this persistent thread
467         PolicySession session;
468
469         // the session thread
470         Thread thread;
471
472         // controls whether the thread loops or terminates
473         volatile boolean repeat = true;
474
475         /**
476          * Constructor - initialize 'session' and create thread
477          *
478          * @param session the 'PolicySession' instance
479          */
480         public DefaultThreadModel(PolicySession session)
481         {
482           this.session = session;
483           thread = new Thread(this,getThreadName());
484         }
485
486         /**
487          * @return the String to use as the thread name
488          */
489         private String getThreadName()
490         {
491           return("Session " + session.getFullName());
492         }
493
494         /***************************/
495         /* 'ThreadModel' interface */
496         /***************************/
497
498         /**
499          * {@inheritDoc}
500          */
501         @Override
502           public void start()
503         {
504           repeat = true;
505           thread.start();
506         }
507
508         /**
509          * {@inheritDoc}
510          */
511         @Override
512           public void stop()
513         {
514           repeat = false;
515
516           // this should cause the thread to exit               
517           session.getKieSession().halt();
518           try
519                 {
520                   // wait up to 10 seconds for the thread to stop
521                   thread.join(10000);
522
523                   // one more interrupt, just in case the 'kieSession.halt()'
524                   // didn't work for some reason
525                   thread.interrupt();
526                 }
527           catch (Exception e)
528                 {
529                   logger.error("stopThread in thread.join error");
530                 }
531         }
532
533         /**
534          * {@inheritDoc}
535          */
536         @Override
537           public void updated()
538         {
539           // the container artifact has been updated -- adjust the thread name
540           thread.setName(getThreadName());
541         }
542
543         /************************/
544         /* 'Runnable' interface */
545         /************************/
546
547         /**
548          * {@inheritDoc}
549          */
550         @Override
551           public void run()
552         {
553           // set thread local variable
554           session.setPolicySession();
555
556           // We want to continue looping, despite any exceptions that occur
557           // while rules are fired.
558           KieSession kieSession = session.getKieSession();
559           while (repeat)
560                 {
561                   try
562                         {
563                           kieSession.fireUntilHalt();
564
565                           // if we fall through, it means 'KieSession.halt()' was called,
566                           // but this may be a result of 'KieScanner' doing an update
567                         }
568                   catch (Throwable e)
569                         {
570                           logger.error("startThread error in kieSession.fireUntilHalt", e);                                             
571                         }
572                 }
573           logger.info("fireUntilHalt() returned");
574         }
575   }
576 }