2 * ============LICENSE_START=======================================================
3 * feature-session-persistence
4 * ================================================================================
5 * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.persistence;
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.SQLException;
27 import java.util.HashMap;
29 import java.util.Properties;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
33 import javax.persistence.EntityManagerFactory;
34 import javax.persistence.Persistence;
35 import javax.transaction.TransactionManager;
36 import javax.transaction.TransactionSynchronizationRegistry;
37 import javax.transaction.UserTransaction;
39 import org.apache.commons.dbcp2.BasicDataSource;
40 import org.apache.commons.dbcp2.BasicDataSourceFactory;
41 import org.kie.api.KieServices;
42 import org.kie.api.runtime.Environment;
43 import org.kie.api.runtime.EnvironmentName;
44 import org.kie.api.runtime.KieSession;
45 import org.kie.api.runtime.KieSessionConfiguration;
46 import org.onap.policy.drools.core.PolicyContainer;
47 import org.onap.policy.drools.core.PolicySession;
48 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
49 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.onap.policy.drools.system.PolicyEngine;
52 import org.onap.policy.drools.utils.PropertyUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
/**
 * If this feature is supported, there is a single instance of it. It adds persistence to Drools
 * sessions. In addition, if an active-standby feature exists, then that is used to determine the
 * active and last-active PDP. If it does not exist, then the current host name is used as the PDP
 * ID.
 *
 * <p>The bulk of the code here was once in other classes, such as 'PolicyContainer' and 'Main'. It
 * was moved here as part of making this a separate optional feature.
 */
