/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
 * Modifications Copyright (C) 2024 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
23 package org.onap.policy.drools.core;
25 import java.util.concurrent.ConcurrentHashMap;
27 import org.kie.api.event.rule.AfterMatchFiredEvent;
28 import org.kie.api.event.rule.AgendaEventListener;
29 import org.kie.api.event.rule.AgendaGroupPoppedEvent;
30 import org.kie.api.event.rule.AgendaGroupPushedEvent;
31 import org.kie.api.event.rule.BeforeMatchFiredEvent;
32 import org.kie.api.event.rule.MatchCancelledEvent;
33 import org.kie.api.event.rule.MatchCreatedEvent;
34 import org.kie.api.event.rule.ObjectDeletedEvent;
35 import org.kie.api.event.rule.ObjectInsertedEvent;
36 import org.kie.api.event.rule.ObjectUpdatedEvent;
37 import org.kie.api.event.rule.RuleFlowGroupActivatedEvent;
38 import org.kie.api.event.rule.RuleFlowGroupDeactivatedEvent;
39 import org.kie.api.event.rule.RuleRuntimeEventListener;
40 import org.kie.api.runtime.KieSession;
41 import org.onap.policy.drools.core.jmx.PdpJmx;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
/**
 * This class is a wrapper around 'KieSession', which adds the following:
 *
 * <p>1) A thread running 'KieSession.fireUntilHalt()'
 * 3) Logging of events
 */
