2 * ============LICENSE_START=======================================================
3 * feature-session-persistence
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.persistence;
23 import java.io.IOException;
24 import java.sql.PreparedStatement;
25 import java.sql.SQLException;
26 import java.util.HashMap;
28 import java.util.Properties;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import javax.persistence.EntityManagerFactory;
32 import javax.persistence.Persistence;
33 import javax.transaction.TransactionManager;
34 import javax.transaction.TransactionSynchronizationRegistry;
35 import javax.transaction.UserTransaction;
36 import org.apache.commons.dbcp2.BasicDataSource;
37 import org.apache.commons.dbcp2.BasicDataSourceFactory;
38 import org.hibernate.cfg.AvailableSettings;
39 import org.kie.api.KieServices;
40 import org.kie.api.runtime.Environment;
41 import org.kie.api.runtime.EnvironmentName;
42 import org.kie.api.runtime.KieSession;
43 import org.kie.api.runtime.KieSessionConfiguration;
44 import org.onap.policy.drools.core.PolicyContainer;
45 import org.onap.policy.drools.core.PolicySession;
46 import org.onap.policy.drools.core.PolicySessionFeatureApi;
47 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
48 import org.onap.policy.drools.system.PolicyController;
49 import org.onap.policy.drools.system.PolicyControllerConstants;
50 import org.onap.policy.drools.system.PolicyEngine;
51 import org.onap.policy.drools.utils.PropertyUtil;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
56 * If this feature is supported, there is a single instance of it. It adds persistence to Drools
57 * sessions. In addition, if an active-standby feature exists, then that is used to determine the
58 * active and last-active PDP. If it does not exist, then the current host name is used as the PDP
61 * <p>The bulk of the code here was once in other classes, such as 'PolicyContainer' and 'Main'. It
62 * was moved here as part of making this a separate optional feature.
64 public class PersistenceFeature implements PolicySessionFeatureApi, PolicyEngineFeatureApi {
66 private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
68 /** KieService factory. */
69 private KieServices kieSvcFact;
71 /** Persistence properties. */
72 private Properties persistProps;
74 /** Whether or not the SessionInfo records should be cleaned out. */
75 private boolean sessInfoCleaned;
77 /** SessionInfo timeout, in milli-seconds, as read from
78 * {@link #persistProps}. */
79 private long sessionInfoTimeoutMs;
81 /** Object used to serialize cleanup of sessioninfo table. */
82 private Object cleanupLock = new Object();
85 * Lookup the adjunct for this feature that is associated with the specified PolicyContainer. If
86 * not found, create one.
88 * @param policyContainer the container whose adjunct we are looking up, and possibly creating
89 * @return the associated 'ContainerAdjunct' instance, which may be new
91 private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
93 Object rval = policyContainer.getAdjunct(this);
95 if (!(rval instanceof ContainerAdjunct)) {
96 // adjunct does not exist, or has the wrong type (should never
98 rval = new ContainerAdjunct(policyContainer);
99 policyContainer.setAdjunct(this, rval);
102 return (ContainerAdjunct) rval;
109 public int getSequenceNumber() {
117 public void globalInit(String[] args, String configDir) {
119 kieSvcFact = getKieServices();
122 persistProps = loadProperties(configDir + "/feature-session-persistence.properties");
124 } catch (IOException e1) {
125 logger.error("initializePersistence: ", e1);
128 sessionInfoTimeoutMs = getPersistenceTimeout();
132 * Creates a persistent KieSession, loading it from the persistent store, or creating one, if it
133 * does not exist yet.
136 public KieSession activatePolicySession(
137 PolicyContainer policyContainer, String name, String kieBaseName) {
139 if (isPersistenceEnabled(policyContainer, name)) {
140 cleanUpSessionInfo();
142 return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
152 public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
154 var policyContainer = session.getContainer();
155 if (isPersistenceEnabled(policyContainer, session.getName())) {
156 return new PersistentThreadModel(session, getProperties(policyContainer));
165 public void disposeKieSession(PolicySession policySession) {
167 ContainerAdjunct contAdj =
168 (ContainerAdjunct) policySession.getContainer().getAdjunct(this);
169 if (contAdj != null) {
170 contAdj.disposeKieSession(policySession.getName());
178 public void destroyKieSession(PolicySession policySession) {
180 ContainerAdjunct contAdj =
181 (ContainerAdjunct) policySession.getContainer().getAdjunct(this);
182 if (contAdj != null) {
183 contAdj.destroyKieSession(policySession.getName());
191 public boolean afterStart(PolicyEngine engine) {
199 public boolean beforeStart(PolicyEngine engine) {
207 public boolean beforeActivate(PolicyEngine engine) {
211 private boolean cleanup() {
212 synchronized (cleanupLock) {
213 sessInfoCleaned = false;
223 public boolean afterActivate(PolicyEngine engine) {
227 /* ============================================================ */
230 * Gets the persistence timeout value for sessioninfo records.
232 * @return the timeout value, in milli-seconds, or {@code -1} if it is unspecified or invalid
234 private long getPersistenceTimeout() {
235 String timeoutString = null;
238 timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
240 if (timeoutString != null) {
241 // timeout parameter is specified
242 return Long.valueOf(timeoutString) * 1000;
245 } catch (NumberFormatException e) {
247 "Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
255 /* ============================================================ */
258 * Each instance of this class is a logical extension of a 'PolicyContainer' instance. Its
259 * reference is stored in the 'adjuncts' table within the 'PolicyContainer', and will be
260 * garbage-collected with the container.
262 protected class ContainerAdjunct {
263 /** 'PolicyContainer' instance that this adjunct is extending. */
264 private PolicyContainer policyContainer;
266 /** Maps a KIE session name to its data source. */
267 private Map<String, DsEmf> name2ds = new HashMap<>();
270 * Constructor - initialize a new 'ContainerAdjunct'.
272 * @param policyContainer the 'PolicyContainer' instance this adjunct is extending
274 private ContainerAdjunct(PolicyContainer policyContainer) {
275 this.policyContainer = policyContainer;
279 * Create a new persistent KieSession. If there is already a corresponding entry in the
280 * database, it is used to initialize the KieSession. If not, a completely new session is
283 * @param name the name of the KieSession (which is also the name of the associated
285 * @param kieBaseName the name of the 'KieBase' instance containing this session
286 * @return a new KieSession with persistence enabled
288 private KieSession newPersistentKieSession(String name, String kieBaseName) {
292 BasicDataSource ds = makeDataSource(getDataSourceProperties());
293 var dsemf = new DsEmf(ds);
296 EntityManagerFactory emf = dsemf.emf;
297 DroolsSessionConnector conn = makeJpaConnector(emf);
299 long desiredSessionId = getSessionId(conn, name);
302 "\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
304 // session does not exist -- attempt to create one
306 "getPolicySession:session does not exist -- attempt to create one with name {}", name);
308 var env = kieSvcFact.newEnvironment();
310 configureKieEnv(env, emf);
312 var kieConf = kieSvcFact.newKieSessionConfiguration();
314 KieSession kieSession =
315 (desiredSessionId >= 0
316 ? loadKieSession(kieBaseName, desiredSessionId, env, kieConf)
319 if (kieSession == null) {
320 // loadKieSession() returned null or desiredSessionId < 0
322 "LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
324 kieSession = newKieSession(kieBaseName, env);
327 replaceSession(conn, name, kieSession);
329 name2ds.put(name, dsemf);
333 } catch (RuntimeException e) {
340 * Loads an existing KieSession from the persistent store.
342 * @param kieBaseName the name of the 'KieBase' instance containing this session
343 * @param desiredSessionId id of the desired KieSession
344 * @param env Kie Environment for the session
345 * @param kieConf Kie Configuration for the session
346 * @return the persistent session, or {@code null} if it could not be loaded
348 private KieSession loadKieSession(
349 String kieBaseName, long desiredSessionId, Environment env, KieSessionConfiguration kieConf) {
356 policyContainer.getKieContainer().getKieBase(kieBaseName),
360 logger.info("LOADING Loaded session {}", desiredSessionId);
364 } catch (Exception e) {
365 logger.error("loadKieSession error: ", e);
371 * Creates a new, persistent KieSession.
373 * @param kieBaseName the name of the 'KieBase' instance containing this session
374 * @param env Kie Environment for the session
375 * @return a new, persistent session
377 private KieSession newKieSession(String kieBaseName, Environment env) {
381 .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
383 logger.info("LOADING CREATED {}", kieSession.getIdentifier());
389 * Closes the data source associated with a session.
391 * @param name name of the session being destroyed
393 private void destroyKieSession(String name) {
394 closeDataSource(name);
398 * Closes the data source associated with a session.
400 * @param name name of the session being disposed of
402 private void disposeKieSession(String name) {
403 closeDataSource(name);
407 * Closes the data source associated with a session.
409 * @param name name of the session whose data source is to be closed
411 private void closeDataSource(String name) {
412 DsEmf ds = name2ds.remove(name);
418 /** Configures java system properties for JPA/JTA. */
419 private void configureSysProps() {
420 System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
422 "com.arjuna.ats.arjuna.objectstore.objectStoreDir",
423 persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
425 "ObjectStoreEnvironmentBean.objectStoreDir",
426 persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
430 * Configures a Kie Environment.
432 * @param env environment to be configured
433 * @param emf entity manager factory
435 private void configureKieEnv(Environment env, EntityManagerFactory emf) {
436 env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
437 env.set(EnvironmentName.TRANSACTION, getUserTrans());
438 env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, getTransSyncReg());
439 env.set(EnvironmentName.TRANSACTION_MANAGER, getTransMgr());
443 * Gets a session's ID from the persistent store.
445 * @param conn persistence connector
446 * @param sessnm name of the session
447 * @return the session's id, or {@code -1} if the session is not found
449 private long getSessionId(DroolsSessionConnector conn, String sessnm) {
450 DroolsSession sess = conn.get(sessnm);
451 return sess != null ? sess.getSessionId() : -1;
455 * Replaces a session within the persistent store, if it exists. Adds it otherwise.
457 * @param conn persistence connector
458 * @param sessnm name of session to be updated
459 * @param kieSession new session information
461 private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
463 var sess = new DroolsSessionEntity();
465 sess.setSessionName(sessnm);
466 sess.setSessionId(kieSession.getIdentifier());
472 /* ============================================================ */
475 * Gets the data source properties.
477 * @return the data source properties
479 private Properties getDataSourceProperties() {
480 var props = new Properties();
481 props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
482 props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
483 props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
484 props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
485 props.put("maxActive", "3");
486 props.put("maxIdle", "1");
487 props.put("maxWait", "120000");
488 props.put("whenExhaustedAction", "2");
489 props.put("testOnBorrow", "false");
490 props.put("poolPreparedStatements", "true");
496 * Removes "old" Drools 'sessioninfo' records, so they aren't used to restore data to Drools
497 * sessions. This also has the useful side-effect of removing abandoned records as well.
499 private void cleanUpSessionInfo() {
501 synchronized (cleanupLock) {
502 if (sessInfoCleaned) {
503 logger.info("Clean up of sessioninfo table: already done");
507 if (sessionInfoTimeoutMs < 0) {
508 logger.info("Clean up of sessioninfo table: no timeout specified");
512 // now do the record deletion
513 try (BasicDataSource ds = makeDataSource(getDataSourceProperties());
514 var connection = ds.getConnection();
515 PreparedStatement statement =
516 connection.prepareStatement(
517 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
519 connection.setAutoCommit(true);
521 statement.setLong(1, sessionInfoTimeoutMs / 1000);
523 int count = statement.executeUpdate();
524 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
526 } catch (SQLException e) {
527 logger.error("Clean up of sessioninfo table failed", e);
530 // delete DroolsSessionEntity where sessionId not in (sessinfo.xxx)?
532 sessInfoCleaned = true;
537 * Determine whether persistence is enabled for a specific container.
539 * @param container container to be checked
540 * @param sessionName name of the session to be checked
541 * @return {@code true} if persistence is enabled for this container, and {@code false} if not
543 private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
544 var properties = getProperties(container);
547 if (properties != null) {
548 // fetch the 'type' property
549 String type = getProperty(properties, sessionName, "type");
550 rval = "auto".equals(type) || "native".equals(type);
557 * Determine the controller properties associated with the policy container.
559 * @param container container whose properties are to be retrieved
560 * @return the container's properties, or {@code null} if not found
562 private Properties getProperties(PolicyContainer container) {
564 return getPolicyController(container).getProperties();
565 } catch (IllegalArgumentException e) {
566 logger.error("getProperties exception: ", e);
572 * Fetch the persistence property associated with a session. The name may have the form:
575 * <li>persistence.SESSION-NAME.PROPERTY
576 * <li>persistence.PROPERTY
579 * @param properties properties from which the value is to be retrieved
580 * @param sessionName session name of interest
581 * @param property property name of interest
582 * @return the property value, or {@code null} if not found
584 private String getProperty(Properties properties, String sessionName, String property) {
585 String value = properties.getProperty("persistence." + sessionName + "." + property);
587 value = properties.getProperty("persistence." + property);
593 /* ============================================================ */
596 * This 'ThreadModel' variant periodically calls 'KieSession.fireAllRules()', because the
597 * 'fireUntilHalt' method isn't compatible with persistence.
599 public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
601 /** Session associated with this persistent thread. */
602 private final PolicySession session;
604 /** The session thread. */
605 private final Thread thread;
607 /** Used to indicate that processing should stop. */
608 private final CountDownLatch stopped = new CountDownLatch(1);
610 /** Minimum time, in milli-seconds, that the thread should sleep before firing rules again. */
611 long minSleepTime = 100;
614 * Maximum time, in milli-seconds, that the thread should sleep before firing rules again. This
615 * is a "half" time, so that we can multiply it by two without overflowing the word size.
617 long halfMaxSleepTime = 5000L / 2L;
620 * Constructor - initialize variables and create thread.
622 * @param session the 'PolicySession' instance
623 * @param properties may contain additional session properties
625 public PersistentThreadModel(PolicySession session, Properties properties) {
626 this.session = session;
627 this.thread = new Thread(this, getThreadName());
629 if (properties == null) {
633 // extract 'minSleepTime' and/or 'maxSleepTime'
634 String name = session.getName();
636 // fetch 'minSleepTime' value, and update if defined
637 var sleepTimeString = getProperty(properties, name, "minSleepTime");
638 if (sleepTimeString != null) {
640 minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
641 } catch (Exception e) {
642 logger.error("{}: Illegal value for 'minSleepTime'", sleepTimeString, e);
646 // fetch 'maxSleepTime' value, and update if defined
647 long maxSleepTime = 2 * halfMaxSleepTime;
648 sleepTimeString = getProperty(properties, name, "maxSleepTime");
649 if (sleepTimeString != null) {
651 maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
652 } catch (Exception e) {
653 logger.error("{}: Illegal value for 'maxSleepTime'", sleepTimeString, e);
657 // swap values if needed
658 if (minSleepTime > maxSleepTime) {
659 logger.error("minSleepTime({}) is greater than maxSleepTime({}) -- swapping", minSleepTime,
661 long tmp = minSleepTime;
662 minSleepTime = maxSleepTime;
666 halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
672 * @return the String to use as the thread name */
673 private String getThreadName() {
674 return "Session " + session.getFullName() + " (persistent)";
677 /*=========================*/
678 /* 'ThreadModel' interface */
679 /*=========================*/
685 public void start() {
694 // tell the thread to stop
697 // wait up to 10 seconds for the thread to stop
701 } catch (InterruptedException e) {
702 logger.error("stopThread exception: ", e);
703 Thread.currentThread().interrupt();
706 // verify that it's done
707 if (thread.isAlive()) {
708 logger.error("stopThread: still running");
716 public void updated() {
717 // the container artifact has been updated -- adjust the thread name
718 thread.setName(getThreadName());
721 /*======================*/
722 /* 'Runnable' interface */
723 /*======================*/
730 logger.info("PersistentThreadModel running");
732 // set thread local variable
733 session.setPolicySession();
735 var kieSession = session.getKieSession();
736 long sleepTime = 2 * halfMaxSleepTime;
738 // We want to continue, despite any exceptions that occur
739 // while rules are fired.
745 if (kieSession.fireAllRules() > 0) {
746 // some rules fired -- reduce poll delay
747 sleepTime = Math.max(minSleepTime, sleepTime / 2);
749 // no rules fired -- increase poll delay
750 sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
753 } catch (Exception | LinkageError e) {
754 logger.error("Exception during kieSession.fireAllRules", e);
758 if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
762 } catch (InterruptedException e) {
763 logger.error("startThread exception: ", e);
764 Thread.currentThread().interrupt();
769 session.removePolicySession();
770 logger.info("PersistentThreadModel completed");
774 /* ============================================================ */
776 /** DataSource-EntityManagerFactory pair. */
777 private class DsEmf {
778 private BasicDataSource bds;
779 private EntityManagerFactory emf;
782 * Makes an entity manager factory for the given data source.
784 * @param bds pooled data source
786 public DsEmf(BasicDataSource bds) {
788 Map<String, Object> props = new HashMap<>();
789 props.put(AvailableSettings.JPA_JTA_DATASOURCE, bds);
792 this.emf = makeEntMgrFact(props);
794 } catch (RuntimeException e) {
800 /** Closes the entity manager factory and the data source. */
801 public void close() {
805 } catch (RuntimeException e) {
813 /** Closes the data source only. */
814 private void closeDataSource() {
818 } catch (SQLException e) {
819 throw new PersistenceFeatureException(e);
824 private static class SingletonRegistry {
825 private static final TransactionSynchronizationRegistry transreg =
826 new com.arjuna.ats.internal.jta.transaction.arjunacore
827 .TransactionSynchronizationRegistryImple();
829 private SingletonRegistry() {
834 /* Factory for various items. Methods can be overridden for junit testing. */
837 * Gets the transaction manager.
839 * @return the transaction manager
841 protected TransactionManager getTransMgr() {
842 return com.arjuna.ats.jta.TransactionManager.transactionManager();
846 * Gets the user transaction.
848 * @return the user transaction
850 protected UserTransaction getUserTrans() {
851 return com.arjuna.ats.jta.UserTransaction.userTransaction();
855 * Gets the transaction synchronization registry.
857 * @return the transaction synchronization registry
859 protected TransactionSynchronizationRegistry getTransSyncReg() {
860 return SingletonRegistry.transreg;
864 * Gets the KIE services.
866 * @return the KIE services
868 protected KieServices getKieServices() {
869 return KieServices.Factory.get();
873 * Loads properties from a file.
875 * @param filenm name of the file to load
876 * @return properties, as loaded from the file
877 * @throws IOException if an error occurs reading from the file
879 protected Properties loadProperties(String filenm) throws IOException {
880 return PropertyUtil.getProperties(filenm);
884 * Makes a Data Source.
886 * @param dsProps data source properties
887 * @return a new data source
889 protected BasicDataSource makeDataSource(Properties dsProps) {
891 return BasicDataSourceFactory.createDataSource(dsProps);
893 } catch (Exception e) {
894 throw new PersistenceFeatureException(e);
899 * Makes a new JPA connector for drools sessions.
901 * @param emf entity manager factory
902 * @return a new JPA connector for drools sessions
904 protected DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
905 return new JpaDroolsSessionConnector(emf);
909 * Makes a new entity manager factory.
911 * @param props properties with which the factory should be configured
912 * @return a new entity manager factory
914 protected EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
915 return Persistence.createEntityManagerFactory("onapsessionsPU", props);
919 * Gets the policy controller associated with a given policy container.
921 * @param container container whose controller is to be retrieved
922 * @return the container's controller
924 protected PolicyController getPolicyController(PolicyContainer container) {
925 return PolicyControllerConstants.getFactory().get(container.getGroupId(), container.getArtifactId());
929 * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations,
930 * particularly when they are not, themselves, Runtime exceptions.
932 public static class PersistenceFeatureException extends RuntimeException {
933 private static final long serialVersionUID = 1L;
938 public PersistenceFeatureException(Exception ex) {