b48690b03dea9b7e721852c43af8ad117f23e8df
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * feature-session-persistence
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.persistence;
22
23 import java.io.IOException;
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.sql.Connection;
27 import java.sql.DriverManager;
28 import java.sql.PreparedStatement;
29 import java.sql.SQLException;
30 import java.util.HashMap;
31 import java.util.Map;
32 import java.util.Properties;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35
36 import javax.persistence.EntityManagerFactory;
37 import javax.persistence.Persistence;
38
39 import org.eclipse.persistence.config.PersistenceUnitProperties;
40 import org.kie.api.KieServices;
41 import org.kie.api.runtime.Environment;
42 import org.kie.api.runtime.EnvironmentName;
43 import org.kie.api.runtime.KieSession;
44 import org.kie.api.runtime.KieSessionConfiguration;
45 import org.onap.policy.drools.core.PolicyContainer;
46 import org.onap.policy.drools.core.PolicySession;
47 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
48 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
49 import org.onap.policy.drools.system.PolicyController;
50 import org.onap.policy.drools.system.PolicyEngine;
51 import org.onap.policy.drools.utils.PropertyUtil;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import bitronix.tm.BitronixTransactionManager;
56 import bitronix.tm.Configuration;
57 import bitronix.tm.TransactionManagerServices;
58 import bitronix.tm.resource.jdbc.PoolingDataSource;
59
60 /**
61  * If this feature is supported, there is a single instance of it. It adds
62  * persistence to Drools sessions. In addition, if an active-standby feature
63  * exists, then that is used to determine the active and last-active PDP. If it
64  * does not exist, then the current host name is used as the PDP id.
65  *
66  * The bulk of the code here was once in other classes, such as
67  * 'PolicyContainer' and 'Main'. It was moved here as part of making this a
68  * separate optional feature.
69  */
70 public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI {
71
72         private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
73
74
75         /**
76          * Standard factory used to get various items.
77          */
78         private static Factory stdFactory = new Factory();
79
80         /**
81          * Factory used to get various items.
82          */
83         private Factory fact = stdFactory;
84
85         /**
86          * KieService factory.
87          */
88         private KieServices kieSvcFact;
89
90         /**
91          * Host name.
92          */
93         private String hostName;
94
95         /**
96          * Persistence properties.
97          */
98         private Properties persistProps;
99
100         /**
101          * Whether or not the SessionInfo records should be cleaned out.
102          */
103         private boolean sessInfoCleaned;
104
105         /**
106          * SessionInfo timeout, in milli-seconds, as read from
107          * {@link #persistProps}.
108          */
109         private long sessionInfoTimeoutMs;
110
111         /**
112          * Object used to serialize cleanup of sessioninfo table.
113          */
114         private Object cleanupLock = new Object();
115
116         /**
117          * Sets the factory to be used during junit testing.
118          * 
119          * @param fact
120          *            factory to be used
121          */
122         protected void setFactory(Factory fact) {
123                 this.fact = fact;
124         }
125
126         /**
127          * Lookup the adjunct for this feature that is associated with the specified
128          * PolicyContainer. If not found, create one.
129          * 
130          * @param policyContainer
131          *            the container whose adjunct we are looking up, and possibly
132          *            creating
133          * @return the associated 'ContainerAdjunct' instance, which may be new
134          */
135         private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
136
137                 Object rval = policyContainer.getAdjunct(this);
138
139                 if (rval == null || !(rval instanceof ContainerAdjunct)) {
140                         // adjunct does not exist, or has the wrong type (should never
141                         // happen)
142                         rval = new ContainerAdjunct(policyContainer);
143                         policyContainer.setAdjunct(this, rval);
144                 }
145
146                 return ((ContainerAdjunct) rval);
147         }
148
149         /**
150          * {@inheritDoc}
151          */
152         @Override
153         public int getSequenceNumber() {
154                 return (1);
155         }
156
157         /**
158          * {@inheritDoc}
159          */
160         @Override
161         public void globalInit(String args[], String configDir) {
162                 
163                 kieSvcFact = fact.getKieServices();
164
165                 initHostName();
166
167                 try {
168                         persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties");
169
170                 } catch (IOException e1) {
171                         logger.error("initializePersistence: ", e1);
172                 }
173
174                 sessionInfoTimeoutMs = getPersistenceTimeout();
175
176                 Configuration bitronixConfiguration = fact.getTransMgrConfig();
177                 bitronixConfiguration.setJournal(null);
178                 bitronixConfiguration.setServerId(hostName);
179         }
180
181         /**
182          * Creates a persistent KieSession, loading it from the persistent store, or
183          * creating one, if it does not exist yet.
184          */
185         @Override
186         public KieSession activatePolicySession(PolicyContainer policyContainer, String name, String kieBaseName) {
187
188                 if (isPersistenceEnabled(policyContainer, name)) {
189                         cleanUpSessionInfo();
190
191                         return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
192                 }
193
194                 return null;
195         }
196
197         /**
198          * {@inheritDoc}
199          */
200         @Override
201         public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
202                 PolicyContainer policyContainer = session.getPolicyContainer();
203                 if (isPersistenceEnabled(policyContainer, session.getName())) {
204                         return (new PersistentThreadModel(session, getProperties(policyContainer)));
205                 }
206                 return (null);
207         }
208
209         /**
210          * {@inheritDoc}
211          */
212         @Override
213         public void disposeKieSession(PolicySession policySession) {
214                 
215                 ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
216                 if(contAdj != null) {
217                         contAdj.disposeKieSession( policySession.getName());
218                 }
219         }
220
221         /**
222          * {@inheritDoc}
223          */
224         @Override
225         public void destroyKieSession(PolicySession policySession) {
226
227                 ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
228                 if(contAdj != null) {
229                         contAdj.destroyKieSession( policySession.getName());
230                 }
231         }
232
233         /**
234          * {@inheritDoc}
235          */
236         @Override
237         public boolean afterStart(PolicyEngine engine) {
238                 return false;
239         }
240
241         /**
242          * {@inheritDoc}
243          */
244         @Override
245         public boolean beforeStart(PolicyEngine engine) {
246                 synchronized (cleanupLock) {
247                         sessInfoCleaned = false;
248                 }
249
250                 return false;
251         }
252
253         /**
254          * {@inheritDoc}
255          */
256         @Override
257         public boolean beforeActivate(PolicyEngine engine) {
258                 synchronized (cleanupLock) {
259                         sessInfoCleaned = false;
260                 }
261
262                 return false;
263         }
264
265         /**
266          * {@inheritDoc}
267          */
268         @Override
269         public boolean afterActivate(PolicyEngine engine) {
270                 return false;
271         }
272
273         /* ============================================================ */
274
275         /**
276          * Gets the persistence timeout value for sessioninfo records.
277          * 
278          * @return the timeout value, in milli-seconds, or {@code -1} if it is
279          *         unspecified or invalid
280          */
281         private long getPersistenceTimeout() {
282                 String timeoutString = null;
283
284                 try {
285                         timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
286
287                         if (timeoutString != null) {
288                                 // timeout parameter is specified
289                                 return Long.valueOf(timeoutString) * 1000;
290                         }
291
292                 } catch (NumberFormatException e) {
293                         logger.error("Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
294                                          timeoutString, e);
295                 }
296
297                 return -1;
298         }
299
300         /**
301          * Initializes {@link #hostName}.
302          */
303         private void initHostName() {
304
305                 try {
306                         hostName = fact.getHostName();
307
308                 } catch (UnknownHostException e) {
309                         throw new RuntimeException("cannot determine local hostname", e);
310                 }
311         }
312
313         /* ============================================================ */
314
315         /**
316          * Each instance of this class is a logical extension of a 'PolicyContainer'
317          * instance. Its reference is stored in the 'adjuncts' table within the
318          * 'PolicyContainer', and will be garbage-collected with the container.
319          */
320         protected class ContainerAdjunct {
321                 /**
322                  * 'PolicyContainer' instance that this adjunct is extending.
323                  */
324                 private PolicyContainer policyContainer;
325                 
326                 /**
327                  * Maps a KIE session name to its data source.
328                  */
329                 private Map<String,PoolingDataSource> name2ds = new HashMap<>();
330
331                 /**
332                  * Constructor - initialize a new 'ContainerAdjunct'
333                  *
334                  * @param policyContainer
335                  *            the 'PolicyContainer' instance this adjunct is extending
336                  */
337                 private ContainerAdjunct(PolicyContainer policyContainer) {
338                         this.policyContainer = policyContainer;
339                 }
340
341                 /**
342                  * Create a new persistent KieSession. If there is already a
343                  * corresponding entry in the database, it is used to initialize the
344                  * KieSession. If not, a completely new session is created.
345                  * 
346                  * @param name
347                  *            the name of the KieSession (which is also the name of the
348                  *            associated PolicySession)
349                  * @param kieBaseName
350                  *            the name of the 'KieBase' instance containing this session
351                  * @return a new KieSession with persistence enabled
352                  */
353                 private KieSession newPersistentKieSession(String name, String kieBaseName) {
354
355                         long desiredSessionId;
356
357                         DroolsSessionConnector conn = getDroolsSessionConnector("onapPU");
358
359                         desiredSessionId = getSessionId(conn, name);
360
361                         logger.info("\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
362
363                         // session does not exist -- attempt to create one
364                         logger.info("getPolicySession:session does not exist -- attempt to create one with name {}", name);
365
366                         System.getProperties().put("java.naming.factory.initial", "bitronix.tm.jndi.BitronixInitialContextFactory");
367
368                         Environment env = kieSvcFact.newEnvironment();
369                         String dsName = loadDataSource(name);
370
371                         configureKieEnv(name, env, dsName);
372
373                         KieSessionConfiguration kConf = kieSvcFact.newKieSessionConfiguration();
374
375                         KieSession kieSession = (desiredSessionId >= 0 ? loadKieSession(kieBaseName, desiredSessionId, env, kConf)
376                                         : null);
377
378                         if (kieSession == null) {
379                                 // loadKieSession() returned null or desiredSessionId < 0
380                                 logger.info("LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
381
382                                 kieSession = newKieSession(kieBaseName, env);
383                         }
384
385                         replaceSession(conn, name, kieSession);
386
387                         return kieSession;
388                 }
389
390                 /**
391                  * Loads a data source into {@link #name2ds}, if one doesn't exist
392                  * yet.
393                  * @param sessName              session name
394                  * @return the unique data source name
395                  */
396                 private String loadDataSource(String sessName) {
397                         PoolingDataSource ds = name2ds.get(sessName);
398                         
399                         if(ds == null) {
400                                 Properties props = new Properties();
401                                 addOptProp(props, "URL", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
402                                 addOptProp(props, "user", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
403                                 addOptProp(props, "password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
404
405                                 ds = fact.makePoolingDataSource();
406                                 ds.setUniqueName("jdbc/BitronixJTADataSource/" + sessName);
407                                 ds.setClassName(persistProps.getProperty(DroolsPersistenceProperties.DB_DATA_SOURCE));
408                                 ds.setMaxPoolSize(3);
409                                 ds.setIsolationLevel("SERIALIZABLE");
410                                 ds.setAllowLocalTransactions(true);
411                                 ds.getDriverProperties().putAll(props);
412                                 ds.init();
413                                 
414                                 name2ds.put(sessName, ds);
415                         }
416                         
417                         return ds.getUniqueName();
418                 }
419
420                 /**
421                  * Configures a Kie Environment
422                  * 
423                  * @param name
424                  *                              session name
425                  * @param env
426                  *                              environment to be configured
427                  * @param dsName 
428                  *                              data source name
429                  */
430                 private void configureKieEnv(String name, Environment env, String dsName) {
431                         Properties emfProperties = new Properties();
432                         emfProperties.setProperty(PersistenceUnitProperties.JTA_DATASOURCE, dsName);
433                         
434                         EntityManagerFactory emfact = fact.makeEntMgrFact("onapsessionsPU", emfProperties);
435
436                         env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emfact);
437                         env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr());
438                 }
439
440                 /**
441                  * Loads an existing KieSession from the persistent store.
442                  * 
443                  * @param kieBaseName
444                  *            the name of the 'KieBase' instance containing this session
445                  * @param desiredSessionId
446                  *            id of the desired KieSession
447                  * @param env
448                  *            Kie Environment for the session
449                  * @param kConf
450                  *            Kie Configuration for the session
451                  * @return the persistent session, or {@code null} if it could not be
452                  *         loaded
453                  */
454                 private KieSession loadKieSession(String kieBaseName, long desiredSessionId, Environment env,
455                                 KieSessionConfiguration kConf) {
456                         try {
457                                 KieSession kieSession = kieSvcFact.getStoreServices().loadKieSession(desiredSessionId,
458                                                 policyContainer.getKieContainer().getKieBase(kieBaseName), kConf, env);
459
460                                 logger.info("LOADING Loaded session {}", desiredSessionId);
461
462                                 return kieSession;
463
464                         } catch (Exception e) {
465                                 logger.error("loadKieSession error: ", e);
466                                 return null;
467                         }
468                 }
469
470                 /**
471                  * Creates a new, persistent KieSession.
472                  * 
473                  * @param kieBaseName
474                  *            the name of the 'KieBase' instance containing this session
475                  * @param env
476                  *            Kie Environment for the session
477                  * @return a new, persistent session
478                  */
479                 private KieSession newKieSession(String kieBaseName, Environment env) {
480                         KieSession kieSession = kieSvcFact.getStoreServices()
481                                         .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
482
483                         logger.info("LOADING CREATED {}", kieSession.getIdentifier());
484
485                         return kieSession;
486                 }
487
488                 /**
489                  * Closes the data source associated with a session.
490                  * @param name  name of the session being destroyed
491                  */
492                 private void destroyKieSession(String name) {
493                         closeDataSource(name);
494                 }
495
496                 /**
497                  * Closes the data source associated with a session.
498                  * @param name  name of the session being disposed of
499                  */
500                 private void disposeKieSession(String name) {
501                         closeDataSource(name);
502                 }
503
504                 /**
505                  * Closes the data source associated with a session.
506                  * @param name  name of the session whose data source is to be closed
507                  */
508                 private void closeDataSource(String name) {
509                         PoolingDataSource ds = name2ds.remove(name);
510                         if(ds != null) {
511                                 ds.close();
512                         }
513                 }
514         }
515
516         /* ============================================================ */
517
518         /**
519          * Removes "old" Drools 'sessioninfo' records, so they aren't used to
520          * restore data to Drools sessions. This also has the useful side-effect of
521          * removing abandoned records as well.
522          */
523         private void cleanUpSessionInfo() {
524
525                 synchronized (cleanupLock) {
526
527                         if (sessInfoCleaned) {
528                                 logger.info("Clean up of sessioninfo table: already done");
529                                 return;
530                         }
531
532                         if (sessionInfoTimeoutMs < 0) {
533                                 logger.info("Clean up of sessioninfo table: no timeout specified");
534                                 return;
535                         }
536
537                         // get DB connection properties
538                         String url = persistProps.getProperty(DroolsPersistenceProperties.DB_URL);
539                         String user = persistProps.getProperty(DroolsPersistenceProperties.DB_USER);
540                         String password = persistProps.getProperty(DroolsPersistenceProperties.DB_PWD);
541
542                         if (url == null || user == null || password == null) {
543                                 logger.error("Missing DB properties for clean up of sessioninfo table");
544                                 return;
545                         }
546
547                         // now do the record deletion
548                         try (Connection connection = fact.makeDbConnection(url, user, password);
549                                         PreparedStatement statement = connection.prepareStatement(
550                                                         "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
551                                 statement.setLong(1, sessionInfoTimeoutMs/1000);
552
553                                 int count = statement.executeUpdate();
554                                 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
555
556                         } catch (SQLException e) {
557                                 logger.error("Clean up of sessioninfo table failed", e);
558                         }
559
560                         // TODO: delete DroolsSessionEntity where sessionId not in
561                         // (sessinfo.xxx)
562
563                         sessInfoCleaned = true;
564                 }
565         }
566
567         /**
568          * Gets a connector for manipulating DroolsSession objects within the
569          * persistent store.
570          * 
571          * @param pu
572          * @return a connector for DroolsSession objects
573          */
574         private DroolsSessionConnector getDroolsSessionConnector(String pu) {
575
576                 Properties propMap = new Properties();
577                 addOptProp(propMap, "javax.persistence.jdbc.driver",
578                                 persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
579                 addOptProp(propMap, "javax.persistence.jdbc.url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
580                 addOptProp(propMap, "javax.persistence.jdbc.user",
581                                 persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
582                 addOptProp(propMap, "javax.persistence.jdbc.password",
583                                 persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
584
585                 return fact.makeJpaConnector(pu, propMap);
586         }
587
588         /**
589          * Adds an optional property to a set of properties.
590          * @param propMap       map into which the property should be added
591          * @param name          property name
592          * @param value         property value, or {@code null} if it should not
593          *                                      be added
594          */
595         private void addOptProp(Properties propMap, String name, String value) {
596                 if (value != null) {
597                         propMap.put(name, value);
598                 }
599         }
600
601         /**
602          * Gets a session's ID from the persistent store.
603          * 
604          * @param conn
605          *            persistence connector
606          * @param sessnm
607          *            name of the session
608          * @return the session's id, or {@code -1} if the session is not found
609          */
610         private long getSessionId(DroolsSessionConnector conn, String sessnm) {
611                 DroolsSession sess = conn.get(sessnm);
612                 return (sess != null ? sess.getSessionId() : -1);
613         }
614
615         /**
616          * Replaces a session within the persistent store, if it exists.  Adds
617          * it otherwise.
618          * 
619          * @param conn
620          *            persistence connector
621          * @param sessnm
622          *            name of session to be updated
623          * @param kieSession
624          *            new session information
625          */
626         private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
627
628                 DroolsSessionEntity sess = new DroolsSessionEntity();
629
630                 sess.setSessionName(sessnm);
631                 sess.setSessionId(kieSession.getIdentifier());
632
633                 conn.replace(sess);
634         }
635
636         /**
637          * Determine whether persistence is enabled for a specific container
638          * 
639          * @param container
640          *            container to be checked
641          * @param sessionName
642          *            name of the session to be checked
643          * @return {@code true} if persistence is enabled for this container, and
644          *         {@code false} if not
645          */
646         private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
647                 Properties properties = getProperties(container);
648                 boolean rval = false;
649
650                 if (properties != null) {
651                         // fetch the 'type' property
652                         String type = getProperty(properties, sessionName, "type");
653                         rval = ("auto".equals(type) || "native".equals(type));
654                 }
655
656                 return (rval);
657         }
658
659         /**
660          * Determine the controller properties associated with the policy container.
661          * 
662          * @param container
663          *            container whose properties are to be retrieved
664          * @return the container's properties, or {@code null} if not found
665          */
666         private Properties getProperties(PolicyContainer container) {
667                 try {
668                         return (fact.getPolicyContainer(container).getProperties());
669                 } catch (IllegalArgumentException e) {
670                         logger.error("getProperties exception: ", e);
671                         return (null);
672                 }
673         }
674
675         /**
676          * Fetch the persistence property associated with a session. The name may
677          * have the form:
678          * <ul>
679          * <li>persistence.SESSION-NAME.PROPERTY</li>
680          * <li>persistence.PROPERTY</li>
681          * </ul>
682          * 
683          * @param properties
684          *            properties from which the value is to be retrieved
685          * @param sessionName
686          *            session name of interest
687          * @param property
688          *            property name of interest
689          * @return the property value, or {@code null} if not found
690          */
691         private String getProperty(Properties properties, String sessionName, String property) {
692                 String value = properties.getProperty("persistence." + sessionName + "." + property);
693                 if (value == null) {
694                         value = properties.getProperty("persistence." + property);
695                 }
696
697                 return (value);
698         }
699
700         /* ============================================================ */
701
702         /**
703          * This 'ThreadModel' variant periodically calls
704          * 'KieSession.fireAllRules()', because the 'fireUntilHalt' method isn't
705          * compatible with persistence.
706          */
707         public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
708                 
709                 /**
710                  * Session associated with this persistent thread.
711                  */
712                 private final PolicySession session;
713
714                 /**
715                  * The session thread.
716                  */
717                 private final Thread thread;
718                 
719                 /**
720                  * Used to indicate that processing should stop.
721                  */
722                 private final CountDownLatch stopped = new CountDownLatch(1);
723                 
724                 /**
725                  * Minimum time, in milli-seconds, that the thread should sleep
726                  * before firing rules again.
727                  */
728                 long minSleepTime = 100;
729                 
730                 /**
731                  * Maximum time, in milli-seconds, that the thread should sleep
732                  * before firing rules again.  This is a "half" time, so that
733                  * we can multiply it by two without overflowing the word size.
734                  */
735                 long halfMaxSleepTime = 5000L / 2L;
736
737                 /**
738                  * Constructor - initialize variables and create thread
739                  *
740                  * @param session
741                  *            the 'PolicySession' instance
742                  * @param properties
743                  *            may contain additional session properties
744                  */
745                 public PersistentThreadModel(PolicySession session, Properties properties) {
746                         this.session = session;
747                         this.thread = new Thread(this, getThreadName());
748                         
749                         if (properties == null) {
750                                 return;
751                         }
752                         
753                         // extract 'minSleepTime' and/or 'maxSleepTime'
754                         String name = session.getName();
755
756                         // fetch 'minSleepTime' value, and update if defined
757                         String sleepTimeString = getProperty(properties, name, "minSleepTime");
758                         if (sleepTimeString != null) {
759                                 try {
760                                         minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
761                                 } catch (Exception e) {
762                                         logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
763                                 }
764                         }
765
766                         // fetch 'maxSleepTime' value, and update if defined
767                         long maxSleepTime = 2 * halfMaxSleepTime;
768                         sleepTimeString = getProperty(properties, name, "maxSleepTime");
769                         if (sleepTimeString != null) {
770                                 try {
771                                         maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
772                                 } catch (Exception e) {
773                                         logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
774                                 }
775                         }
776
777                         // swap values if needed
778                         if (minSleepTime > maxSleepTime) {
779                                 logger.error("minSleepTime(" + minSleepTime + ") is greater than maxSleepTime(" + maxSleepTime
780                                                 + ") -- swapping");
781                                 long tmp = minSleepTime;
782                                 minSleepTime = maxSleepTime;
783                                 maxSleepTime = tmp;
784                         }
785                         
786                         halfMaxSleepTime = Math.max(1, maxSleepTime/2);
787                 }
788
789                 /**
790                  * @return the String to use as the thread name
791                  */
792                 private String getThreadName() {
793                         return ("Session " + session.getFullName() + " (persistent)");
794                 }
795
796                 /***************************/
797                 /* 'ThreadModel' interface */
798                 /***************************/
799
800                 /**
801                  * {@inheritDoc}
802                  */
803                 @Override
804                 public void start() {
805                         thread.start();
806                 }
807
808                 /**
809                  * {@inheritDoc}
810                  */
811                 @Override
812                 public void stop() {
813                         // tell the thread to stop
814                         stopped.countDown();
815                         
816                         // wait up to 10 seconds for the thread to stop
817                         try {
818                                 thread.join(10000);
819                                 
820                         } catch (InterruptedException e) {
821                                 logger.error("stopThread exception: ", e);
822                                 Thread.currentThread().interrupt();
823                         }
824                         
825                         // verify that it's done
826                         if(thread.isAlive()) {
827                                 logger.error("stopThread: still running");
828                         }
829                 }
830
831                 /**
832                  * {@inheritDoc}
833                  */
834                 @Override
835                 public void updated() {
836                         // the container artifact has been updated -- adjust the thread name
837                         thread.setName(getThreadName());
838                 }
839
840                 /************************/
841                 /* 'Runnable' interface */
842                 /************************/
843
844                 /**
845                  * {@inheritDoc}
846                  */
847                 @Override
848                 public void run() {
849                         logger.info("PersistentThreadModel running");
850                         
851                         // set thread local variable
852                         session.setPolicySession();
853
854                         KieSession kieSession = session.getKieSession();
855                         long sleepTime = 2 * halfMaxSleepTime;
856
857                         // We want to continue, despite any exceptions that occur
858                         // while rules are fired.
859                         
860                         for(;;) {
861                         
862                                 try {
863                                         if (kieSession.fireAllRules() > 0) {
864                                                 // some rules fired -- reduce poll delay
865                                                 sleepTime = Math.max(minSleepTime, sleepTime/2);
866                                         } else {
867                                                 // no rules fired -- increase poll delay
868                                                 sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
869                                         }
870                                 } catch (Exception | LinkageError e) {
871                                         logger.error("Exception during kieSession.fireAllRules", e);
872                                 }                               
873                         
874                                 try {
875                                         if(stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
876                                                 break;
877                                         }
878                                         
879                                 } catch (InterruptedException e) {
880                                         logger.error("startThread exception: ", e);
881                                         Thread.currentThread().interrupt();
882                                         break;
883                                 }
884                         }
885                         
886                         logger.info("PersistentThreadModel completed");
887                 }
888         }
889
890         /* ============================================================ */
891
892         /**
893          * Factory for various items. Methods can be overridden for junit testing.
894          */
895         protected static class Factory {
896
897                 /**
898                  * Gets the configuration for the transaction manager.
899                  * 
900                  * @return the configuration for the transaction manager
901                  */
902                 public Configuration getTransMgrConfig() {
903                         return TransactionManagerServices.getConfiguration();
904                 }
905
906                 /**
907                  * Gets the transaction manager.
908                  * 
909                  * @return the transaction manager
910                  */
911                 public BitronixTransactionManager getTransMgr() {
912                         return TransactionManagerServices.getTransactionManager();
913                 }
914
915                 /**
916                  * Gets the KIE services.
917                  * 
918                  * @return the KIE services
919                  */
920                 public KieServices getKieServices() {
921                         return KieServices.Factory.get();
922                 }
923
924                 /**
925                  * Gets the current host name.
926                  * 
927                  * @return the current host name, associated with the IP address of the
928                  *         local machine
929                  * @throws UnknownHostException
930                  */
931                 public String getHostName() throws UnknownHostException {
932                         return InetAddress.getLocalHost().getHostName();
933                 }
934
935                 /**
936                  * Loads properties from a file.
937                  * 
938                  * @param filenm
939                  *            name of the file to load
940                  * @return properties, as loaded from the file
941                  * @throws IOException
942                  *             if an error occurs reading from the file
943                  */
944                 public Properties loadProperties(String filenm) throws IOException {
945                         return PropertyUtil.getProperties(filenm);
946                 }
947
948                 /**
949                  * Makes a connection to the DB.
950                  * 
951                  * @param url
952                  *            DB URL
953                  * @param user
954                  *            user name
955                  * @param pass
956                  *            password
957                  * @return a new DB connection
958                  * @throws SQLException
959                  */
960                 public Connection makeDbConnection(String url, String user, String pass) throws SQLException {
961
962                         return DriverManager.getConnection(url, user, pass);
963                 }
964
965                 /**
966                  * Makes a new pooling data source.
967                  * 
968                  * @return a new pooling data source
969                  */
970                 public PoolingDataSource makePoolingDataSource() {
971                         return new PoolingDataSource();
972                 }
973
974                 /**
975                  * Makes a new JPA connector for drools sessions.
976                  * 
977                  * @param pu
978                  *            PU for the entity manager factory
979                  * @param propMap
980                  *            properties with which the factory should be configured
981                  * @return a new JPA connector for drools sessions
982                  */
983                 public DroolsSessionConnector makeJpaConnector(String pu, Properties propMap) {
984
985                         EntityManagerFactory emf = makeEntMgrFact(pu, propMap);
986
987                         return new JpaDroolsSessionConnector(emf);
988                 }
989
990                 /**
991                  * Makes a new entity manager factory.
992                  * 
993                  * @param pu
994                  *            PU for the entity manager factory
995                  * @param propMap
996                  *            properties with which the factory should be configured
997                  * @return a new entity manager factory
998                  */
999                 public EntityManagerFactory makeEntMgrFact(String pu, Properties propMap) {
1000                         return Persistence.createEntityManagerFactory(pu, propMap);
1001                 }
1002
1003                 /**
1004                  * Gets the policy controller associated with a given policy container.
1005                  * 
1006                  * @param container
1007                  *            container whose controller is to be retrieved
1008                  * @return the container's controller
1009                  */
1010                 public PolicyController getPolicyContainer(PolicyContainer container) {
1011                         return PolicyController.factory.get(container.getGroupId(), container.getArtifactId());
1012                 }
1013         }
1014 }