2 * ============LICENSE_START=======================================================
3 * feature-session-persistence
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.persistence;
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.SQLException;
27 import java.util.HashMap;
29 import java.util.Properties;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
33 import javax.persistence.EntityManagerFactory;
34 import javax.persistence.Persistence;
35 import javax.transaction.TransactionManager;
36 import javax.transaction.TransactionSynchronizationRegistry;
37 import javax.transaction.UserTransaction;
39 import org.apache.commons.dbcp2.BasicDataSource;
40 import org.apache.commons.dbcp2.BasicDataSourceFactory;
41 import org.kie.api.KieServices;
42 import org.kie.api.runtime.Environment;
43 import org.kie.api.runtime.EnvironmentName;
44 import org.kie.api.runtime.KieSession;
45 import org.kie.api.runtime.KieSessionConfiguration;
46 import org.onap.policy.drools.core.PolicyContainer;
47 import org.onap.policy.drools.core.PolicySession;
48 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
49 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.onap.policy.drools.system.PolicyEngine;
52 import org.onap.policy.drools.utils.PropertyUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * If this feature is supported, there is a single instance of it. It adds
58 * persistence to Drools sessions. In addition, if an active-standby feature
59 * exists, then that is used to determine the active and last-active PDP. If it
60 * does not exist, then the current host name is used as the PDP id.
62 * The bulk of the code here was once in other classes, such as
63 * 'PolicyContainer' and 'Main'. It was moved here as part of making this a
64 * separate optional feature.
66 public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI {
68 private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
71 * Standard factory used to get various items.
73 private static Factory stdFactory = new Factory();
76 * Factory used to get various items.
78 private Factory fact = stdFactory;
83 private KieServices kieSvcFact;
86 * Persistence properties.
88 private Properties persistProps;
91 * Whether or not the SessionInfo records should be cleaned out.
93 private boolean sessInfoCleaned;
96 * SessionInfo timeout, in milli-seconds, as read from
97 * {@link #persistProps}.
99 private long sessionInfoTimeoutMs;
102 * Object used to serialize cleanup of sessioninfo table.
104 private Object cleanupLock = new Object();
107 * Sets the factory to be used during junit testing.
112 protected void setFactory(Factory fact) {
117 * Lookup the adjunct for this feature that is associated with the specified
118 * PolicyContainer. If not found, create one.
120 * @param policyContainer
121 * the container whose adjunct we are looking up, and possibly
123 * @return the associated 'ContainerAdjunct' instance, which may be new
125 private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
127 Object rval = policyContainer.getAdjunct(this);
129 if (rval == null || !(rval instanceof ContainerAdjunct)) {
130 // adjunct does not exist, or has the wrong type (should never
132 rval = new ContainerAdjunct(policyContainer);
133 policyContainer.setAdjunct(this, rval);
136 return (ContainerAdjunct) rval;
143 public int getSequenceNumber() {
151 public void globalInit(String args[], String configDir) {
153 kieSvcFact = fact.getKieServices();
156 persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties");
158 } catch (IOException e1) {
159 logger.error("initializePersistence: ", e1);
162 sessionInfoTimeoutMs = getPersistenceTimeout();
166 * Creates a persistent KieSession, loading it from the persistent store, or
167 * creating one, if it does not exist yet.
170 public KieSession activatePolicySession(PolicyContainer policyContainer, String name, String kieBaseName) {
172 if (isPersistenceEnabled(policyContainer, name)) {
173 cleanUpSessionInfo();
175 return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
185 public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
187 PolicyContainer policyContainer = session.getPolicyContainer();
188 if (isPersistenceEnabled(policyContainer, session.getName())) {
189 return new PersistentThreadModel(session, getProperties(policyContainer));
198 public void disposeKieSession(PolicySession policySession) {
200 ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
201 if (contAdj != null) {
202 contAdj.disposeKieSession(policySession.getName());
210 public void destroyKieSession(PolicySession policySession) {
212 ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
213 if (contAdj != null) {
214 contAdj.destroyKieSession(policySession.getName());
222 public boolean afterStart(PolicyEngine engine) {
230 public boolean beforeStart(PolicyEngine engine) {
231 synchronized (cleanupLock) {
232 sessInfoCleaned = false;
242 public boolean beforeActivate(PolicyEngine engine) {
243 synchronized (cleanupLock) {
244 sessInfoCleaned = false;
254 public boolean afterActivate(PolicyEngine engine) {
258 /* ============================================================ */
261 * Gets the persistence timeout value for sessioninfo records.
263 * @return the timeout value, in milli-seconds, or {@code -1} if it is
264 * unspecified or invalid
266 private long getPersistenceTimeout() {
267 String timeoutString = null;
270 timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
272 if (timeoutString != null) {
273 // timeout parameter is specified
274 return Long.valueOf(timeoutString) * 1000;
277 } catch (NumberFormatException e) {
278 logger.error("Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
285 /* ============================================================ */
288 * Each instance of this class is a logical extension of a 'PolicyContainer'
289 * instance. Its reference is stored in the 'adjuncts' table within the
290 * 'PolicyContainer', and will be garbage-collected with the container.
292 protected class ContainerAdjunct {
294 * 'PolicyContainer' instance that this adjunct is extending.
296 private PolicyContainer policyContainer;
299 * Maps a KIE session name to its data source.
301 private Map<String, DsEmf> name2ds = new HashMap<>();
304 * Constructor - initialize a new 'ContainerAdjunct'
306 * @param policyContainer
307 * the 'PolicyContainer' instance this adjunct is extending
309 private ContainerAdjunct(PolicyContainer policyContainer) {
310 this.policyContainer = policyContainer;
314 * Create a new persistent KieSession. If there is already a
315 * corresponding entry in the database, it is used to initialize the
316 * KieSession. If not, a completely new session is created.
319 * the name of the KieSession (which is also the name of the
320 * associated PolicySession)
322 * the name of the 'KieBase' instance containing this session
323 * @return a new KieSession with persistence enabled
325 private KieSession newPersistentKieSession(String name, String kieBaseName) {
329 BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
330 DsEmf dsemf = new DsEmf(ds);
333 EntityManagerFactory emf = dsemf.emf;
334 DroolsSessionConnector conn = fact.makeJpaConnector(emf);
336 long desiredSessionId = getSessionId(conn, name);
338 logger.info("\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
340 // session does not exist -- attempt to create one
341 logger.info("getPolicySession:session does not exist -- attempt to create one with name {}", name);
343 Environment env = kieSvcFact.newEnvironment();
345 configureKieEnv(env, emf);
347 KieSessionConfiguration kConf = kieSvcFact.newKieSessionConfiguration();
349 KieSession kieSession = (desiredSessionId >= 0
350 ? loadKieSession(kieBaseName, desiredSessionId, env, kConf) : null);
352 if (kieSession == null) {
353 // loadKieSession() returned null or desiredSessionId < 0
354 logger.info("LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
356 kieSession = newKieSession(kieBaseName, env);
359 replaceSession(conn, name, kieSession);
361 name2ds.put(name, dsemf);
365 } catch (RuntimeException e) {
372 * Loads an existing KieSession from the persistent store.
375 * the name of the 'KieBase' instance containing this session
376 * @param desiredSessionId
377 * id of the desired KieSession
379 * Kie Environment for the session
381 * Kie Configuration for the session
382 * @return the persistent session, or {@code null} if it could not be
385 private KieSession loadKieSession(String kieBaseName, long desiredSessionId, Environment env,
386 KieSessionConfiguration kConf) {
388 KieSession kieSession = kieSvcFact.getStoreServices().loadKieSession(desiredSessionId,
389 policyContainer.getKieContainer().getKieBase(kieBaseName), kConf, env);
391 logger.info("LOADING Loaded session {}", desiredSessionId);
395 } catch (Exception e) {
396 logger.error("loadKieSession error: ", e);
402 * Creates a new, persistent KieSession.
405 * the name of the 'KieBase' instance containing this session
407 * Kie Environment for the session
408 * @return a new, persistent session
410 private KieSession newKieSession(String kieBaseName, Environment env) {
411 KieSession kieSession = kieSvcFact.getStoreServices()
412 .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
414 logger.info("LOADING CREATED {}", kieSession.getIdentifier());
420 * Closes the data source associated with a session.
423 * name of the session being destroyed
425 private void destroyKieSession(String name) {
426 closeDataSource(name);
430 * Closes the data source associated with a session.
433 * name of the session being disposed of
435 private void disposeKieSession(String name) {
436 closeDataSource(name);
440 * Closes the data source associated with a session.
443 * name of the session whose data source is to be closed
445 private void closeDataSource(String name) {
446 DsEmf ds = name2ds.remove(name);
453 /* ============================================================ */
456 * Configures java system properties for JPA/JTA.
458 private void configureSysProps() {
459 System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
460 System.setProperty("com.arjuna.ats.arjuna.objectstore.objectStoreDir",
461 persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
462 System.setProperty("ObjectStoreEnvironmentBean.objectStoreDir",
463 persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
467 * Gets the data source properties.
469 * @return the data source properties
471 private Properties getDataSourceProperties() {
472 Properties props = new Properties();
473 props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
474 props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
475 props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
476 props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
477 props.put("maxActive", "3");
478 props.put("maxIdle", "1");
479 props.put("maxWait", "120000");
480 props.put("whenExhaustedAction", "2");
481 props.put("testOnBorrow", "false");
482 props.put("poolPreparedStatements", "true");
488 * Configures a Kie Environment
491 * environment to be configured
493 * entity manager factory
495 private void configureKieEnv(Environment env, EntityManagerFactory emf) {
496 env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
497 env.set(EnvironmentName.TRANSACTION, fact.getUserTrans());
498 env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, fact.getTransSyncReg());
499 env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr());
503 * Removes "old" Drools 'sessioninfo' records, so they aren't used to
504 * restore data to Drools sessions. This also has the useful side-effect of
505 * removing abandoned records as well.
507 private void cleanUpSessionInfo() {
509 synchronized (cleanupLock) {
511 if (sessInfoCleaned) {
512 logger.info("Clean up of sessioninfo table: already done");
516 if (sessionInfoTimeoutMs < 0) {
517 logger.info("Clean up of sessioninfo table: no timeout specified");
521 // now do the record deletion
522 try (BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
523 Connection connection = ds.getConnection();
524 PreparedStatement statement = connection.prepareStatement(
525 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
527 connection.setAutoCommit(true);
529 statement.setLong(1, sessionInfoTimeoutMs / 1000);
531 int count = statement.executeUpdate();
532 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
534 } catch (SQLException e) {
535 logger.error("Clean up of sessioninfo table failed", e);
538 // TODO: delete DroolsSessionEntity where sessionId not in
541 sessInfoCleaned = true;
546 * Gets a session's ID from the persistent store.
549 * persistence connector
551 * name of the session
552 * @return the session's id, or {@code -1} if the session is not found
554 private long getSessionId(DroolsSessionConnector conn, String sessnm) {
555 DroolsSession sess = conn.get(sessnm);
556 return sess != null ? sess.getSessionId() : -1;
560 * Replaces a session within the persistent store, if it exists. Adds it
564 * persistence connector
566 * name of session to be updated
568 * new session information
570 private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
572 DroolsSessionEntity sess = new DroolsSessionEntity();
574 sess.setSessionName(sessnm);
575 sess.setSessionId(kieSession.getIdentifier());
581 * Determine whether persistence is enabled for a specific container
584 * container to be checked
586 * name of the session to be checked
587 * @return {@code true} if persistence is enabled for this container, and
588 * {@code false} if not
590 private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
591 Properties properties = getProperties(container);
592 boolean rval = false;
594 if (properties != null) {
595 // fetch the 'type' property
596 String type = getProperty(properties, sessionName, "type");
597 rval = "auto".equals(type) || "native".equals(type);
604 * Determine the controller properties associated with the policy container.
607 * container whose properties are to be retrieved
608 * @return the container's properties, or {@code null} if not found
610 private Properties getProperties(PolicyContainer container) {
612 return fact.getPolicyController(container).getProperties();
613 } catch (IllegalArgumentException e) {
614 logger.error("getProperties exception: ", e);
620 * Fetch the persistence property associated with a session. The name may
623 * <li>persistence.SESSION-NAME.PROPERTY</li>
624 * <li>persistence.PROPERTY</li>
628 * properties from which the value is to be retrieved
630 * session name of interest
632 * property name of interest
633 * @return the property value, or {@code null} if not found
635 private String getProperty(Properties properties, String sessionName, String property) {
636 String value = properties.getProperty("persistence." + sessionName + "." + property);
638 value = properties.getProperty("persistence." + property);
644 /* ============================================================ */
647 * This 'ThreadModel' variant periodically calls
648 * 'KieSession.fireAllRules()', because the 'fireUntilHalt' method isn't
649 * compatible with persistence.
651 public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
654 * Session associated with this persistent thread.
656 private final PolicySession session;
659 * The session thread.
661 private final Thread thread;
664 * Used to indicate that processing should stop.
666 private final CountDownLatch stopped = new CountDownLatch(1);
669 * Minimum time, in milli-seconds, that the thread should sleep before
670 * firing rules again.
672 long minSleepTime = 100;
675 * Maximum time, in milli-seconds, that the thread should sleep before
676 * firing rules again. This is a "half" time, so that we can multiply it
677 * by two without overflowing the word size.
679 long halfMaxSleepTime = 5000L / 2L;
682 * Constructor - initialize variables and create thread
685 * the 'PolicySession' instance
687 * may contain additional session properties
689 public PersistentThreadModel(PolicySession session, Properties properties) {
690 this.session = session;
691 this.thread = new Thread(this, getThreadName());
693 if (properties == null) {
697 // extract 'minSleepTime' and/or 'maxSleepTime'
698 String name = session.getName();
700 // fetch 'minSleepTime' value, and update if defined
701 String sleepTimeString = getProperty(properties, name, "minSleepTime");
702 if (sleepTimeString != null) {
704 minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
705 } catch (Exception e) {
706 logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
710 // fetch 'maxSleepTime' value, and update if defined
711 long maxSleepTime = 2 * halfMaxSleepTime;
712 sleepTimeString = getProperty(properties, name, "maxSleepTime");
713 if (sleepTimeString != null) {
715 maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
716 } catch (Exception e) {
717 logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
721 // swap values if needed
722 if (minSleepTime > maxSleepTime) {
723 logger.error("minSleepTime(" + minSleepTime + ") is greater than maxSleepTime(" + maxSleepTime
725 long tmp = minSleepTime;
726 minSleepTime = maxSleepTime;
730 halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
734 * @return the String to use as the thread name
736 private String getThreadName() {
737 return "Session " + session.getFullName() + " (persistent)";
740 /***************************/
741 /* 'ThreadModel' interface */
742 /***************************/
748 public void start() {
757 // tell the thread to stop
760 // wait up to 10 seconds for the thread to stop
764 } catch (InterruptedException e) {
765 logger.error("stopThread exception: ", e);
766 Thread.currentThread().interrupt();
769 // verify that it's done
770 if (thread.isAlive()) {
771 logger.error("stopThread: still running");
779 public void updated() {
780 // the container artifact has been updated -- adjust the thread name
781 thread.setName(getThreadName());
784 /************************/
785 /* 'Runnable' interface */
786 /************************/
793 logger.info("PersistentThreadModel running");
795 // set thread local variable
796 session.setPolicySession();
798 KieSession kieSession = session.getKieSession();
799 long sleepTime = 2 * halfMaxSleepTime;
801 // We want to continue, despite any exceptions that occur
802 // while rules are fired.
807 if (kieSession.fireAllRules() > 0) {
808 // some rules fired -- reduce poll delay
809 sleepTime = Math.max(minSleepTime, sleepTime / 2);
811 // no rules fired -- increase poll delay
812 sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
815 } catch (Exception | LinkageError e) {
816 logger.error("Exception during kieSession.fireAllRules", e);
820 if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
824 } catch (InterruptedException e) {
825 logger.error("startThread exception: ", e);
826 Thread.currentThread().interrupt();
831 logger.info("PersistentThreadModel completed");
835 /* ============================================================ */
838 * DataSource-EntityManagerFactory pair.
840 private class DsEmf {
841 private BasicDataSource bds;
842 private EntityManagerFactory emf;
845 * Makes an entity manager factory for the given data source.
850 public DsEmf(BasicDataSource bds) {
852 Map<String, Object> props = new HashMap<>();
853 props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds);
856 this.emf = fact.makeEntMgrFact(props);
858 } catch (RuntimeException e) {
865 * Closes the entity manager factory and the data source.
867 public void close() {
871 } catch (RuntimeException e) {
880 * Closes the data source only.
882 private void closeDataSource() {
886 } catch (SQLException e) {
887 throw new PersistenceFeatureException(e);
893 private static class SingletonRegistry {
894 private static final TransactionSynchronizationRegistry transreg = new com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple();
898 * Factory for various items. Methods can be overridden for junit testing.
900 protected static class Factory {
903 * Gets the transaction manager.
905 * @return the transaction manager
907 public TransactionManager getTransMgr() {
908 return com.arjuna.ats.jta.TransactionManager.transactionManager();
912 * Gets the user transaction.
914 * @return the user transaction
916 public UserTransaction getUserTrans() {
917 return com.arjuna.ats.jta.UserTransaction.userTransaction();
921 * Gets the transaction synchronization registry.
923 * @return the transaction synchronization registry
925 public TransactionSynchronizationRegistry getTransSyncReg() {
926 return SingletonRegistry.transreg;
930 * Gets the KIE services.
932 * @return the KIE services
934 public KieServices getKieServices() {
935 return KieServices.Factory.get();
939 * Loads properties from a file.
942 * name of the file to load
943 * @return properties, as loaded from the file
944 * @throws IOException
945 * if an error occurs reading from the file
947 public Properties loadProperties(String filenm) throws IOException {
948 return PropertyUtil.getProperties(filenm);
952 * Makes a Data Source.
955 * data source properties
956 * @return a new data source
958 public BasicDataSource makeDataSource(Properties dsProps) {
960 return BasicDataSourceFactory.createDataSource(dsProps);
962 } catch (Exception e) {
963 throw new PersistenceFeatureException(e);
968 * Makes a new JPA connector for drools sessions.
971 * entity manager factory
972 * @return a new JPA connector for drools sessions
974 public DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
975 return new JpaDroolsSessionConnector(emf);
979 * Makes a new entity manager factory.
982 * properties with which the factory should be configured
983 * @return a new entity manager factory
985 public EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
986 return Persistence.createEntityManagerFactory("onapsessionsPU", props);
990 * Gets the policy controller associated with a given policy container.
993 * container whose controller is to be retrieved
994 * @return the container's controller
996 public PolicyController getPolicyController(PolicyContainer container) {
997 return PolicyController.factory.get(container.getGroupId(), container.getArtifactId());
1002 * Runtime exceptions generated by this class. Wraps exceptions generated by
1003 * delegated operations, particularly when they are not, themselves, Runtime
1006 public static class PersistenceFeatureException extends RuntimeException {
1007 private static final long serialVersionUID = 1L;
1012 * exception to be wrapped
1014 public PersistenceFeatureException(Exception e) {