332e2984fd52322ac7a1ad68ef00f34cdf75ccb4
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-session-persistence
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.persistence;
22
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.SQLException;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.persistence.EntityManagerFactory;
34 import javax.persistence.Persistence;
35 import javax.transaction.TransactionManager;
36 import javax.transaction.TransactionSynchronizationRegistry;
37 import javax.transaction.UserTransaction;
38
39 import org.apache.commons.dbcp2.BasicDataSource;
40 import org.apache.commons.dbcp2.BasicDataSourceFactory;
41 import org.kie.api.KieServices;
42 import org.kie.api.runtime.Environment;
43 import org.kie.api.runtime.EnvironmentName;
44 import org.kie.api.runtime.KieSession;
45 import org.kie.api.runtime.KieSessionConfiguration;
46 import org.onap.policy.drools.core.PolicyContainer;
47 import org.onap.policy.drools.core.PolicySession;
48 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
49 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.onap.policy.drools.system.PolicyEngine;
52 import org.onap.policy.drools.utils.PropertyUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * If this feature is supported, there is a single instance of it. It adds persistence to Drools
58  * sessions. In addition, if an active-standby feature exists, then that is used to determine the
59  * active and last-active PDP. If it does not exist, then the current host name is used as the PDP
60  * id.
61  *
62  * <p>The bulk of the code here was once in other classes, such as 'PolicyContainer' and 'Main'. It
63  * was moved here as part of making this a separate optional feature.
64  */
65 public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI {
66
67     private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
68
69     /** Standard factory used to get various items. */
70     private static Factory stdFactory = new Factory();
71
72     /** Factory used to get various items. */
73     private Factory fact = stdFactory;
74
75     /** KieService factory. */
76     private KieServices kieSvcFact;
77
78     /** Persistence properties. */
79     private Properties persistProps;
80
81     /** Whether or not the SessionInfo records should be cleaned out. */
82     private boolean sessInfoCleaned;
83
84     /** SessionInfo timeout, in milli-seconds, as read from 
85      * {@link #persistProps}. */
86     private long sessionInfoTimeoutMs;
87
88     /** Object used to serialize cleanup of sessioninfo table. */
89     private Object cleanupLock = new Object();
90
91     /**
92      * Sets the factory to be used during junit testing.
93      *
94      * @param fact factory to be used
95      */
96     protected void setFactory(Factory fact) {
97         this.fact = fact;
98     }
99
100     /**
101      * Lookup the adjunct for this feature that is associated with the specified PolicyContainer. If
102      * not found, create one.
103      *
104      * @param policyContainer the container whose adjunct we are looking up, and possibly creating
105      * @return the associated 'ContainerAdjunct' instance, which may be new
106      */
107     private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
108
109         Object rval = policyContainer.getAdjunct(this);
110
111         if (rval == null || !(rval instanceof ContainerAdjunct)) {
112             // adjunct does not exist, or has the wrong type (should never
113             // happen)
114             rval = new ContainerAdjunct(policyContainer);
115             policyContainer.setAdjunct(this, rval);
116         }
117
118         return (ContainerAdjunct) rval;
119     }
120
121     /** 
122      * {@inheritDoc}.
123      **/
124     @Override
125     public int getSequenceNumber() {
126         return 1;
127     }
128
129     /** 
130      * {@inheritDoc}.
131      **/
132     @Override
133     public void globalInit(String[] args, String configDir) {
134
135         kieSvcFact = fact.getKieServices();
136
137         try {
138             persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties");
139
140         } catch (IOException e1) {
141             logger.error("initializePersistence: ", e1);
142         }
143
144         sessionInfoTimeoutMs = getPersistenceTimeout();
145     }
146
147     /**
148      * Creates a persistent KieSession, loading it from the persistent store, or creating one, if it
149      * does not exist yet.
150      */
151     @Override
152     public KieSession activatePolicySession(
153             PolicyContainer policyContainer, String name, String kieBaseName) {
154
155         if (isPersistenceEnabled(policyContainer, name)) {
156             cleanUpSessionInfo();
157
158             return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
159         }
160
161         return null;
162     }
163
164     /** 
165      * {@inheritDoc}.
166      **/
167     @Override
168     public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
169
170         PolicyContainer policyContainer = session.getPolicyContainer();
171         if (isPersistenceEnabled(policyContainer, session.getName())) {
172             return new PersistentThreadModel(session, getProperties(policyContainer));
173         }
174         return null;
175     }
176
177     /** 
178      * {@inheritDoc}.
179      **/
180     @Override
181     public void disposeKieSession(PolicySession policySession) {
182
183         ContainerAdjunct contAdj =
184                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
185         if (contAdj != null) {
186             contAdj.disposeKieSession(policySession.getName());
187         }
188     }
189
190     /** 
191      * {@inheritDoc}.
192      **/
193     @Override
194     public void destroyKieSession(PolicySession policySession) {
195
196         ContainerAdjunct contAdj =
197                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
198         if (contAdj != null) {
199             contAdj.destroyKieSession(policySession.getName());
200         }
201     }
202
203     /** 
204      * {@inheritDoc}.
205      **/
206     @Override
207     public boolean afterStart(PolicyEngine engine) {
208         return false;
209     }
210
211     /** 
212      * {@inheritDoc}.
213      **/
214     @Override
215     public boolean beforeStart(PolicyEngine engine) {
216         synchronized (cleanupLock) {
217             sessInfoCleaned = false;
218         }
219
220         return false;
221     }
222
223     /** 
224      * {@inheritDoc}.
225      **/
226     @Override
227     public boolean beforeActivate(PolicyEngine engine) {
228         synchronized (cleanupLock) {
229             sessInfoCleaned = false;
230         }
231
232         return false;
233     }
234
235     /** 
236      * {@inheritDoc}.
237      **/
238     @Override
239     public boolean afterActivate(PolicyEngine engine) {
240         return false;
241     }
242
243     /* ============================================================ */
244
245     /**
246      * Gets the persistence timeout value for sessioninfo records.
247      *
248      * @return the timeout value, in milli-seconds, or {@code -1} if it is unspecified or invalid
249      */
250     private long getPersistenceTimeout() {
251         String timeoutString = null;
252
253         try {
254             timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
255
256             if (timeoutString != null) {
257                 // timeout parameter is specified
258                 return Long.valueOf(timeoutString) * 1000;
259             }
260
261         } catch (NumberFormatException e) {
262             logger.error(
263                     "Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
264                     timeoutString,
265                     e);
266         }
267
268         return -1;
269     }
270
271     /* ============================================================ */
272
273     /**
274      * Each instance of this class is a logical extension of a 'PolicyContainer' instance. Its
275      * reference is stored in the 'adjuncts' table within the 'PolicyContainer', and will be
276      * garbage-collected with the container.
277      */
278     protected class ContainerAdjunct {
279         /** 'PolicyContainer' instance that this adjunct is extending. */
280         private PolicyContainer policyContainer;
281
282         /** Maps a KIE session name to its data source. */
283         private Map<String, DsEmf> name2ds = new HashMap<>();
284
285         /**
286          * Constructor - initialize a new 'ContainerAdjunct'.
287          *
288          * @param policyContainer the 'PolicyContainer' instance this adjunct is extending
289          */
290         private ContainerAdjunct(PolicyContainer policyContainer) {
291             this.policyContainer = policyContainer;
292         }
293
294         /**
295          * Create a new persistent KieSession. If there is already a corresponding entry in the
296          * database, it is used to initialize the KieSession. If not, a completely new session is
297          * created.
298          *
299          * @param name the name of the KieSession (which is also the name of the associated
300          *     PolicySession)
301          * @param kieBaseName the name of the 'KieBase' instance containing this session
302          * @return a new KieSession with persistence enabled
303          */
304         private KieSession newPersistentKieSession(String name, String kieBaseName) {
305
306             configureSysProps();
307
308             BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
309             DsEmf dsemf = new DsEmf(ds);
310
311             try {
312                 EntityManagerFactory emf = dsemf.emf;
313                 DroolsSessionConnector conn = fact.makeJpaConnector(emf);
314
315                 long desiredSessionId = getSessionId(conn, name);
316
317                 logger.info(
318                         "\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
319
320                 // session does not exist -- attempt to create one
321                 logger.info(
322                         "getPolicySession:session does not exist -- attempt to create one with name {}", name);
323
324                 Environment env = kieSvcFact.newEnvironment();
325
326                 configureKieEnv(env, emf);
327
328                 KieSessionConfiguration kieConf = kieSvcFact.newKieSessionConfiguration();
329
330                 KieSession kieSession =
331                         (desiredSessionId >= 0
332                         ? loadKieSession(kieBaseName, desiredSessionId, env, kieConf)
333                                 : null);
334
335                 if (kieSession == null) {
336                     // loadKieSession() returned null or desiredSessionId < 0
337                     logger.info(
338                             "LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
339
340                     kieSession = newKieSession(kieBaseName, env);
341                 }
342
343                 replaceSession(conn, name, kieSession);
344
345                 name2ds.put(name, dsemf);
346
347                 return kieSession;
348
349             } catch (RuntimeException e) {
350                 dsemf.close();
351                 throw e;
352             }
353         }
354
355         /**
356          * Loads an existing KieSession from the persistent store.
357          *
358          * @param kieBaseName the name of the 'KieBase' instance containing this session
359          * @param desiredSessionId id of the desired KieSession
360          * @param env Kie Environment for the session
361          * @param kConf Kie Configuration for the session
362          * @return the persistent session, or {@code null} if it could not be loaded
363          */
364         private KieSession loadKieSession(
365                 String kieBaseName, long desiredSessionId, Environment env, KieSessionConfiguration kieConf) {
366             try {
367                 KieSession kieSession =
368                         kieSvcFact
369                         .getStoreServices()
370                         .loadKieSession(
371                                 desiredSessionId,
372                                 policyContainer.getKieContainer().getKieBase(kieBaseName),
373                                 kieConf,
374                                 env);
375
376                 logger.info("LOADING Loaded session {}", desiredSessionId);
377
378                 return kieSession;
379
380             } catch (Exception e) {
381                 logger.error("loadKieSession error: ", e);
382                 return null;
383             }
384         }
385
386         /**
387          * Creates a new, persistent KieSession.
388          *
389          * @param kieBaseName the name of the 'KieBase' instance containing this session
390          * @param env Kie Environment for the session
391          * @return a new, persistent session
392          */
393         private KieSession newKieSession(String kieBaseName, Environment env) {
394             KieSession kieSession =
395                     kieSvcFact
396                     .getStoreServices()
397                     .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
398
399             logger.info("LOADING CREATED {}", kieSession.getIdentifier());
400
401             return kieSession;
402         }
403
404         /**
405          * Closes the data source associated with a session.
406          *
407          * @param name name of the session being destroyed
408          */
409         private void destroyKieSession(String name) {
410             closeDataSource(name);
411         }
412
413         /**
414          * Closes the data source associated with a session.
415          *
416          * @param name name of the session being disposed of
417          */
418         private void disposeKieSession(String name) {
419             closeDataSource(name);
420         }
421
422         /**
423          * Closes the data source associated with a session.
424          *
425          * @param name name of the session whose data source is to be closed
426          */
427         private void closeDataSource(String name) {
428             DsEmf ds = name2ds.remove(name);
429             if (ds != null) {
430                 ds.close();
431             }
432         }
433
434         /** Configures java system properties for JPA/JTA. */
435         private void configureSysProps() {
436             System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
437             System.setProperty(
438                     "com.arjuna.ats.arjuna.objectstore.objectStoreDir",
439                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
440             System.setProperty(
441                     "ObjectStoreEnvironmentBean.objectStoreDir",
442                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
443         }
444
445         /**
446          * Configures a Kie Environment.
447          *
448          * @param env environment to be configured
449          * @param emf entity manager factory
450          */
451         private void configureKieEnv(Environment env, EntityManagerFactory emf) {
452             env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
453             env.set(EnvironmentName.TRANSACTION, fact.getUserTrans());
454             env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, fact.getTransSyncReg());
455             env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr());
456         }
457
458         /**
459          * Gets a session's ID from the persistent store.
460          *
461          * @param conn persistence connector
462          * @param sessnm name of the session
463          * @return the session's id, or {@code -1} if the session is not found
464          */
465         private long getSessionId(DroolsSessionConnector conn, String sessnm) {
466             DroolsSession sess = conn.get(sessnm);
467             return sess != null ? sess.getSessionId() : -1;
468         }
469
470         /**
471          * Replaces a session within the persistent store, if it exists. Adds it otherwise.
472          *
473          * @param conn persistence connector
474          * @param sessnm name of session to be updated
475          * @param kieSession new session information
476          */
477         private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
478
479             DroolsSessionEntity sess = new DroolsSessionEntity();
480
481             sess.setSessionName(sessnm);
482             sess.setSessionId(kieSession.getIdentifier());
483
484             conn.replace(sess);
485         }
486     }
487
488     /* ============================================================ */
489
490     /**
491      * Gets the data source properties.
492      *
493      * @return the data source properties
494      */
495     private Properties getDataSourceProperties() {
496         Properties props = new Properties();
497         props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
498         props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
499         props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
500         props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
501         props.put("maxActive", "3");
502         props.put("maxIdle", "1");
503         props.put("maxWait", "120000");
504         props.put("whenExhaustedAction", "2");
505         props.put("testOnBorrow", "false");
506         props.put("poolPreparedStatements", "true");
507
508         return props;
509     }
510
511     /**
512      * Removes "old" Drools 'sessioninfo' records, so they aren't used to restore data to Drools
513      * sessions. This also has the useful side-effect of removing abandoned records as well.
514      */
515     private void cleanUpSessionInfo() {
516
517         synchronized (cleanupLock) {
518             if (sessInfoCleaned) {
519                 logger.info("Clean up of sessioninfo table: already done");
520                 return;
521             }
522
523             if (sessionInfoTimeoutMs < 0) {
524                 logger.info("Clean up of sessioninfo table: no timeout specified");
525                 return;
526             }
527
528             // now do the record deletion
529             try (BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
530                     Connection connection = ds.getConnection();
531                     PreparedStatement statement =
532                             connection.prepareStatement(
533                                 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
534
535                 connection.setAutoCommit(true);
536
537                 statement.setLong(1, sessionInfoTimeoutMs / 1000);
538
539                 int count = statement.executeUpdate();
540                 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
541
542             } catch (SQLException e) {
543                 logger.error("Clean up of sessioninfo table failed", e);
544             }
545
546             // TODO: delete DroolsSessionEntity where sessionId not in
547             // (sessinfo.xxx)
548
549             sessInfoCleaned = true;
550         }
551     }
552
553     /**
554      * Determine whether persistence is enabled for a specific container.
555      *
556      * @param container container to be checked
557      * @param sessionName name of the session to be checked
558      * @return {@code true} if persistence is enabled for this container, and {@code false} if not
559      */
560     private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
561         Properties properties = getProperties(container);
562         boolean rval = false;
563
564         if (properties != null) {
565             // fetch the 'type' property
566             String type = getProperty(properties, sessionName, "type");
567             rval = "auto".equals(type) || "native".equals(type);
568         }
569
570         return rval;
571     }
572
573     /**
574      * Determine the controller properties associated with the policy container.
575      *
576      * @param container container whose properties are to be retrieved
577      * @return the container's properties, or {@code null} if not found
578      */
579     private Properties getProperties(PolicyContainer container) {
580         try {
581             return fact.getPolicyController(container).getProperties();
582         } catch (IllegalArgumentException e) {
583             logger.error("getProperties exception: ", e);
584             return null;
585         }
586     }
587
588     /**
589      * Fetch the persistence property associated with a session. The name may have the form:
590      *
591      * <ul>
592      *   <li>persistence.SESSION-NAME.PROPERTY
593      *   <li>persistence.PROPERTY
594      * </ul>
595      *
596      * @param properties properties from which the value is to be retrieved
597      * @param sessionName session name of interest
598      * @param property property name of interest
599      * @return the property value, or {@code null} if not found
600      */
601     private String getProperty(Properties properties, String sessionName, String property) {
602         String value = properties.getProperty("persistence." + sessionName + "." + property);
603         if (value == null) {
604             value = properties.getProperty("persistence." + property);
605         }
606
607         return value;
608     }
609
610     /* ============================================================ */
611
612     /**
613      * This 'ThreadModel' variant periodically calls 'KieSession.fireAllRules()', because the
614      * 'fireUntilHalt' method isn't compatible with persistence.
615      */
616     public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
617
618         /** Session associated with this persistent thread. */
619         private final PolicySession session;
620
621         /** The session thread. */
622         private final Thread thread;
623
624         /** Used to indicate that processing should stop. */
625         private final CountDownLatch stopped = new CountDownLatch(1);
626
627         /** Minimum time, in milli-seconds, that the thread should sleep before firing rules again. */
628         long minSleepTime = 100;
629
630         /**
631          * Maximum time, in milli-seconds, that the thread should sleep before firing rules again. This
632          * is a "half" time, so that we can multiply it by two without overflowing the word size.
633          */
634         long halfMaxSleepTime = 5000L / 2L;
635
636         /**
637          * Constructor - initialize variables and create thread.
638          *
639          * @param session the 'PolicySession' instance
640          * @param properties may contain additional session properties
641          */
642         public PersistentThreadModel(PolicySession session, Properties properties) {
643             this.session = session;
644             this.thread = new Thread(this, getThreadName());
645
646             if (properties == null) {
647                 return;
648             }
649
650             // extract 'minSleepTime' and/or 'maxSleepTime'
651             String name = session.getName();
652
653             // fetch 'minSleepTime' value, and update if defined
654             String sleepTimeString = getProperty(properties, name, "minSleepTime");
655             if (sleepTimeString != null) {
656                 try {
657                     minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
658                 } catch (Exception e) {
659                     logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
660                 }
661             }
662
663             // fetch 'maxSleepTime' value, and update if defined
664             long maxSleepTime = 2 * halfMaxSleepTime;
665             sleepTimeString = getProperty(properties, name, "maxSleepTime");
666             if (sleepTimeString != null) {
667                 try {
668                     maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
669                 } catch (Exception e) {
670                     logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
671                 }
672             }
673
674             // swap values if needed
675             if (minSleepTime > maxSleepTime) {
676                 logger.error(
677                         "minSleepTime("
678                                 + minSleepTime
679                                 + ") is greater than maxSleepTime("
680                                 + maxSleepTime
681                                 + ") -- swapping");
682                 long tmp = minSleepTime;
683                 minSleepTime = maxSleepTime;
684                 maxSleepTime = tmp;
685             }
686
687             halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
688         }
689
690         /**
691          * Get thread name.
692          *  
693          * @return the String to use as the thread name */
694         private String getThreadName() {
695             return "Session " + session.getFullName() + " (persistent)";
696         }
697
698         /*=========================*/
699         /* 'ThreadModel' interface */
700         /*=========================*/
701
702         /** 
703          * {@inheritDoc}.
704          **/
705         @Override
706         public void start() {
707             thread.start();
708         }
709
710         /** 
711          * {@inheritDoc}.
712          **/
713         @Override
714         public void stop() {
715             // tell the thread to stop
716             stopped.countDown();
717
718             // wait up to 10 seconds for the thread to stop
719             try {
720                 thread.join(10000);
721
722             } catch (InterruptedException e) {
723                 logger.error("stopThread exception: ", e);
724                 Thread.currentThread().interrupt();
725             }
726
727             // verify that it's done
728             if (thread.isAlive()) {
729                 logger.error("stopThread: still running");
730             }
731         }
732
733         /** 
734          * {@inheritDoc}.
735          **/
736         @Override
737         public void updated() {
738             // the container artifact has been updated -- adjust the thread name
739             thread.setName(getThreadName());
740         }
741
742         /*======================*/
743         /* 'Runnable' interface */
744         /*======================*/
745
746         /** 
747          * {@inheritDoc}.
748          **/
749         @Override
750         public void run() {
751             logger.info("PersistentThreadModel running");
752
753             // set thread local variable
754             session.setPolicySession();
755
756             KieSession kieSession = session.getKieSession();
757             long sleepTime = 2 * halfMaxSleepTime;
758
759             // We want to continue, despite any exceptions that occur
760             // while rules are fired.
761
762             boolean cont = true;
763             while (cont) {
764
765                 try {
766                     if (kieSession.fireAllRules() > 0) {
767                         // some rules fired -- reduce poll delay
768                         sleepTime = Math.max(minSleepTime, sleepTime / 2);
769                     } else {
770                         // no rules fired -- increase poll delay
771                         sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
772                     }
773
774                 } catch (Exception | LinkageError e) {
775                     logger.error("Exception during kieSession.fireAllRules", e);
776                 }
777
778                 try {
779                     if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
780                         cont = false;
781                     }
782
783                 } catch (InterruptedException e) {
784                     logger.error("startThread exception: ", e);
785                     Thread.currentThread().interrupt();
786                     cont = false;
787                 }
788             }
789
790             logger.info("PersistentThreadModel completed");
791         }
792     }
793
794     /* ============================================================ */
795
796     /** DataSource-EntityManagerFactory pair. */
797     private class DsEmf {
798         private BasicDataSource bds;
799         private EntityManagerFactory emf;
800
801         /**
802          * Makes an entity manager factory for the given data source.
803          *
804          * @param bds pooled data source
805          */
806         public DsEmf(BasicDataSource bds) {
807             try {
808                 Map<String, Object> props = new HashMap<>();
809                 props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds);
810
811                 this.bds = bds;
812                 this.emf = fact.makeEntMgrFact(props);
813
814             } catch (RuntimeException e) {
815                 closeDataSource();
816                 throw e;
817             }
818         }
819
820         /** Closes the entity manager factory and the data source. */
821         public void close() {
822             try {
823                 emf.close();
824
825             } catch (RuntimeException e) {
826                 closeDataSource();
827                 throw e;
828             }
829
830             closeDataSource();
831         }
832
833         /** Closes the data source only. */
834         private void closeDataSource() {
835             try {
836                 bds.close();
837
838             } catch (SQLException e) {
839                 throw new PersistenceFeatureException(e);
840             }
841         }
842     }
843
844     private static class SingletonRegistry {
845         private static final TransactionSynchronizationRegistry transreg =
846                 new com.arjuna.ats.internal.jta.transaction.arjunacore
847                 .TransactionSynchronizationRegistryImple();
848
849         private SingletonRegistry() {
850             super();
851         }
852     }
853
854     /** Factory for various items. Methods can be overridden for junit testing. */
855     protected static class Factory {
856
857         /**
858          * Gets the transaction manager.
859          *
860          * @return the transaction manager
861          */
862         public TransactionManager getTransMgr() {
863             return com.arjuna.ats.jta.TransactionManager.transactionManager();
864         }
865
866         /**
867          * Gets the user transaction.
868          *
869          * @return the user transaction
870          */
871         public UserTransaction getUserTrans() {
872             return com.arjuna.ats.jta.UserTransaction.userTransaction();
873         }
874
875         /**
876          * Gets the transaction synchronization registry.
877          *
878          * @return the transaction synchronization registry
879          */
880         public TransactionSynchronizationRegistry getTransSyncReg() {
881             return SingletonRegistry.transreg;
882         }
883
884         /**
885          * Gets the KIE services.
886          *
887          * @return the KIE services
888          */
889         public KieServices getKieServices() {
890             return KieServices.Factory.get();
891         }
892
893         /**
894          * Loads properties from a file.
895          *
896          * @param filenm name of the file to load
897          * @return properties, as loaded from the file
898          * @throws IOException if an error occurs reading from the file
899          */
900         public Properties loadProperties(String filenm) throws IOException {
901             return PropertyUtil.getProperties(filenm);
902         }
903
904         /**
905          * Makes a Data Source.
906          *
907          * @param dsProps data source properties
908          * @return a new data source
909          */
910         public BasicDataSource makeDataSource(Properties dsProps) {
911             try {
912                 return BasicDataSourceFactory.createDataSource(dsProps);
913
914             } catch (Exception e) {
915                 throw new PersistenceFeatureException(e);
916             }
917         }
918
919         /**
920          * Makes a new JPA connector for drools sessions.
921          *
922          * @param emf entity manager factory
923          * @return a new JPA connector for drools sessions
924          */
925         public DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
926             return new JpaDroolsSessionConnector(emf);
927         }
928
929         /**
930          * Makes a new entity manager factory.
931          *
932          * @param props properties with which the factory should be configured
933          * @return a new entity manager factory
934          */
935         public EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
936             return Persistence.createEntityManagerFactory("onapsessionsPU", props);
937         }
938
939         /**
940          * Gets the policy controller associated with a given policy container.
941          *
942          * @param container container whose controller is to be retrieved
943          * @return the container's controller
944          */
945         public PolicyController getPolicyController(PolicyContainer container) {
946             return PolicyController.factory.get(container.getGroupId(), container.getArtifactId());
947         }
948     }
949
950     /**
951      * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations,
952      * particularly when they are not, themselves, Runtime exceptions.
953      */
954     public static class PersistenceFeatureException extends RuntimeException {
955         private static final long serialVersionUID = 1L;
956
957         /** 
958          * Constructor.
959          * */
960         public PersistenceFeatureException(Exception ex) {
961             super(ex);
962         }
963     }
964 }