2 * ============LICENSE_START=======================================================
3 * feature-session-persistence
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.persistence;
23 import java.io.IOException;
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.sql.Connection;
27 import java.sql.DriverManager;
28 import java.sql.PreparedStatement;
29 import java.sql.SQLException;
30 import java.util.HashMap;
32 import java.util.Properties;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
36 import javax.persistence.EntityManagerFactory;
37 import javax.persistence.Persistence;
39 import org.eclipse.persistence.config.PersistenceUnitProperties;
40 import org.kie.api.KieServices;
41 import org.kie.api.runtime.Environment;
42 import org.kie.api.runtime.EnvironmentName;
43 import org.kie.api.runtime.KieSession;
44 import org.kie.api.runtime.KieSessionConfiguration;
45 import org.onap.policy.drools.core.PolicyContainer;
46 import org.onap.policy.drools.core.PolicySession;
47 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
48 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
49 import org.onap.policy.drools.system.PolicyController;
50 import org.onap.policy.drools.system.PolicyEngine;
51 import org.onap.policy.drools.utils.PropertyUtil;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 import bitronix.tm.BitronixTransactionManager;
56 import bitronix.tm.Configuration;
57 import bitronix.tm.TransactionManagerServices;
58 import bitronix.tm.resource.jdbc.PoolingDataSource;
61 * If this feature is supported, there is a single instance of it. It adds
62 * persistence to Drools sessions. In addition, if an active-standby feature
63 * exists, then that is used to determine the active and last-active PDP. If it
64 * does not exist, then the current host name is used as the PDP id.
66 * The bulk of the code here was once in other classes, such as
67 * 'PolicyContainer' and 'Main'. It was moved here as part of making this a
68 * separate optional feature.
70 public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI {
72 private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
76 * Standard factory used to get various items.
78 private static Factory stdFactory = new Factory();
81 * Factory used to get various items.
83 private Factory fact = stdFactory;
88 private KieServices kieSvcFact;
93 private String hostName;
96 * Persistence properties.
98 private Properties persistProps;
101 * Whether or not the SessionInfo records should be cleaned out.
103 private boolean sessInfoCleaned;
106 * SessionInfo timeout, in milli-seconds, as read from
107 * {@link #persistProps}.
109 private long sessionInfoTimeoutMs;
112 * Object used to serialize cleanup of sessioninfo table.
114 private Object cleanupLock = new Object();
117 * Sets the factory to be used during junit testing.
122 protected void setFactory(Factory fact) {
127 * Lookup the adjunct for this feature that is associated with the specified
128 * PolicyContainer. If not found, create one.
130 * @param policyContainer
131 * the container whose adjunct we are looking up, and possibly
133 * @return the associated 'ContainerAdjunct' instance, which may be new
135 private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
137 Object rval = policyContainer.getAdjunct(this);
139 if (rval == null || !(rval instanceof ContainerAdjunct)) {
140 // adjunct does not exist, or has the wrong type (should never
142 rval = new ContainerAdjunct(policyContainer);
143 policyContainer.setAdjunct(this, rval);
146 return ((ContainerAdjunct) rval);
153 public int getSequenceNumber() {
161 public void globalInit(String args[], String configDir) {
163 kieSvcFact = fact.getKieServices();
168 persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties");
170 } catch (IOException e1) {
171 logger.error("initializePersistence: ", e1);
174 sessionInfoTimeoutMs = getPersistenceTimeout();
176 Configuration bitronixConfiguration = fact.getTransMgrConfig();
177 bitronixConfiguration.setJournal(null);
178 bitronixConfiguration.setServerId(hostName);
182 * Creates a persistent KieSession, loading it from the persistent store, or
183 * creating one, if it does not exist yet.
186 public KieSession activatePolicySession(PolicyContainer policyContainer, String name, String kieBaseName) {
188 if (isPersistenceEnabled(policyContainer, name)) {
189 cleanUpSessionInfo();
191 return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
201 public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
202 PolicyContainer policyContainer = session.getPolicyContainer();
203 if (isPersistenceEnabled(policyContainer, session.getName())) {
204 return (new PersistentThreadModel(session, getProperties(policyContainer)));
213 public void disposeKieSession(PolicySession policySession) {
215 ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
216 if(contAdj != null) {
217 contAdj.disposeKieSession( policySession.getName());
225 public void destroyKieSession(PolicySession policySession) {
227 ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
228 if(contAdj != null) {
229 contAdj.destroyKieSession( policySession.getName());
237 public boolean afterStart(PolicyEngine engine) {
245 public boolean beforeStart(PolicyEngine engine) {
246 synchronized (cleanupLock) {
247 sessInfoCleaned = false;
257 public boolean beforeActivate(PolicyEngine engine) {
258 synchronized (cleanupLock) {
259 sessInfoCleaned = false;
269 public boolean afterActivate(PolicyEngine engine) {
273 /* ============================================================ */
276 * Gets the persistence timeout value for sessioninfo records.
278 * @return the timeout value, in milli-seconds, or {@code -1} if it is
279 * unspecified or invalid
281 private long getPersistenceTimeout() {
282 String timeoutString = null;
285 timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
287 if (timeoutString != null) {
288 // timeout parameter is specified
289 return Long.valueOf(timeoutString) * 1000;
292 } catch (NumberFormatException e) {
293 logger.error("Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
301 * Initializes {@link #hostName}.
303 private void initHostName() {
306 hostName = fact.getHostName();
308 } catch (UnknownHostException e) {
309 throw new RuntimeException("cannot determine local hostname", e);
313 /* ============================================================ */
316 * Each instance of this class is a logical extension of a 'PolicyContainer'
317 * instance. Its reference is stored in the 'adjuncts' table within the
318 * 'PolicyContainer', and will be garbage-collected with the container.
320 protected class ContainerAdjunct {
322 * 'PolicyContainer' instance that this adjunct is extending.
324 private PolicyContainer policyContainer;
327 * Maps a KIE session name to its data source.
329 private Map<String,PoolingDataSource> name2ds = new HashMap<>();
332 * Constructor - initialize a new 'ContainerAdjunct'
334 * @param policyContainer
335 * the 'PolicyContainer' instance this adjunct is extending
337 private ContainerAdjunct(PolicyContainer policyContainer) {
338 this.policyContainer = policyContainer;
342 * Create a new persistent KieSession. If there is already a
343 * corresponding entry in the database, it is used to initialize the
344 * KieSession. If not, a completely new session is created.
347 * the name of the KieSession (which is also the name of the
348 * associated PolicySession)
350 * the name of the 'KieBase' instance containing this session
351 * @return a new KieSession with persistence enabled
353 private KieSession newPersistentKieSession(String name, String kieBaseName) {
355 long desiredSessionId;
357 DroolsSessionConnector conn = getDroolsSessionConnector("onapPU");
359 desiredSessionId = getSessionId(conn, name);
361 logger.info("\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
363 // session does not exist -- attempt to create one
364 logger.info("getPolicySession:session does not exist -- attempt to create one with name {}", name);
366 System.getProperties().put("java.naming.factory.initial", "bitronix.tm.jndi.BitronixInitialContextFactory");
368 Environment env = kieSvcFact.newEnvironment();
369 String dsName = loadDataSource(name);
371 configureKieEnv(name, env, dsName);
373 KieSessionConfiguration kConf = kieSvcFact.newKieSessionConfiguration();
375 KieSession kieSession = (desiredSessionId >= 0 ? loadKieSession(kieBaseName, desiredSessionId, env, kConf)
378 if (kieSession == null) {
379 // loadKieSession() returned null or desiredSessionId < 0
380 logger.info("LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
382 kieSession = newKieSession(kieBaseName, env);
385 replaceSession(conn, name, kieSession);
391 * Loads a data source into {@link #name2ds}, if one doesn't exist
393 * @param sessName session name
394 * @return the unique data source name
396 private String loadDataSource(String sessName) {
397 PoolingDataSource ds = name2ds.get(sessName);
400 Properties props = new Properties();
401 addOptProp(props, "URL", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
402 addOptProp(props, "user", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
403 addOptProp(props, "password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
405 ds = fact.makePoolingDataSource();
406 ds.setUniqueName("jdbc/BitronixJTADataSource/" + sessName);
407 ds.setClassName(persistProps.getProperty(DroolsPersistenceProperties.DB_DATA_SOURCE));
408 ds.setMaxPoolSize(3);
409 ds.setIsolationLevel("SERIALIZABLE");
410 ds.setAllowLocalTransactions(true);
411 ds.getDriverProperties().putAll(props);
414 name2ds.put(sessName, ds);
417 return ds.getUniqueName();
421 * Configures a Kie Environment
426 * environment to be configured
430 private void configureKieEnv(String name, Environment env, String dsName) {
431 Properties emfProperties = new Properties();
432 emfProperties.setProperty(PersistenceUnitProperties.JTA_DATASOURCE, dsName);
434 EntityManagerFactory emfact = fact.makeEntMgrFact("onapsessionsPU", emfProperties);
436 env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emfact);
437 env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr());
441 * Loads an existing KieSession from the persistent store.
444 * the name of the 'KieBase' instance containing this session
445 * @param desiredSessionId
446 * id of the desired KieSession
448 * Kie Environment for the session
450 * Kie Configuration for the session
451 * @return the persistent session, or {@code null} if it could not be
454 private KieSession loadKieSession(String kieBaseName, long desiredSessionId, Environment env,
455 KieSessionConfiguration kConf) {
457 KieSession kieSession = kieSvcFact.getStoreServices().loadKieSession(desiredSessionId,
458 policyContainer.getKieContainer().getKieBase(kieBaseName), kConf, env);
460 logger.info("LOADING Loaded session {}", desiredSessionId);
464 } catch (Exception e) {
465 logger.error("loadKieSession error: ", e);
471 * Creates a new, persistent KieSession.
474 * the name of the 'KieBase' instance containing this session
476 * Kie Environment for the session
477 * @return a new, persistent session
479 private KieSession newKieSession(String kieBaseName, Environment env) {
480 KieSession kieSession = kieSvcFact.getStoreServices()
481 .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
483 logger.info("LOADING CREATED {}", kieSession.getIdentifier());
489 * Closes the data source associated with a session.
490 * @param name name of the session being destroyed
492 private void destroyKieSession(String name) {
493 closeDataSource(name);
497 * Closes the data source associated with a session.
498 * @param name name of the session being disposed of
500 private void disposeKieSession(String name) {
501 closeDataSource(name);
505 * Closes the data source associated with a session.
506 * @param name name of the session whose data source is to be closed
508 private void closeDataSource(String name) {
509 PoolingDataSource ds = name2ds.remove(name);
516 /* ============================================================ */
519 * Removes "old" Drools 'sessioninfo' records, so they aren't used to
520 * restore data to Drools sessions. This also has the useful side-effect of
521 * removing abandoned records as well.
523 private void cleanUpSessionInfo() {
525 synchronized (cleanupLock) {
527 if (sessInfoCleaned) {
528 logger.info("Clean up of sessioninfo table: already done");
532 if (sessionInfoTimeoutMs < 0) {
533 logger.info("Clean up of sessioninfo table: no timeout specified");
537 // get DB connection properties
538 String url = persistProps.getProperty(DroolsPersistenceProperties.DB_URL);
539 String user = persistProps.getProperty(DroolsPersistenceProperties.DB_USER);
540 String password = persistProps.getProperty(DroolsPersistenceProperties.DB_PWD);
542 if (url == null || user == null || password == null) {
543 logger.error("Missing DB properties for clean up of sessioninfo table");
547 // now do the record deletion
548 try (Connection connection = fact.makeDbConnection(url, user, password);
549 PreparedStatement statement = connection.prepareStatement(
550 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
551 statement.setLong(1, sessionInfoTimeoutMs/1000);
553 int count = statement.executeUpdate();
554 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
556 } catch (SQLException e) {
557 logger.error("Clean up of sessioninfo table failed", e);
560 // TODO: delete DroolsSessionEntity where sessionId not in
563 sessInfoCleaned = true;
568 * Gets a connector for manipulating DroolsSession objects within the
572 * @return a connector for DroolsSession objects
574 private DroolsSessionConnector getDroolsSessionConnector(String pu) {
576 Properties propMap = new Properties();
577 addOptProp(propMap, "javax.persistence.jdbc.driver",
578 persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
579 addOptProp(propMap, "javax.persistence.jdbc.url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
580 addOptProp(propMap, "javax.persistence.jdbc.user",
581 persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
582 addOptProp(propMap, "javax.persistence.jdbc.password",
583 persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
585 return fact.makeJpaConnector(pu, propMap);
589 * Adds an optional property to a set of properties.
590 * @param propMap map into which the property should be added
591 * @param name property name
592 * @param value property value, or {@code null} if it should not
595 private void addOptProp(Properties propMap, String name, String value) {
597 propMap.put(name, value);
602 * Gets a session's ID from the persistent store.
605 * persistence connector
607 * name of the session
608 * @return the session's id, or {@code -1} if the session is not found
610 private long getSessionId(DroolsSessionConnector conn, String sessnm) {
611 DroolsSession sess = conn.get(sessnm);
612 return (sess != null ? sess.getSessionId() : -1);
616 * Replaces a session within the persistent store, if it exists. Adds
620 * persistence connector
622 * name of session to be updated
624 * new session information
626 private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
628 DroolsSessionEntity sess = new DroolsSessionEntity();
630 sess.setSessionName(sessnm);
631 sess.setSessionId(kieSession.getIdentifier());
637 * Determine whether persistence is enabled for a specific container
640 * container to be checked
642 * name of the session to be checked
643 * @return {@code true} if persistence is enabled for this container, and
644 * {@code false} if not
646 private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
647 Properties properties = getProperties(container);
648 boolean rval = false;
650 if (properties != null) {
651 // fetch the 'type' property
652 String type = getProperty(properties, sessionName, "type");
653 rval = ("auto".equals(type) || "native".equals(type));
660 * Determine the controller properties associated with the policy container.
663 * container whose properties are to be retrieved
664 * @return the container's properties, or {@code null} if not found
666 private Properties getProperties(PolicyContainer container) {
668 return (fact.getPolicyContainer(container).getProperties());
669 } catch (IllegalArgumentException e) {
670 logger.error("getProperties exception: ", e);
676 * Fetch the persistence property associated with a session. The name may
679 * <li>persistence.SESSION-NAME.PROPERTY</li>
680 * <li>persistence.PROPERTY</li>
684 * properties from which the value is to be retrieved
686 * session name of interest
688 * property name of interest
689 * @return the property value, or {@code null} if not found
691 private String getProperty(Properties properties, String sessionName, String property) {
692 String value = properties.getProperty("persistence." + sessionName + "." + property);
694 value = properties.getProperty("persistence." + property);
700 /* ============================================================ */
703 * This 'ThreadModel' variant periodically calls
704 * 'KieSession.fireAllRules()', because the 'fireUntilHalt' method isn't
705 * compatible with persistence.
707 public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
710 * Session associated with this persistent thread.
712 private final PolicySession session;
715 * The session thread.
717 private final Thread thread;
720 * Used to indicate that processing should stop.
722 private final CountDownLatch stopped = new CountDownLatch(1);
725 * Minimum time, in milli-seconds, that the thread should sleep
726 * before firing rules again.
728 long minSleepTime = 100;
731 * Maximum time, in milli-seconds, that the thread should sleep
732 * before firing rules again. This is a "half" time, so that
733 * we can multiply it by two without overflowing the word size.
735 long halfMaxSleepTime = 5000L / 2L;
738 * Constructor - initialize variables and create thread
741 * the 'PolicySession' instance
743 * may contain additional session properties
745 public PersistentThreadModel(PolicySession session, Properties properties) {
746 this.session = session;
747 this.thread = new Thread(this, getThreadName());
749 if (properties == null) {
753 // extract 'minSleepTime' and/or 'maxSleepTime'
754 String name = session.getName();
756 // fetch 'minSleepTime' value, and update if defined
757 String sleepTimeString = getProperty(properties, name, "minSleepTime");
758 if (sleepTimeString != null) {
760 minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
761 } catch (Exception e) {
762 logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
766 // fetch 'maxSleepTime' value, and update if defined
767 long maxSleepTime = 2 * halfMaxSleepTime;
768 sleepTimeString = getProperty(properties, name, "maxSleepTime");
769 if (sleepTimeString != null) {
771 maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
772 } catch (Exception e) {
773 logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
777 // swap values if needed
778 if (minSleepTime > maxSleepTime) {
779 logger.error("minSleepTime(" + minSleepTime + ") is greater than maxSleepTime(" + maxSleepTime
781 long tmp = minSleepTime;
782 minSleepTime = maxSleepTime;
786 halfMaxSleepTime = Math.max(1, maxSleepTime/2);
790 * @return the String to use as the thread name
792 private String getThreadName() {
793 return ("Session " + session.getFullName() + " (persistent)");
796 /***************************/
797 /* 'ThreadModel' interface */
798 /***************************/
804 public void start() {
813 // tell the thread to stop
816 // wait up to 10 seconds for the thread to stop
820 } catch (InterruptedException e) {
821 logger.error("stopThread exception: ", e);
822 Thread.currentThread().interrupt();
825 // verify that it's done
826 if(thread.isAlive()) {
827 logger.error("stopThread: still running");
835 public void updated() {
836 // the container artifact has been updated -- adjust the thread name
837 thread.setName(getThreadName());
840 /************************/
841 /* 'Runnable' interface */
842 /************************/
849 logger.info("PersistentThreadModel running");
851 // set thread local variable
852 session.setPolicySession();
854 KieSession kieSession = session.getKieSession();
855 long sleepTime = 2 * halfMaxSleepTime;
857 // We want to continue, despite any exceptions that occur
858 // while rules are fired.
863 if (kieSession.fireAllRules() > 0) {
864 // some rules fired -- reduce poll delay
865 sleepTime = Math.max(minSleepTime, sleepTime/2);
867 // no rules fired -- increase poll delay
868 sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
870 } catch (Exception | LinkageError e) {
871 logger.error("Exception during kieSession.fireAllRules", e);
875 if(stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
879 } catch (InterruptedException e) {
880 logger.error("startThread exception: ", e);
881 Thread.currentThread().interrupt();
886 logger.info("PersistentThreadModel completed");
890 /* ============================================================ */
893 * Factory for various items. Methods can be overridden for junit testing.
895 protected static class Factory {
898 * Gets the configuration for the transaction manager.
900 * @return the configuration for the transaction manager
902 public Configuration getTransMgrConfig() {
903 return TransactionManagerServices.getConfiguration();
907 * Gets the transaction manager.
909 * @return the transaction manager
911 public BitronixTransactionManager getTransMgr() {
912 return TransactionManagerServices.getTransactionManager();
916 * Gets the KIE services.
918 * @return the KIE services
920 public KieServices getKieServices() {
921 return KieServices.Factory.get();
925 * Gets the current host name.
927 * @return the current host name, associated with the IP address of the
929 * @throws UnknownHostException
931 public String getHostName() throws UnknownHostException {
932 return InetAddress.getLocalHost().getHostName();
936 * Loads properties from a file.
939 * name of the file to load
940 * @return properties, as loaded from the file
941 * @throws IOException
942 * if an error occurs reading from the file
944 public Properties loadProperties(String filenm) throws IOException {
945 return PropertyUtil.getProperties(filenm);
949 * Makes a connection to the DB.
957 * @return a new DB connection
958 * @throws SQLException
960 public Connection makeDbConnection(String url, String user, String pass) throws SQLException {
962 return DriverManager.getConnection(url, user, pass);
966 * Makes a new pooling data source.
968 * @return a new pooling data source
970 public PoolingDataSource makePoolingDataSource() {
971 return new PoolingDataSource();
975 * Makes a new JPA connector for drools sessions.
978 * PU for the entity manager factory
980 * properties with which the factory should be configured
981 * @return a new JPA connector for drools sessions
983 public DroolsSessionConnector makeJpaConnector(String pu, Properties propMap) {
985 EntityManagerFactory emf = makeEntMgrFact(pu, propMap);
987 return new JpaDroolsSessionConnector(emf);
991 * Makes a new entity manager factory.
994 * PU for the entity manager factory
996 * properties with which the factory should be configured
997 * @return a new entity manager factory
999 public EntityManagerFactory makeEntMgrFact(String pu, Properties propMap) {
1000 return Persistence.createEntityManagerFactory(pu, propMap);
1004 * Gets the policy controller associated with a given policy container.
1007 * container whose controller is to be retrieved
1008 * @return the container's controller
1010 public PolicyController getPolicyContainer(PolicyContainer container) {
1011 return PolicyController.factory.get(container.getGroupId(), container.getArtifactId());