53 public class PolicySession implements AgendaEventListener, RuleRuntimeEventListener {
54 // get an instance of logger
55 private static final Logger logger = LoggerFactory.getLogger(PolicySession.class);
57 // supports 'getCurrentSession()' method
58 private static ThreadLocal<PolicySession> policySess = new ThreadLocal<>();
60 // name of the 'PolicySession' and associated 'KieSession'
64 // the associated 'PolicyContainer', which may have additional
65 // 'PolicySession' instances in addition to this one
67 private PolicyContainer container;
69 // maps feature objects to per-PolicyContainer data
70 private ConcurrentHashMap<Object, Object> adjuncts =
71 new ConcurrentHashMap<>();
73 // associated 'KieSession' instance
75 private KieSession kieSession;
77 // if not 'null', this is the thread model processing the 'KieSession'
78 private ThreadModel threadModel = null;
81 * Internal constructor - create a 'PolicySession' instance.
83 * @param name the name of this 'PolicySession' (and 'kieSession')
84 * @param container the 'PolicyContainer' instance containing this session
85 * @param kieSession the associated 'KieSession' instance
87 protected PolicySession(String name,
88 PolicyContainer container, KieSession kieSession) {
90 this.container = container;
91 this.kieSession = kieSession;
92 kieSession.addEventListener((AgendaEventListener) this);
93 kieSession.addEventListener((RuleRuntimeEventListener) this);
99 * @return the 'PolicyContainer' name, followed by ':', followed by the
100 * local name of the session. It should be useful in log messages.
102 public String getFullName() {
103 return container.getName() + ":" + name;
107 * If no 'ThreadModel' is currently running, this method will create one,
108 * and invoke it's 'start()' method. Features implementing
109 * 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create
110 * the ThreadModel instance.
112 public synchronized void startThread() {
113 if (threadModel != null) {
117 // loop through all the features, and give each one
118 // a chance to create the 'ThreadModel'
119 for (PolicySessionFeatureApi feature :
120 PolicySessionFeatureApiConstants.getImpl().getList()) {
122 if ((threadModel = feature.selectThreadModel(this)) != null) {
125 } catch (Exception e) {
126 logger.error("ERROR: Feature API: {}", feature.getClass().getName(), e);
129 if (threadModel == null) {
130 // no feature created a ThreadModel -- select the default
131 threadModel = new DefaultThreadModel(this);
133 logger.info("starting ThreadModel for session {}", getFullName());
138 * If a 'ThreadModel' is currently running, this calls the 'stop()' method,
139 * and sets the 'threadModel' reference to 'null'.
141 public synchronized void stopThread() {
142 if (threadModel != null) {
149 * Notification that 'updateToVersion' was called on the container.
152 if (threadModel != null) {
153 // notify the 'ThreadModel', which may change one or more Thread names
154 threadModel.updated();
159 * Set this 'PolicySession' instance as the one associated with the
160 * currently-running thread.
162 public void setPolicySession() {
163 // this sets a 'ThreadLocal' variable
164 policySess.set(this);
168 * Unset this 'PolicySession' instance as the one associated with the
169 * currently-running thread.
171 public void removePolicySession() {
172 if (policySess.get() == this) {
178 * Get current session.
180 * @return the 'PolicySession' instance associated with the current thread
181 * (Note that this only works if the current thread is the one running
182 * 'kieSession.fireUntilHalt()'.)
184 public static PolicySession getCurrentSession() {
185 return policySess.get();
189 * Fetch the adjunct object associated with a given feature.
191 * @param object this is typically the singleton feature object that is
192 * used as a key, but it might also be useful to use nested objects
193 * within the feature as keys.
194 * @return a feature-specific object associated with the key, or 'null'
195 * if it is not found.
197 public Object getAdjunct(Object object) {
198 return adjuncts.get(object);
202 * Store the adjunct object associated with a given feature.
204 * @param object this is typically the singleton feature object that is
205 * used as a key, but it might also be useful to use nested objects
206 * within the feature as keys.
207 * @param value a feature-specific object associated with the key, or 'null'
208 * if the feature-specific object should be removed
210 public void setAdjunct(Object object, Object value) {
212 adjuncts.remove(object);
214 adjuncts.put(object, value);
219 * This method will insert an object into the Drools memory associated
220 * with this 'PolicySession' instance. Features are given the opportunity
221 * to handle the insert, and a distributed host feature could use this to
222 * send the object to another host, and insert it in the corresponding
225 * @param object the object to insert in Drools memory
227 public void insertDrools(Object object) {
228 for (PolicySessionFeatureApi feature :
229 PolicySessionFeatureApiConstants.getImpl().getList()) {
230 if (feature.insertDrools(this, object)) {
231 // feature is performing the insert
235 // no feature has intervened -- do the insert locally
236 if (kieSession != null) {
237 kieSession.insert(object);
/*=================================*/
/* 'AgendaEventListener' interface */
/*=================================*/
249 public void afterMatchFired(AfterMatchFiredEvent event) {
250 logger.debug("afterMatchFired: {}: AgendaEventListener.afterMatchFired({})", getFullName(), event);
251 PdpJmx.getInstance().ruleFired();
258 public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event) {
259 logger.debug("afterRuleFlowGroupActivated: {}: AgendaEventListener.afterRuleFlowGroupActivated({})",
260 getFullName(), event);
267 public void afterRuleFlowGroupDeactivated(RuleFlowGroupDeactivatedEvent event) {
268 logger.debug("afterRuleFlowGroupDeactivated: {}: AgendaEventListener.afterRuleFlowGroupDeactivated({})",
269 getFullName(), event);
276 public void agendaGroupPopped(AgendaGroupPoppedEvent event) {
277 logger.debug("agendaGroupPopped: {}: AgendaEventListener.agendaGroupPopped({})", getFullName(), event);
284 public void agendaGroupPushed(AgendaGroupPushedEvent event) {
285 logger.debug("agendaGroupPushed: {}: AgendaEventListener.agendaGroupPushed({})", getFullName(), event);
292 public void beforeMatchFired(BeforeMatchFiredEvent event) {
293 logger.debug("beforeMatchFired: {}: AgendaEventListener.beforeMatchFired({})", getFullName(), event);
300 public void beforeRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event) {
301 logger.debug("beforeRuleFlowGroupActivated: {}: AgendaEventListener.beforeRuleFlowGroupActivated({})",
302 getFullName(), event);
309 public void beforeRuleFlowGroupDeactivated(RuleFlowGroupDeactivatedEvent event) {
310 logger.debug("beforeRuleFlowGroupDeactivated: {}: AgendaEventListener.beforeRuleFlowGroupDeactivated({})",
311 getFullName(), event);
318 public void matchCancelled(MatchCancelledEvent event) {
319 logger.debug("matchCancelled: {}: AgendaEventListener.matchCancelled({})", getFullName(), event);
326 public void matchCreated(MatchCreatedEvent event) {
327 logger.debug("matchCreated: {}: AgendaEventListener.matchCreated({})", getFullName(), event);
/* ====================================== */
/* 'RuleRuntimeEventListener' interface   */
/* ====================================== */
338 public void objectDeleted(ObjectDeletedEvent event) {
339 logger.debug("objectDeleted: {}: AgendaEventListener.objectDeleted({})", getFullName(), event);
346 public void objectInserted(ObjectInsertedEvent event) {
347 logger.debug("objectInserted: {}: AgendaEventListener.objectInserted({})", getFullName(), event);
354 public void objectUpdated(ObjectUpdatedEvent event) {
355 logger.debug("objectUpdated: {}: AgendaEventListener.objectUpdated({})", getFullName(), event);
/* ============================================================ */
/**
 * This interface helps support the ability for features to choose the
 * thread or threads that process the 'KieSession'.
 */
public interface ThreadModel {
    /**
     * Start the thread or threads that do the 'KieSession' processing.
     */
    void start();

    /**
     * Stop the thread or threads that do the 'KieSession' processing.
     */
    void stop();

    /**
     * This method is called to notify the running session that
     * 'KieContainer.updateToVersion(...)' has been called (meaning the
     * full name of this session has changed). The default implementation
     * does nothing.
     */
    default void updated() {
        // nothing to do by default
    }
}
/* ============================================================ */
387 * This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'.
389 public static class DefaultThreadModel implements Runnable, ThreadModel {
390 // session associated with this persistent thread
391 PolicySession session;
393 // the session thread
396 // controls whether the thread loops or terminates
397 volatile boolean repeat = true;
400 * Constructor - initialize 'session' and create thread.
402 * @param session the 'PolicySession' instance
404 public DefaultThreadModel(PolicySession session) {
405 this.session = session;
406 thread = new Thread(this, getThreadName());
412 * @return the String to use as the thread name
414 private String getThreadName() {
415 return "Session " + session.getFullName();
418 /*=========================*/
419 /* 'ThreadModel' interface */
420 /*=========================*/
426 public void start() {
438 // this should cause the thread to exit
439 session.getKieSession().halt();
441 // wait up to 10 seconds for the thread to stop
444 // one more interrupt, just in case the 'kieSession.halt()'
445 // didn't work for some reason
447 } catch (InterruptedException e) {
448 logger.error("stopThread in thread.join error", e);
449 Thread.currentThread().interrupt();
457 public void updated() {
458 // the container artifact has been updated -- adjust the thread name
459 thread.setName(getThreadName());
462 /*======================*/
463 /* 'Runnable' interface */
464 /*======================*/
471 // set thread local variable
472 session.setPolicySession();
474 // We want to continue looping, despite any exceptions that occur
475 // while rules are fired.
476 var kieSession1 = session.getKieSession();
479 kieSession1.fireUntilHalt();
481 // if we fall through, it means 'kieSession1.halt()' was called,
482 // but this may be a result of 'KieScanner' doing an update
483 } catch (Exception | LinkageError e) {
484 logger.error("startThread error in kieSession1.fireUntilHalt", e);
488 session.removePolicySession();
489 logger.info("fireUntilHalt() returned");