65 public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureApi {
67 private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
69 /** KieService factory. */
70 private KieServices kieSvcFact;
72 /** Persistence properties. */
73 private Properties persistProps;
75 /** Whether or not the SessionInfo records should be cleaned out. */
76 private boolean sessInfoCleaned;
78 /** SessionInfo timeout, in milli-seconds, as read from
79 * {@link #persistProps}. */
80 private long sessionInfoTimeoutMs;
82 /** Object used to serialize cleanup of sessioninfo table. */
83 private Object cleanupLock = new Object();
86 * Lookup the adjunct for this feature that is associated with the specified PolicyContainer. If
87 * not found, create one.
89 * @param policyContainer the container whose adjunct we are looking up, and possibly creating
90 * @return the associated 'ContainerAdjunct' instance, which may be new
92 private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
94 Object rval = policyContainer.getAdjunct(this);
96 if (rval == null || !(rval instanceof ContainerAdjunct)) {
97 // adjunct does not exist, or has the wrong type (should never
99 rval = new ContainerAdjunct(policyContainer);
100 policyContainer.setAdjunct(this, rval);
103 return (ContainerAdjunct) rval;
110 public int getSequenceNumber() {
118 public void globalInit(String[] args, String configDir) {
120 kieSvcFact = getKieServices();
123 persistProps = loadProperties(configDir + "/feature-session-persistence.properties");
125 } catch (IOException e1) {
126 logger.error("initializePersistence: ", e1);
129 sessionInfoTimeoutMs = getPersistenceTimeout();
133 * Creates a persistent KieSession, loading it from the persistent store, or creating one, if it
134 * does not exist yet.
137 public KieSession activatePolicySession(
138 PolicyContainer policyContainer, String name, String kieBaseName) {
140 if (isPersistenceEnabled(policyContainer, name)) {
141 cleanUpSessionInfo();
143 return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
153 public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
155 PolicyContainer policyContainer = session.getPolicyContainer();
156 if (isPersistenceEnabled(policyContainer, session.getName())) {
157 return new PersistentThreadModel(session, getProperties(policyContainer));
166 public void disposeKieSession(PolicySession policySession) {
168 ContainerAdjunct contAdj =
169 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
170 if (contAdj != null) {
171 contAdj.disposeKieSession(policySession.getName());
179 public void destroyKieSession(PolicySession policySession) {
181 ContainerAdjunct contAdj =
182 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
183 if (contAdj != null) {
184 contAdj.destroyKieSession(policySession.getName());
192 public boolean afterStart(PolicyEngine engine) {
200 public boolean beforeStart(PolicyEngine engine) {
201 synchronized (cleanupLock) {
202 sessInfoCleaned = false;
212 public boolean beforeActivate(PolicyEngine engine) {
213 synchronized (cleanupLock) {
214 sessInfoCleaned = false;
224 public boolean afterActivate(PolicyEngine engine) {
228 /* ============================================================ */
231 * Gets the persistence timeout value for sessioninfo records.
233 * @return the timeout value, in milli-seconds, or {@code -1} if it is unspecified or invalid
235 private long getPersistenceTimeout() {
236 String timeoutString = null;
239 timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
241 if (timeoutString != null) {
242 // timeout parameter is specified
243 return Long.valueOf(timeoutString) * 1000;
246 } catch (NumberFormatException e) {
248 "Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
256 /* ============================================================ */
259 * Each instance of this class is a logical extension of a 'PolicyContainer' instance. Its
260 * reference is stored in the 'adjuncts' table within the 'PolicyContainer', and will be
261 * garbage-collected with the container.
263 protected class ContainerAdjunct {
264 /** 'PolicyContainer' instance that this adjunct is extending. */
265 private PolicyContainer policyContainer;
267 /** Maps a KIE session name to its data source. */
268 private Map<String, DsEmf> name2ds = new HashMap<>();
/**
 * Creates the adjunct that extends the given container.
 *
 * @param policyContainer the 'PolicyContainer' instance this adjunct is extending
 */
private ContainerAdjunct(PolicyContainer policyContainer) {
  this.policyContainer = policyContainer;
}
280 * Create a new persistent KieSession. If there is already a corresponding entry in the
281 * database, it is used to initialize the KieSession. If not, a completely new session is
284 * @param name the name of the KieSession (which is also the name of the associated
286 * @param kieBaseName the name of the 'KieBase' instance containing this session
287 * @return a new KieSession with persistence enabled
289 private KieSession newPersistentKieSession(String name, String kieBaseName) {
293 BasicDataSource ds = makeDataSource(getDataSourceProperties());
294 DsEmf dsemf = new DsEmf(ds);
297 EntityManagerFactory emf = dsemf.emf;
298 DroolsSessionConnector conn = makeJpaConnector(emf);
300 long desiredSessionId = getSessionId(conn, name);
303 "\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
305 // session does not exist -- attempt to create one
307 "getPolicySession:session does not exist -- attempt to create one with name {}", name);
309 Environment env = kieSvcFact.newEnvironment();
311 configureKieEnv(env, emf);
313 KieSessionConfiguration kieConf = kieSvcFact.newKieSessionConfiguration();
315 KieSession kieSession =
316 (desiredSessionId >= 0
317 ? loadKieSession(kieBaseName, desiredSessionId, env, kieConf)
320 if (kieSession == null) {
321 // loadKieSession() returned null or desiredSessionId < 0
323 "LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
325 kieSession = newKieSession(kieBaseName, env);
328 replaceSession(conn, name, kieSession);
330 name2ds.put(name, dsemf);
334 } catch (RuntimeException e) {
341 * Loads an existing KieSession from the persistent store.
343 * @param kieBaseName the name of the 'KieBase' instance containing this session
344 * @param desiredSessionId id of the desired KieSession
345 * @param env Kie Environment for the session
346 * @param kConf Kie Configuration for the session
347 * @return the persistent session, or {@code null} if it could not be loaded
349 private KieSession loadKieSession(
350 String kieBaseName, long desiredSessionId, Environment env, KieSessionConfiguration kieConf) {
352 KieSession kieSession =
357 policyContainer.getKieContainer().getKieBase(kieBaseName),
361 logger.info("LOADING Loaded session {}", desiredSessionId);
365 } catch (Exception e) {
366 logger.error("loadKieSession error: ", e);
372 * Creates a new, persistent KieSession.
374 * @param kieBaseName the name of the 'KieBase' instance containing this session
375 * @param env Kie Environment for the session
376 * @return a new, persistent session
378 private KieSession newKieSession(String kieBaseName, Environment env) {
379 KieSession kieSession =
382 .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
384 logger.info("LOADING CREATED {}", kieSession.getIdentifier());
390 * Closes the data source associated with a session.
392 * @param name name of the session being destroyed
394 private void destroyKieSession(String name) {
395 closeDataSource(name);
399 * Closes the data source associated with a session.
401 * @param name name of the session being disposed of
403 private void disposeKieSession(String name) {
404 closeDataSource(name);
408 * Closes the data source associated with a session.
410 * @param name name of the session whose data source is to be closed
412 private void closeDataSource(String name) {
413 DsEmf ds = name2ds.remove(name);
419 /** Configures java system properties for JPA/JTA. */
420 private void configureSysProps() {
421 System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
423 "com.arjuna.ats.arjuna.objectstore.objectStoreDir",
424 persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
426 "ObjectStoreEnvironmentBean.objectStoreDir",
427 persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
431 * Configures a Kie Environment.
433 * @param env environment to be configured
434 * @param emf entity manager factory
436 private void configureKieEnv(Environment env, EntityManagerFactory emf) {
437 env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
438 env.set(EnvironmentName.TRANSACTION, getUserTrans());
439 env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, getTransSyncReg());
440 env.set(EnvironmentName.TRANSACTION_MANAGER, getTransMgr());
444 * Gets a session's ID from the persistent store.
446 * @param conn persistence connector
447 * @param sessnm name of the session
448 * @return the session's id, or {@code -1} if the session is not found
450 private long getSessionId(DroolsSessionConnector conn, String sessnm) {
451 DroolsSession sess = conn.get(sessnm);
452 return sess != null ? sess.getSessionId() : -1;
456 * Replaces a session within the persistent store, if it exists. Adds it otherwise.
458 * @param conn persistence connector
459 * @param sessnm name of session to be updated
460 * @param kieSession new session information
462 private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
464 DroolsSessionEntity sess = new DroolsSessionEntity();
466 sess.setSessionName(sessnm);
467 sess.setSessionId(kieSession.getIdentifier());
473 /* ============================================================ */
476 * Gets the data source properties.
478 * @return the data source properties
480 private Properties getDataSourceProperties() {
481 Properties props = new Properties();
482 props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
483 props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
484 props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
485 props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
486 props.put("maxActive", "3");
487 props.put("maxIdle", "1");
488 props.put("maxWait", "120000");
489 props.put("whenExhaustedAction", "2");
490 props.put("testOnBorrow", "false");
491 props.put("poolPreparedStatements", "true");
497 * Removes "old" Drools 'sessioninfo' records, so they aren't used to restore data to Drools
498 * sessions. This also has the useful side-effect of removing abandoned records as well.
500 private void cleanUpSessionInfo() {
502 synchronized (cleanupLock) {
503 if (sessInfoCleaned) {
504 logger.info("Clean up of sessioninfo table: already done");
508 if (sessionInfoTimeoutMs < 0) {
509 logger.info("Clean up of sessioninfo table: no timeout specified");
513 // now do the record deletion
514 try (BasicDataSource ds = makeDataSource(getDataSourceProperties());
515 Connection connection = ds.getConnection();
516 PreparedStatement statement =
517 connection.prepareStatement(
518 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
520 connection.setAutoCommit(true);
522 statement.setLong(1, sessionInfoTimeoutMs / 1000);
524 int count = statement.executeUpdate();
525 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
527 } catch (SQLException e) {
528 logger.error("Clean up of sessioninfo table failed", e);
531 // delete DroolsSessionEntity where sessionId not in (sessinfo.xxx)?
533 sessInfoCleaned = true;
538 * Determine whether persistence is enabled for a specific container.
540 * @param container container to be checked
541 * @param sessionName name of the session to be checked
542 * @return {@code true} if persistence is enabled for this container, and {@code false} if not
544 private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
545 Properties properties = getProperties(container);
546 boolean rval = false;
548 if (properties != null) {
549 // fetch the 'type' property
550 String type = getProperty(properties, sessionName, "type");
551 rval = "auto".equals(type) || "native".equals(type);
558 * Determine the controller properties associated with the policy container.
560 * @param container container whose properties are to be retrieved
561 * @return the container's properties, or {@code null} if not found
563 private Properties getProperties(PolicyContainer container) {
565 return getPolicyController(container).getProperties();
566 } catch (IllegalArgumentException e) {
567 logger.error("getProperties exception: ", e);
573 * Fetch the persistence property associated with a session. The name may have the form:
576 * <li>persistence.SESSION-NAME.PROPERTY
577 * <li>persistence.PROPERTY
580 * @param properties properties from which the value is to be retrieved
581 * @param sessionName session name of interest
582 * @param property property name of interest
583 * @return the property value, or {@code null} if not found
585 private String getProperty(Properties properties, String sessionName, String property) {
586 String value = properties.getProperty("persistence." + sessionName + "." + property);
588 value = properties.getProperty("persistence." + property);
594 /* ============================================================ */
597 * This 'ThreadModel' variant periodically calls 'KieSession.fireAllRules()', because the
598 * 'fireUntilHalt' method isn't compatible with persistence.
600 public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
602 /** Session associated with this persistent thread. */
603 private final PolicySession session;
605 /** The session thread. */
606 private final Thread thread;
608 /** Used to indicate that processing should stop. */
609 private final CountDownLatch stopped = new CountDownLatch(1);
611 /** Minimum time, in milli-seconds, that the thread should sleep before firing rules again. */
612 long minSleepTime = 100;
615 * Maximum time, in milli-seconds, that the thread should sleep before firing rules again. This
616 * is a "half" time, so that we can multiply it by two without overflowing the word size.
618 long halfMaxSleepTime = 5000L / 2L;
621 * Constructor - initialize variables and create thread.
623 * @param session the 'PolicySession' instance
624 * @param properties may contain additional session properties
626 public PersistentThreadModel(PolicySession session, Properties properties) {
627 this.session = session;
628 this.thread = new Thread(this, getThreadName());
630 if (properties == null) {
634 // extract 'minSleepTime' and/or 'maxSleepTime'
635 String name = session.getName();
637 // fetch 'minSleepTime' value, and update if defined
638 String sleepTimeString = getProperty(properties, name, "minSleepTime");
639 if (sleepTimeString != null) {
641 minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
642 } catch (Exception e) {
643 logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
647 // fetch 'maxSleepTime' value, and update if defined
648 long maxSleepTime = 2 * halfMaxSleepTime;
649 sleepTimeString = getProperty(properties, name, "maxSleepTime");
650 if (sleepTimeString != null) {
652 maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
653 } catch (Exception e) {
654 logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
658 // swap values if needed
659 if (minSleepTime > maxSleepTime) {
663 + ") is greater than maxSleepTime("
666 long tmp = minSleepTime;
667 minSleepTime = maxSleepTime;
671 halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
677 * @return the String to use as the thread name */
678 private String getThreadName() {
679 return "Session " + session.getFullName() + " (persistent)";
682 /*=========================*/
683 /* 'ThreadModel' interface */
684 /*=========================*/
690 public void start() {
699 // tell the thread to stop
702 // wait up to 10 seconds for the thread to stop
706 } catch (InterruptedException e) {
707 logger.error("stopThread exception: ", e);
708 Thread.currentThread().interrupt();
711 // verify that it's done
712 if (thread.isAlive()) {
713 logger.error("stopThread: still running");
721 public void updated() {
722 // the container artifact has been updated -- adjust the thread name
723 thread.setName(getThreadName());
726 /*======================*/
727 /* 'Runnable' interface */
728 /*======================*/
735 logger.info("PersistentThreadModel running");
737 // set thread local variable
738 session.setPolicySession();
740 KieSession kieSession = session.getKieSession();
741 long sleepTime = 2 * halfMaxSleepTime;
743 // We want to continue, despite any exceptions that occur
744 // while rules are fired.
750 if (kieSession.fireAllRules() > 0) {
751 // some rules fired -- reduce poll delay
752 sleepTime = Math.max(minSleepTime, sleepTime / 2);
754 // no rules fired -- increase poll delay
755 sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
758 } catch (Exception | LinkageError e) {
759 logger.error("Exception during kieSession.fireAllRules", e);
763 if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
767 } catch (InterruptedException e) {
768 logger.error("startThread exception: ", e);
769 Thread.currentThread().interrupt();
774 logger.info("PersistentThreadModel completed");
778 /* ============================================================ */
780 /** DataSource-EntityManagerFactory pair. */
781 private class DsEmf {
782 private BasicDataSource bds;
783 private EntityManagerFactory emf;
786 * Makes an entity manager factory for the given data source.
788 * @param bds pooled data source
790 public DsEmf(BasicDataSource bds) {
792 Map<String, Object> props = new HashMap<>();
793 props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds);
796 this.emf = makeEntMgrFact(props);
798 } catch (RuntimeException e) {
804 /** Closes the entity manager factory and the data source. */
805 public void close() {
809 } catch (RuntimeException e) {
817 /** Closes the data source only. */
818 private void closeDataSource() {
822 } catch (SQLException e) {
823 throw new PersistenceFeatureException(e);
828 private static class SingletonRegistry {
829 private static final TransactionSynchronizationRegistry transreg =
830 new com.arjuna.ats.internal.jta.transaction.arjunacore
831 .TransactionSynchronizationRegistryImple();
833 private SingletonRegistry() {
838 /** Factory for various items. Methods can be overridden for junit testing. */
841 * Gets the transaction manager.
843 * @return the transaction manager
845 protected TransactionManager getTransMgr() {
846 return com.arjuna.ats.jta.TransactionManager.transactionManager();
850 * Gets the user transaction.
852 * @return the user transaction
854 protected UserTransaction getUserTrans() {
855 return com.arjuna.ats.jta.UserTransaction.userTransaction();
859 * Gets the transaction synchronization registry.
861 * @return the transaction synchronization registry
863 protected TransactionSynchronizationRegistry getTransSyncReg() {
864 return SingletonRegistry.transreg;
868 * Gets the KIE services.
870 * @return the KIE services
872 protected KieServices getKieServices() {
873 return KieServices.Factory.get();
877 * Loads properties from a file.
879 * @param filenm name of the file to load
880 * @return properties, as loaded from the file
881 * @throws IOException if an error occurs reading from the file
883 protected Properties loadProperties(String filenm) throws IOException {
884 return PropertyUtil.getProperties(filenm);
888 * Makes a Data Source.
890 * @param dsProps data source properties
891 * @return a new data source
893 protected BasicDataSource makeDataSource(Properties dsProps) {
895 return BasicDataSourceFactory.createDataSource(dsProps);
897 } catch (Exception e) {
898 throw new PersistenceFeatureException(e);
903 * Makes a new JPA connector for drools sessions.
905 * @param emf entity manager factory
906 * @return a new JPA connector for drools sessions
908 protected DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
909 return new JpaDroolsSessionConnector(emf);
913 * Makes a new entity manager factory.
915 * @param props properties with which the factory should be configured
916 * @return a new entity manager factory
918 protected EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
919 return Persistence.createEntityManagerFactory("onapsessionsPU", props);
923 * Gets the policy controller associated with a given policy container.
925 * @param container container whose controller is to be retrieved
926 * @return the container's controller
928 protected PolicyController getPolicyController(PolicyContainer container) {
929 return PolicyController.factory.get(container.getGroupId(), container.getArtifactId());
933 * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations,
934 * particularly when they are not, themselves, Runtime exceptions.
936 public static class PersistenceFeatureException extends RuntimeException {
937 private static final long serialVersionUID = 1L;
942 public PersistenceFeatureException(Exception ex) {