2 * ============LICENSE_START=======================================================
3 * feature-session-persistence
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.persistence;
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.SQLException;
27 import java.util.HashMap;
29 import java.util.Properties;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
33 import javax.persistence.EntityManagerFactory;
34 import javax.persistence.Persistence;
35 import javax.transaction.TransactionManager;
36 import javax.transaction.TransactionSynchronizationRegistry;
37 import javax.transaction.UserTransaction;
39 import org.apache.commons.dbcp2.BasicDataSource;
40 import org.apache.commons.dbcp2.BasicDataSourceFactory;
41 import org.kie.api.KieServices;
42 import org.kie.api.runtime.Environment;
43 import org.kie.api.runtime.EnvironmentName;
44 import org.kie.api.runtime.KieSession;
45 import org.kie.api.runtime.KieSessionConfiguration;
46 import org.onap.policy.drools.core.PolicyContainer;
47 import org.onap.policy.drools.core.PolicySession;
48 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
49 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.onap.policy.drools.system.PolicyEngine;
52 import org.onap.policy.drools.utils.PropertyUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * If this feature is supported, there is a single instance of it. It adds persistence to Drools
58 * sessions. In addition, if an active-standby feature exists, then that is used to determine the
59 * active and last-active PDP. If it does not exist, then the current host name is used as the PDP
62 * <p>The bulk of the code here was once in other classes, such as 'PolicyContainer' and 'Main'. It
63 * was moved here as part of making this a separate optional feature.
65 public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI {
67 private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
69 /** Standard factory used to get various items. */
70 private static Factory stdFactory = new Factory();
72 /** Factory used to get various items. */
73 private Factory fact = stdFactory;
75 /** KieService factory. */
76 private KieServices kieSvcFact;
78 /** Persistence properties. */
79 private Properties persistProps;
81 /** Whether or not the SessionInfo records should be cleaned out. */
82 private boolean sessInfoCleaned;
84 /** SessionInfo timeout, in milli-seconds, as read from
85 * {@link #persistProps}. */
86 private long sessionInfoTimeoutMs;
88 /** Object used to serialize cleanup of sessioninfo table. */
89 private Object cleanupLock = new Object();
92 * Sets the factory to be used during junit testing.
94 * @param fact factory to be used
96 protected void setFactory(Factory fact) {
101 * Lookup the adjunct for this feature that is associated with the specified PolicyContainer. If
102 * not found, create one.
104 * @param policyContainer the container whose adjunct we are looking up, and possibly creating
105 * @return the associated 'ContainerAdjunct' instance, which may be new
107 private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
109 Object rval = policyContainer.getAdjunct(this);
111 if (rval == null || !(rval instanceof ContainerAdjunct)) {
112 // adjunct does not exist, or has the wrong type (should never
114 rval = new ContainerAdjunct(policyContainer);
115 policyContainer.setAdjunct(this, rval);
118 return (ContainerAdjunct) rval;
124 public int getSequenceNumber() {
131 public void globalInit(String[] args, String configDir) {
133 kieSvcFact = fact.getKieServices();
136 persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties");
138 } catch (IOException e1) {
139 logger.error("initializePersistence: ", e1);
142 sessionInfoTimeoutMs = getPersistenceTimeout();
146 * Creates a persistent KieSession, loading it from the persistent store, or creating one, if it
147 * does not exist yet.
150 public KieSession activatePolicySession(
151 PolicyContainer policyContainer, String name, String kieBaseName) {
153 if (isPersistenceEnabled(policyContainer, name)) {
154 cleanUpSessionInfo();
156 return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
165 public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
167 PolicyContainer policyContainer = session.getPolicyContainer();
168 if (isPersistenceEnabled(policyContainer, session.getName())) {
169 return new PersistentThreadModel(session, getProperties(policyContainer));
177 public void disposeKieSession(PolicySession policySession) {
179 ContainerAdjunct contAdj =
180 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
181 if (contAdj != null) {
182 contAdj.disposeKieSession(policySession.getName());
189 public void destroyKieSession(PolicySession policySession) {
191 ContainerAdjunct contAdj =
192 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
193 if (contAdj != null) {
194 contAdj.destroyKieSession(policySession.getName());
201 public boolean afterStart(PolicyEngine engine) {
208 public boolean beforeStart(PolicyEngine engine) {
209 synchronized (cleanupLock) {
210 sessInfoCleaned = false;
219 public boolean beforeActivate(PolicyEngine engine) {
220 synchronized (cleanupLock) {
221 sessInfoCleaned = false;
230 public boolean afterActivate(PolicyEngine engine) {
234 /* ============================================================ */
237 * Gets the persistence timeout value for sessioninfo records.
239 * @return the timeout value, in milli-seconds, or {@code -1} if it is unspecified or invalid
241 private long getPersistenceTimeout() {
242 String timeoutString = null;
245 timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
247 if (timeoutString != null) {
248 // timeout parameter is specified
249 return Long.valueOf(timeoutString) * 1000;
252 } catch (NumberFormatException e) {
254 "Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
262 /* ============================================================ */
265 * Each instance of this class is a logical extension of a 'PolicyContainer' instance. Its
266 * reference is stored in the 'adjuncts' table within the 'PolicyContainer', and will be
267 * garbage-collected with the container.
269 protected class ContainerAdjunct {
270 /** 'PolicyContainer' instance that this adjunct is extending. */
271 private PolicyContainer policyContainer;
273 /** Maps a KIE session name to its data source. */
274 private Map<String, DsEmf> name2ds = new HashMap<>();
277 * Constructor - initialize a new 'ContainerAdjunct'.
279 * @param policyContainer the 'PolicyContainer' instance this adjunct is extending
281 private ContainerAdjunct(PolicyContainer policyContainer) {
282 this.policyContainer = policyContainer;
286 * Create a new persistent KieSession. If there is already a corresponding entry in the
287 * database, it is used to initialize the KieSession. If not, a completely new session is
290 * @param name the name of the KieSession (which is also the name of the associated
292 * @param kieBaseName the name of the 'KieBase' instance containing this session
293 * @return a new KieSession with persistence enabled
295 private KieSession newPersistentKieSession(String name, String kieBaseName) {
299 BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
300 DsEmf dsemf = new DsEmf(ds);
303 EntityManagerFactory emf = dsemf.emf;
304 DroolsSessionConnector conn = fact.makeJpaConnector(emf);
306 long desiredSessionId = getSessionId(conn, name);
309 "\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
311 // session does not exist -- attempt to create one
313 "getPolicySession:session does not exist -- attempt to create one with name {}", name);
315 Environment env = kieSvcFact.newEnvironment();
317 configureKieEnv(env, emf);
319 KieSessionConfiguration kieConf = kieSvcFact.newKieSessionConfiguration();
321 KieSession kieSession =
322 (desiredSessionId >= 0
323 ? loadKieSession(kieBaseName, desiredSessionId, env, kieConf)
326 if (kieSession == null) {
327 // loadKieSession() returned null or desiredSessionId < 0
329 "LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
331 kieSession = newKieSession(kieBaseName, env);
334 replaceSession(conn, name, kieSession);
336 name2ds.put(name, dsemf);
340 } catch (RuntimeException e) {
347 * Loads an existing KieSession from the persistent store.
349 * @param kieBaseName the name of the 'KieBase' instance containing this session
350 * @param desiredSessionId id of the desired KieSession
351 * @param env Kie Environment for the session
352 * @param kConf Kie Configuration for the session
353 * @return the persistent session, or {@code null} if it could not be loaded
355 private KieSession loadKieSession(
356 String kieBaseName, long desiredSessionId, Environment env, KieSessionConfiguration kieConf) {
358 KieSession kieSession =
363 policyContainer.getKieContainer().getKieBase(kieBaseName),
367 logger.info("LOADING Loaded session {}", desiredSessionId);
371 } catch (Exception e) {
372 logger.error("loadKieSession error: ", e);
378 * Creates a new, persistent KieSession.
380 * @param kieBaseName the name of the 'KieBase' instance containing this session
381 * @param env Kie Environment for the session
382 * @return a new, persistent session
384 private KieSession newKieSession(String kieBaseName, Environment env) {
385 KieSession kieSession =
388 .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
390 logger.info("LOADING CREATED {}", kieSession.getIdentifier());
396 * Closes the data source associated with a session.
398 * @param name name of the session being destroyed
400 private void destroyKieSession(String name) {
401 closeDataSource(name);
405 * Closes the data source associated with a session.
407 * @param name name of the session being disposed of
409 private void disposeKieSession(String name) {
410 closeDataSource(name);
414 * Closes the data source associated with a session.
416 * @param name name of the session whose data source is to be closed
418 private void closeDataSource(String name) {
419 DsEmf ds = name2ds.remove(name);
425 /** Configures java system properties for JPA/JTA. */
426 private void configureSysProps() {
427 System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
429 "com.arjuna.ats.arjuna.objectstore.objectStoreDir",
430 persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
432 "ObjectStoreEnvironmentBean.objectStoreDir",
433 persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
437 * Configures a Kie Environment.
439 * @param env environment to be configured
440 * @param emf entity manager factory
442 private void configureKieEnv(Environment env, EntityManagerFactory emf) {
443 env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
444 env.set(EnvironmentName.TRANSACTION, fact.getUserTrans());
445 env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, fact.getTransSyncReg());
446 env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr());
450 * Gets a session's ID from the persistent store.
452 * @param conn persistence connector
453 * @param sessnm name of the session
454 * @return the session's id, or {@code -1} if the session is not found
456 private long getSessionId(DroolsSessionConnector conn, String sessnm) {
457 DroolsSession sess = conn.get(sessnm);
458 return sess != null ? sess.getSessionId() : -1;
462 * Replaces a session within the persistent store, if it exists. Adds it otherwise.
464 * @param conn persistence connector
465 * @param sessnm name of session to be updated
466 * @param kieSession new session information
468 private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
470 DroolsSessionEntity sess = new DroolsSessionEntity();
472 sess.setSessionName(sessnm);
473 sess.setSessionId(kieSession.getIdentifier());
479 /* ============================================================ */
482 * Gets the data source properties.
484 * @return the data source properties
486 private Properties getDataSourceProperties() {
487 Properties props = new Properties();
488 props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
489 props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
490 props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
491 props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
492 props.put("maxActive", "3");
493 props.put("maxIdle", "1");
494 props.put("maxWait", "120000");
495 props.put("whenExhaustedAction", "2");
496 props.put("testOnBorrow", "false");
497 props.put("poolPreparedStatements", "true");
503 * Removes "old" Drools 'sessioninfo' records, so they aren't used to restore data to Drools
504 * sessions. This also has the useful side-effect of removing abandoned records as well.
506 private void cleanUpSessionInfo() {
508 synchronized (cleanupLock) {
509 if (sessInfoCleaned) {
510 logger.info("Clean up of sessioninfo table: already done");
514 if (sessionInfoTimeoutMs < 0) {
515 logger.info("Clean up of sessioninfo table: no timeout specified");
519 // now do the record deletion
520 try (BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
521 Connection connection = ds.getConnection();
522 PreparedStatement statement =
523 connection.prepareStatement(
524 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
526 connection.setAutoCommit(true);
528 statement.setLong(1, sessionInfoTimeoutMs / 1000);
530 int count = statement.executeUpdate();
531 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
533 } catch (SQLException e) {
534 logger.error("Clean up of sessioninfo table failed", e);
537 // TODO: delete DroolsSessionEntity where sessionId not in
540 sessInfoCleaned = true;
545 * Determine whether persistence is enabled for a specific container.
547 * @param container container to be checked
548 * @param sessionName name of the session to be checked
549 * @return {@code true} if persistence is enabled for this container, and {@code false} if not
551 private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
552 Properties properties = getProperties(container);
553 boolean rval = false;
555 if (properties != null) {
556 // fetch the 'type' property
557 String type = getProperty(properties, sessionName, "type");
558 rval = "auto".equals(type) || "native".equals(type);
565 * Determine the controller properties associated with the policy container.
567 * @param container container whose properties are to be retrieved
568 * @return the container's properties, or {@code null} if not found
570 private Properties getProperties(PolicyContainer container) {
572 return fact.getPolicyController(container).getProperties();
573 } catch (IllegalArgumentException e) {
574 logger.error("getProperties exception: ", e);
580 * Fetch the persistence property associated with a session. The name may have the form:
583 * <li>persistence.SESSION-NAME.PROPERTY
584 * <li>persistence.PROPERTY
587 * @param properties properties from which the value is to be retrieved
588 * @param sessionName session name of interest
589 * @param property property name of interest
590 * @return the property value, or {@code null} if not found
592 private String getProperty(Properties properties, String sessionName, String property) {
593 String value = properties.getProperty("persistence." + sessionName + "." + property);
595 value = properties.getProperty("persistence." + property);
601 /* ============================================================ */
604 * This 'ThreadModel' variant periodically calls 'KieSession.fireAllRules()', because the
605 * 'fireUntilHalt' method isn't compatible with persistence.
607 public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
609 /** Session associated with this persistent thread. */
610 private final PolicySession session;
612 /** The session thread. */
613 private final Thread thread;
615 /** Used to indicate that processing should stop. */
616 private final CountDownLatch stopped = new CountDownLatch(1);
618 /** Minimum time, in milli-seconds, that the thread should sleep before firing rules again. */
619 long minSleepTime = 100;
622 * Maximum time, in milli-seconds, that the thread should sleep before firing rules again. This
623 * is a "half" time, so that we can multiply it by two without overflowing the word size.
625 long halfMaxSleepTime = 5000L / 2L;
628 * Constructor - initialize variables and create thread.
630 * @param session the 'PolicySession' instance
631 * @param properties may contain additional session properties
633 public PersistentThreadModel(PolicySession session, Properties properties) {
634 this.session = session;
635 this.thread = new Thread(this, getThreadName());
637 if (properties == null) {
641 // extract 'minSleepTime' and/or 'maxSleepTime'
642 String name = session.getName();
644 // fetch 'minSleepTime' value, and update if defined
645 String sleepTimeString = getProperty(properties, name, "minSleepTime");
646 if (sleepTimeString != null) {
648 minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
649 } catch (Exception e) {
650 logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
654 // fetch 'maxSleepTime' value, and update if defined
655 long maxSleepTime = 2 * halfMaxSleepTime;
656 sleepTimeString = getProperty(properties, name, "maxSleepTime");
657 if (sleepTimeString != null) {
659 maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
660 } catch (Exception e) {
661 logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
665 // swap values if needed
666 if (minSleepTime > maxSleepTime) {
670 + ") is greater than maxSleepTime("
673 long tmp = minSleepTime;
674 minSleepTime = maxSleepTime;
678 halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
684 * @return the String to use as the thread name */
685 private String getThreadName() {
686 return "Session " + session.getFullName() + " (persistent)";
689 /** ************************ */
690 /* 'ThreadModel' interface */
691 /** ************************ */
696 public void start() {
704 // tell the thread to stop
707 // wait up to 10 seconds for the thread to stop
711 } catch (InterruptedException e) {
712 logger.error("stopThread exception: ", e);
713 Thread.currentThread().interrupt();
716 // verify that it's done
717 if (thread.isAlive()) {
718 logger.error("stopThread: still running");
725 public void updated() {
726 // the container artifact has been updated -- adjust the thread name
727 thread.setName(getThreadName());
730 /** ********************* */
731 /* 'Runnable' interface */
732 /** ********************* */
738 logger.info("PersistentThreadModel running");
740 // set thread local variable
741 session.setPolicySession();
743 KieSession kieSession = session.getKieSession();
744 long sleepTime = 2 * halfMaxSleepTime;
746 // We want to continue, despite any exceptions that occur
747 // while rules are fired.
753 if (kieSession.fireAllRules() > 0) {
754 // some rules fired -- reduce poll delay
755 sleepTime = Math.max(minSleepTime, sleepTime / 2);
757 // no rules fired -- increase poll delay
758 sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
761 } catch (Exception | LinkageError e) {
762 logger.error("Exception during kieSession.fireAllRules", e);
766 if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
770 } catch (InterruptedException e) {
771 logger.error("startThread exception: ", e);
772 Thread.currentThread().interrupt();
777 logger.info("PersistentThreadModel completed");
781 /* ============================================================ */
783 /** DataSource-EntityManagerFactory pair. */
784 private class DsEmf {
785 private BasicDataSource bds;
786 private EntityManagerFactory emf;
789 * Makes an entity manager factory for the given data source.
791 * @param bds pooled data source
793 public DsEmf(BasicDataSource bds) {
795 Map<String, Object> props = new HashMap<>();
796 props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds);
799 this.emf = fact.makeEntMgrFact(props);
801 } catch (RuntimeException e) {
807 /** Closes the entity manager factory and the data source. */
808 public void close() {
812 } catch (RuntimeException e) {
820 /** Closes the data source only. */
821 private void closeDataSource() {
825 } catch (SQLException e) {
826 throw new PersistenceFeatureException(e);
831 private static class SingletonRegistry {
832 private static final TransactionSynchronizationRegistry transreg =
833 new com.arjuna.ats.internal.jta.transaction.arjunacore
834 .TransactionSynchronizationRegistryImple();
836 private SingletonRegistry() {
841 /** Factory for various items. Methods can be overridden for junit testing. */
842 protected static class Factory {
845 * Gets the transaction manager.
847 * @return the transaction manager
849 public TransactionManager getTransMgr() {
850 return com.arjuna.ats.jta.TransactionManager.transactionManager();
854 * Gets the user transaction.
856 * @return the user transaction
858 public UserTransaction getUserTrans() {
859 return com.arjuna.ats.jta.UserTransaction.userTransaction();
863 * Gets the transaction synchronization registry.
865 * @return the transaction synchronization registry
867 public TransactionSynchronizationRegistry getTransSyncReg() {
868 return SingletonRegistry.transreg;
872 * Gets the KIE services.
874 * @return the KIE services
876 public KieServices getKieServices() {
877 return KieServices.Factory.get();
881 * Loads properties from a file.
883 * @param filenm name of the file to load
884 * @return properties, as loaded from the file
885 * @throws IOException if an error occurs reading from the file
887 public Properties loadProperties(String filenm) throws IOException {
888 return PropertyUtil.getProperties(filenm);
892 * Makes a Data Source.
894 * @param dsProps data source properties
895 * @return a new data source
897 public BasicDataSource makeDataSource(Properties dsProps) {
899 return BasicDataSourceFactory.createDataSource(dsProps);
901 } catch (Exception e) {
902 throw new PersistenceFeatureException(e);
907 * Makes a new JPA connector for drools sessions.
909 * @param emf entity manager factory
910 * @return a new JPA connector for drools sessions
912 public DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
913 return new JpaDroolsSessionConnector(emf);
917 * Makes a new entity manager factory.
919 * @param props properties with which the factory should be configured
920 * @return a new entity manager factory
922 public EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
923 return Persistence.createEntityManagerFactory("onapsessionsPU", props);
927 * Gets the policy controller associated with a given policy container.
929 * @param container container whose controller is to be retrieved
930 * @return the container's controller
932 public PolicyController getPolicyController(PolicyContainer container) {
933 return PolicyController.factory.get(container.getGroupId(), container.getArtifactId());
938 * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations,
939 * particularly when they are not, themselves, Runtime exceptions.
941 public static class PersistenceFeatureException extends RuntimeException {
942 private static final long serialVersionUID = 1L;
947 public PersistenceFeatureException(Exception ex) {