2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.core;
23 import java.util.concurrent.ConcurrentHashMap;
25 import org.kie.api.event.rule.AfterMatchFiredEvent;
26 import org.kie.api.event.rule.AgendaEventListener;
27 import org.kie.api.event.rule.AgendaGroupPoppedEvent;
28 import org.kie.api.event.rule.AgendaGroupPushedEvent;
29 import org.kie.api.event.rule.BeforeMatchFiredEvent;
30 import org.kie.api.event.rule.MatchCancelledEvent;
31 import org.kie.api.event.rule.MatchCreatedEvent;
32 import org.kie.api.event.rule.ObjectDeletedEvent;
33 import org.kie.api.event.rule.ObjectInsertedEvent;
34 import org.kie.api.event.rule.ObjectUpdatedEvent;
35 import org.kie.api.event.rule.RuleFlowGroupActivatedEvent;
36 import org.kie.api.event.rule.RuleFlowGroupDeactivatedEvent;
37 import org.kie.api.event.rule.RuleRuntimeEventListener;
38 import org.kie.api.runtime.KieSession;
39 import org.openecomp.policy.drools.core.jmx.PdpJmx;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
45 * This class is a wrapper around 'KieSession', which adds the following:
47 * 1) A thread running 'KieSession.fireUntilHalt()'
49 * 3) Logging of events
51 public class PolicySession
52 implements AgendaEventListener, RuleRuntimeEventListener
54 // get an instance of logger
55 private static Logger logger = LoggerFactory.getLogger(PolicySession.class);
56 // name of the 'PolicySession' and associated 'KieSession'
59 // the associated 'PolicyContainer', which may have additional
60 // 'PolicySession' instances in addition to this one
61 private PolicyContainer container;
63 // maps feature objects to per-PolicyContainer data
64 private ConcurrentHashMap<Object, Object> adjuncts =
65 new ConcurrentHashMap<Object, Object>();
67 // associated 'KieSession' instance
68 private KieSession kieSession;
70 // if not 'null', this is the thread model processing the 'KieSession'
71 private ThreadModel threadModel = null;
73 // supports 'getCurrentSession()' method
74 static private ThreadLocal<PolicySession> policySession =
75 new ThreadLocal<PolicySession>();
78 * Internal constructor - create a 'PolicySession' instance
80 * @param name the name of this 'PolicySession' (and 'kieSession')
81 * @param container the 'PolicyContainer' instance containing this session
82 * @param kieSession the associated 'KieSession' instance
84 protected PolicySession(String name,
85 PolicyContainer container, KieSession kieSession)
88 this.container = container;
89 this.kieSession = kieSession;
90 kieSession.addEventListener((AgendaEventListener)this);
91 kieSession.addEventListener((RuleRuntimeEventListener)this);
95 * @return the 'PolicyContainer' object containing this session
97 public PolicyContainer getPolicyContainer()
103 * @return the associated 'KieSession' instance
105 public KieSession getKieSession()
111 * @return the local name of this session, which should either match the
112 * name specified in 'kmodule.xml' file associated with this session, or the
113 * name passed on the 'PolicyContainer.adoptKieSession' method.
115 public String getName()
121 * @return the 'PolicyContainer' name, followed by ':', followed by the
122 * local name of the session. It should be useful in log messages.
124 public String getFullName()
126 return(container.getName() + ":" + name);
130 * If no 'ThreadModel' is currently running, this method will create one,
131 * and invoke it's 'start()' method. Features implementing
132 * 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create
133 * the ThreadModel instance.
135 public synchronized void startThread()
137 if (threadModel == null)
139 // loop through all of the features, and give each one
140 // a chance to create the 'ThreadModel'
141 for (PolicySessionFeatureAPI feature :
142 PolicySessionFeatureAPI.impl.getList())
146 if ((threadModel = feature.selectThreadModel(this)) != null)
151 logger.error("ERROR: Feature API: "
152 + feature.getClass().getName(), e);
155 if (threadModel == null)
157 // no feature created a ThreadModel -- select the default
158 threadModel = new DefaultThreadModel(this);
160 logger.info("starting ThreadModel for session " + getFullName());
166 * If a 'ThreadModel' is currently running, this calls the 'stop()' method,
167 * and sets the 'threadModel' reference to 'null'.
169 public synchronized void stopThread()
171 if (threadModel != null)
179 * Notification that 'updateToVersion' was called on the container
183 if (threadModel != null)
185 // notify the 'ThreadModel', which may change one or more Thread names
186 threadModel.updated();
191 * Set this 'PolicySession' instance as the one associated with the
192 * currently-running thread.
194 public void setPolicySession()
196 // this sets a 'ThreadLocal' variable
197 policySession.set(this);
201 * @return the 'PolicySession' instance associated with the current thread
202 * (Note that this only works if the current thread is the one running
203 * 'kieSession.fireUntilHalt()'.)
205 public static PolicySession getCurrentSession()
207 return(policySession.get());
211 * Fetch the adjunct object associated with a given feature
213 * @param object this is typically the singleton feature object that is
214 * used as a key, but it might also be useful to use nested objects
215 * within the feature as keys.
216 * @return a feature-specific object associated with the key, or 'null'
217 * if it is not found.
219 public Object getAdjunct(Object object)
221 return(adjuncts.get(object));
225 * Store the adjunct object associated with a given feature
227 * @param object this is typically the singleton feature object that is
228 * used as a key, but it might also be useful to use nested objects
229 * within the feature as keys.
230 * @param value a feature-specific object associated with the key, or 'null'
231 * if the feature-specific object should be removed
233 public void setAdjunct(Object object, Object value)
237 adjuncts.remove(object);
241 adjuncts.put(object, value);
245 /***********************************/
246 /* 'AgendaEventListener' interface */
247 /***********************************/
253 public void afterMatchFired(AfterMatchFiredEvent event)
255 if (logger.isDebugEnabled())
257 logger.debug("afterMatchFired: " + getFullName()
258 + ": AgendaEventListener.afterMatchFired(" + event + ")");
260 PdpJmx.getInstance().ruleFired();
267 public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event)
269 if (logger.isDebugEnabled())
271 logger.debug("afterRuleFlowGroupActivated: " + getFullName()
272 + ": AgendaEventListener.afterRuleFlowGroupActivated("
281 public void afterRuleFlowGroupDeactivated
282 (RuleFlowGroupDeactivatedEvent event)
284 if (logger.isDebugEnabled())
286 logger.debug("afterRuleFlowGroupDeactivated: " + getFullName()
287 + ": AgendaEventListener.afterRuleFlowGroupDeactivated("
296 public void agendaGroupPopped(AgendaGroupPoppedEvent event)
298 if (logger.isDebugEnabled())
300 logger.debug("agendaGroupPopped: " + getFullName()
301 + ": AgendaEventListener.agendaGroupPopped("
310 public void agendaGroupPushed(AgendaGroupPushedEvent event)
312 if (logger.isDebugEnabled())
314 logger.debug("agendaGroupPushed: " + getFullName()
315 + ": AgendaEventListener.agendaGroupPushed("
324 public void beforeMatchFired(BeforeMatchFiredEvent event)
326 if (logger.isDebugEnabled())
328 logger.debug("beforeMatchFired: " + getFullName()
329 + ": AgendaEventListener.beforeMatchFired("
338 public void beforeRuleFlowGroupActivated
339 (RuleFlowGroupActivatedEvent event)
341 if (logger.isDebugEnabled())
343 logger.debug("beforeRuleFlowGroupActivated: " + getFullName()
344 + ": AgendaEventListener.beforeRuleFlowGroupActivated("
353 public void beforeRuleFlowGroupDeactivated
354 (RuleFlowGroupDeactivatedEvent event)
356 if (logger.isDebugEnabled())
358 logger.debug("beforeRuleFlowGroupDeactivated: " + getFullName()
359 + ": AgendaEventListener.beforeRuleFlowGroupDeactivated("
368 public void matchCancelled(MatchCancelledEvent event)
370 if (logger.isDebugEnabled())
372 logger.debug("matchCancelled: " + getFullName()
373 + ": AgendaEventListener.matchCancelled(" + event + ")");
381 public void matchCreated(MatchCreatedEvent event)
383 if (logger.isDebugEnabled())
385 logger.debug("matchCreated: " + getFullName()
386 + ": AgendaEventListener.matchCreated(" + event + ")");
390 /****************************************/
391 /* 'RuleRuntimeEventListener' interface */
392 /****************************************/
398 public void objectDeleted(ObjectDeletedEvent event)
400 if (logger.isDebugEnabled())
402 logger.debug("objectDeleted: " + getFullName()
403 + ": AgendaEventListener.objectDeleted(" + event + ")");
411 public void objectInserted(ObjectInsertedEvent event)
413 if (logger.isDebugEnabled())
415 logger.debug("objectInserted: " + getFullName()
416 + ": AgendaEventListener.objectInserted(" + event + ")");
424 public void objectUpdated(ObjectUpdatedEvent event)
426 if (logger.isDebugEnabled())
428 logger.debug("objectUpdated: " + getFullName()
429 + ": AgendaEventListener.objectUpdated(" + event + ")");
433 /* ============================================================ */
436 * This interface helps support the ability for features to choose the
437 * thread or threads that processes the 'KieSession'.
439 public interface ThreadModel
442 * Start the thread or threads that do the 'KieSession' processing
447 * Stop the thread or threads that do the 'KieSession' processing
452 * This method is called to notify the running session that
453 * 'KieContainer.updateToVersion(...)' has been called (meaning the
454 * full name of this session has changed).
456 default public void updated() {}
459 /* ============================================================ */
462 * This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'.
464 public static class DefaultThreadModel implements Runnable,ThreadModel
466 // session associated with this persistent thread
467 PolicySession session;
469 // the session thread
472 // controls whether the thread loops or terminates
473 volatile boolean repeat = true;
476 * Constructor - initialize 'session' and create thread
478 * @param session the 'PolicySession' instance
480 public DefaultThreadModel(PolicySession session)
482 this.session = session;
483 thread = new Thread(this,getThreadName());
487 * @return the String to use as the thread name
489 private String getThreadName()
491 return("Session " + session.getFullName());
494 /***************************/
495 /* 'ThreadModel' interface */
496 /***************************/
516 // this should cause the thread to exit
517 session.getKieSession().halt();
520 // wait up to 10 seconds for the thread to stop
523 // one more interrupt, just in case the 'kieSession.halt()'
524 // didn't work for some reason
529 logger.error("stopThread in thread.join error");
537 public void updated()
539 // the container artifact has been updated -- adjust the thread name
540 thread.setName(getThreadName());
543 /************************/
544 /* 'Runnable' interface */
545 /************************/
553 // set thread local variable
554 session.setPolicySession();
556 // We want to continue looping, despite any exceptions that occur
557 // while rules are fired.
558 KieSession kieSession = session.getKieSession();
563 kieSession.fireUntilHalt();
565 // if we fall through, it means 'KieSession.halt()' was called,
566 // but this may be a result of 'KieScanner' doing an update
570 logger.error("startThread error in kieSession.fireUntilHalt", e);
573 logger.info("fireUntilHalt() returned");