13264d60c69886e55808dfa7ad6c1c4e1091b598
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-session-persistence
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.persistence;
22
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.SQLException;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.persistence.EntityManagerFactory;
34 import javax.persistence.Persistence;
35 import javax.transaction.TransactionManager;
36 import javax.transaction.TransactionSynchronizationRegistry;
37 import javax.transaction.UserTransaction;
38
39 import org.apache.commons.dbcp2.BasicDataSource;
40 import org.apache.commons.dbcp2.BasicDataSourceFactory;
41 import org.kie.api.KieServices;
42 import org.kie.api.runtime.Environment;
43 import org.kie.api.runtime.EnvironmentName;
44 import org.kie.api.runtime.KieSession;
45 import org.kie.api.runtime.KieSessionConfiguration;
46 import org.onap.policy.drools.core.PolicyContainer;
47 import org.onap.policy.drools.core.PolicySession;
48 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
49 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.onap.policy.drools.system.PolicyEngine;
52 import org.onap.policy.drools.utils.PropertyUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * If this feature is supported, there is a single instance of it. It adds persistence to Drools
58  * sessions. In addition, if an active-standby feature exists, then that is used to determine the
59  * active and last-active PDP. If it does not exist, then the current host name is used as the PDP
60  * id.
61  *
62  * <p>The bulk of the code here was once in other classes, such as 'PolicyContainer' and 'Main'. It
63  * was moved here as part of making this a separate optional feature.
64  */
65 public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI {
66
67     private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
68
69     /** Standard factory used to get various items. */
70     private static Factory stdFactory = new Factory();
71
72     /** Factory used to get various items. */
73     private Factory fact = stdFactory;
74
75     /** KieService factory. */
76     private KieServices kieSvcFact;
77
78     /** Persistence properties. */
79     private Properties persistProps;
80
81     /** Whether or not the SessionInfo records should be cleaned out. */
82     private boolean sessInfoCleaned;
83
84     /** SessionInfo timeout, in milli-seconds, as read from 
85      * {@link #persistProps}. */
86     private long sessionInfoTimeoutMs;
87
88     /** Object used to serialize cleanup of sessioninfo table. */
89     private Object cleanupLock = new Object();
90
91     /**
92      * Sets the factory to be used during junit testing.
93      *
94      * @param fact factory to be used
95      */
96     protected void setFactory(Factory fact) {
97         this.fact = fact;
98     }
99
100     /**
101      * Lookup the adjunct for this feature that is associated with the specified PolicyContainer. If
102      * not found, create one.
103      *
104      * @param policyContainer the container whose adjunct we are looking up, and possibly creating
105      * @return the associated 'ContainerAdjunct' instance, which may be new
106      */
107     private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
108
109         Object rval = policyContainer.getAdjunct(this);
110
111         if (rval == null || !(rval instanceof ContainerAdjunct)) {
112             // adjunct does not exist, or has the wrong type (should never
113             // happen)
114             rval = new ContainerAdjunct(policyContainer);
115             policyContainer.setAdjunct(this, rval);
116         }
117
118         return (ContainerAdjunct) rval;
119     }
120
121     /** 
122      * {@inheritDoc}.
123      **/
124     @Override
125     public int getSequenceNumber() {
126         return 1;
127     }
128
129     /** 
130      * {@inheritDoc}.
131      **/
132     @Override
133     public void globalInit(String[] args, String configDir) {
134
135         kieSvcFact = fact.getKieServices();
136
137         try {
138             persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties");
139
140         } catch (IOException e1) {
141             logger.error("initializePersistence: ", e1);
142         }
143
144         sessionInfoTimeoutMs = getPersistenceTimeout();
145     }
146
147     /**
148      * Creates a persistent KieSession, loading it from the persistent store, or creating one, if it
149      * does not exist yet.
150      */
151     @Override
152     public KieSession activatePolicySession(
153             PolicyContainer policyContainer, String name, String kieBaseName) {
154
155         if (isPersistenceEnabled(policyContainer, name)) {
156             cleanUpSessionInfo();
157
158             return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
159         }
160
161         return null;
162     }
163
164     /** 
165      * {@inheritDoc}.
166      **/
167     @Override
168     public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
169
170         PolicyContainer policyContainer = session.getPolicyContainer();
171         if (isPersistenceEnabled(policyContainer, session.getName())) {
172             return new PersistentThreadModel(session, getProperties(policyContainer));
173         }
174         return null;
175     }
176
177     /** 
178      * {@inheritDoc}.
179      **/
180     @Override
181     public void disposeKieSession(PolicySession policySession) {
182
183         ContainerAdjunct contAdj =
184                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
185         if (contAdj != null) {
186             contAdj.disposeKieSession(policySession.getName());
187         }
188     }
189
190     /** 
191      * {@inheritDoc}.
192      **/
193     @Override
194     public void destroyKieSession(PolicySession policySession) {
195
196         ContainerAdjunct contAdj =
197                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
198         if (contAdj != null) {
199             contAdj.destroyKieSession(policySession.getName());
200         }
201     }
202
203     /** 
204      * {@inheritDoc}.
205      **/
206     @Override
207     public boolean afterStart(PolicyEngine engine) {
208         return false;
209     }
210
211     /** 
212      * {@inheritDoc}.
213      **/
214     @Override
215     public boolean beforeStart(PolicyEngine engine) {
216         synchronized (cleanupLock) {
217             sessInfoCleaned = false;
218         }
219
220         return false;
221     }
222
223     /** 
224      * {@inheritDoc}.
225      **/
226     @Override
227     public boolean beforeActivate(PolicyEngine engine) {
228         synchronized (cleanupLock) {
229             sessInfoCleaned = false;
230         }
231
232         return false;
233     }
234
235     /** 
236      * {@inheritDoc}.
237      **/
238     @Override
239     public boolean afterActivate(PolicyEngine engine) {
240         return false;
241     }
242
243     /* ============================================================ */
244
245     /**
246      * Gets the persistence timeout value for sessioninfo records.
247      *
248      * @return the timeout value, in milli-seconds, or {@code -1} if it is unspecified or invalid
249      */
250     private long getPersistenceTimeout() {
251         String timeoutString = null;
252
253         try {
254             timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
255
256             if (timeoutString != null) {
257                 // timeout parameter is specified
258                 return Long.valueOf(timeoutString) * 1000;
259             }
260
261         } catch (NumberFormatException e) {
262             logger.error(
263                     "Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
264                     timeoutString,
265                     e);
266         }
267
268         return -1;
269     }
270
271     /* ============================================================ */
272
273     /**
274      * Each instance of this class is a logical extension of a 'PolicyContainer' instance. Its
275      * reference is stored in the 'adjuncts' table within the 'PolicyContainer', and will be
276      * garbage-collected with the container.
277      */
278     protected class ContainerAdjunct {
279         /** 'PolicyContainer' instance that this adjunct is extending. */
280         private PolicyContainer policyContainer;
281
282         /** Maps a KIE session name to its data source. */
283         private Map<String, DsEmf> name2ds = new HashMap<>();
284
285         /**
286          * Constructor - initialize a new 'ContainerAdjunct'.
287          *
288          * @param policyContainer the 'PolicyContainer' instance this adjunct is extending
289          */
290         private ContainerAdjunct(PolicyContainer policyContainer) {
291             this.policyContainer = policyContainer;
292         }
293
294         /**
295          * Create a new persistent KieSession. If there is already a corresponding entry in the
296          * database, it is used to initialize the KieSession. If not, a completely new session is
297          * created.
298          *
299          * @param name the name of the KieSession (which is also the name of the associated
300          *     PolicySession)
301          * @param kieBaseName the name of the 'KieBase' instance containing this session
302          * @return a new KieSession with persistence enabled
303          */
304         private KieSession newPersistentKieSession(String name, String kieBaseName) {
305
306             configureSysProps();
307
308             BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
309             DsEmf dsemf = new DsEmf(ds);
310
311             try {
312                 EntityManagerFactory emf = dsemf.emf;
313                 DroolsSessionConnector conn = fact.makeJpaConnector(emf);
314
315                 long desiredSessionId = getSessionId(conn, name);
316
317                 logger.info(
318                         "\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
319
320                 // session does not exist -- attempt to create one
321                 logger.info(
322                         "getPolicySession:session does not exist -- attempt to create one with name {}", name);
323
324                 Environment env = kieSvcFact.newEnvironment();
325
326                 configureKieEnv(env, emf);
327
328                 KieSessionConfiguration kieConf = kieSvcFact.newKieSessionConfiguration();
329
330                 KieSession kieSession =
331                         (desiredSessionId >= 0
332                         ? loadKieSession(kieBaseName, desiredSessionId, env, kieConf)
333                                 : null);
334
335                 if (kieSession == null) {
336                     // loadKieSession() returned null or desiredSessionId < 0
337                     logger.info(
338                             "LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
339
340                     kieSession = newKieSession(kieBaseName, env);
341                 }
342
343                 replaceSession(conn, name, kieSession);
344
345                 name2ds.put(name, dsemf);
346
347                 return kieSession;
348
349             } catch (RuntimeException e) {
350                 dsemf.close();
351                 throw e;
352             }
353         }
354
355         /**
356          * Loads an existing KieSession from the persistent store.
357          *
358          * @param kieBaseName the name of the 'KieBase' instance containing this session
359          * @param desiredSessionId id of the desired KieSession
360          * @param env Kie Environment for the session
361          * @param kConf Kie Configuration for the session
362          * @return the persistent session, or {@code null} if it could not be loaded
363          */
364         private KieSession loadKieSession(
365                 String kieBaseName, long desiredSessionId, Environment env, KieSessionConfiguration kieConf) {
366             try {
367                 KieSession kieSession =
368                         kieSvcFact
369                         .getStoreServices()
370                         .loadKieSession(
371                                 desiredSessionId,
372                                 policyContainer.getKieContainer().getKieBase(kieBaseName),
373                                 kieConf,
374                                 env);
375
376                 logger.info("LOADING Loaded session {}", desiredSessionId);
377
378                 return kieSession;
379
380             } catch (Exception e) {
381                 logger.error("loadKieSession error: ", e);
382                 return null;
383             }
384         }
385
386         /**
387          * Creates a new, persistent KieSession.
388          *
389          * @param kieBaseName the name of the 'KieBase' instance containing this session
390          * @param env Kie Environment for the session
391          * @return a new, persistent session
392          */
393         private KieSession newKieSession(String kieBaseName, Environment env) {
394             KieSession kieSession =
395                     kieSvcFact
396                     .getStoreServices()
397                     .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
398
399             logger.info("LOADING CREATED {}", kieSession.getIdentifier());
400
401             return kieSession;
402         }
403
404         /**
405          * Closes the data source associated with a session.
406          *
407          * @param name name of the session being destroyed
408          */
409         private void destroyKieSession(String name) {
410             closeDataSource(name);
411         }
412
413         /**
414          * Closes the data source associated with a session.
415          *
416          * @param name name of the session being disposed of
417          */
418         private void disposeKieSession(String name) {
419             closeDataSource(name);
420         }
421
422         /**
423          * Closes the data source associated with a session.
424          *
425          * @param name name of the session whose data source is to be closed
426          */
427         private void closeDataSource(String name) {
428             DsEmf ds = name2ds.remove(name);
429             if (ds != null) {
430                 ds.close();
431             }
432         }
433
434         /** Configures java system properties for JPA/JTA. */
435         private void configureSysProps() {
436             System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
437             System.setProperty(
438                     "com.arjuna.ats.arjuna.objectstore.objectStoreDir",
439                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
440             System.setProperty(
441                     "ObjectStoreEnvironmentBean.objectStoreDir",
442                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
443         }
444
445         /**
446          * Configures a Kie Environment.
447          *
448          * @param env environment to be configured
449          * @param emf entity manager factory
450          */
451         private void configureKieEnv(Environment env, EntityManagerFactory emf) {
452             env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
453             env.set(EnvironmentName.TRANSACTION, fact.getUserTrans());
454             env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, fact.getTransSyncReg());
455             env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr());
456         }
457
458         /**
459          * Gets a session's ID from the persistent store.
460          *
461          * @param conn persistence connector
462          * @param sessnm name of the session
463          * @return the session's id, or {@code -1} if the session is not found
464          */
465         private long getSessionId(DroolsSessionConnector conn, String sessnm) {
466             DroolsSession sess = conn.get(sessnm);
467             return sess != null ? sess.getSessionId() : -1;
468         }
469
470         /**
471          * Replaces a session within the persistent store, if it exists. Adds it otherwise.
472          *
473          * @param conn persistence connector
474          * @param sessnm name of session to be updated
475          * @param kieSession new session information
476          */
477         private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
478
479             DroolsSessionEntity sess = new DroolsSessionEntity();
480
481             sess.setSessionName(sessnm);
482             sess.setSessionId(kieSession.getIdentifier());
483
484             conn.replace(sess);
485         }
486     }
487
488     /* ============================================================ */
489
490     /**
491      * Gets the data source properties.
492      *
493      * @return the data source properties
494      */
495     private Properties getDataSourceProperties() {
496         Properties props = new Properties();
497         props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
498         props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
499         props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
500         props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
501         props.put("maxActive", "3");
502         props.put("maxIdle", "1");
503         props.put("maxWait", "120000");
504         props.put("whenExhaustedAction", "2");
505         props.put("testOnBorrow", "false");
506         props.put("poolPreparedStatements", "true");
507
508         return props;
509     }
510
511     /**
512      * Removes "old" Drools 'sessioninfo' records, so they aren't used to restore data to Drools
513      * sessions. This also has the useful side-effect of removing abandoned records as well.
514      */
515     private void cleanUpSessionInfo() {
516
517         synchronized (cleanupLock) {
518             if (sessInfoCleaned) {
519                 logger.info("Clean up of sessioninfo table: already done");
520                 return;
521             }
522
523             if (sessionInfoTimeoutMs < 0) {
524                 logger.info("Clean up of sessioninfo table: no timeout specified");
525                 return;
526             }
527
528             // now do the record deletion
529             try (BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
530                     Connection connection = ds.getConnection();
531                     PreparedStatement statement =
532                             connection.prepareStatement(
533                                 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
534
535                 connection.setAutoCommit(true);
536
537                 statement.setLong(1, sessionInfoTimeoutMs / 1000);
538
539                 int count = statement.executeUpdate();
540                 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
541
542             } catch (SQLException e) {
543                 logger.error("Clean up of sessioninfo table failed", e);
544             }
545
546             // delete DroolsSessionEntity where sessionId not in (sessinfo.xxx)?
547
548             sessInfoCleaned = true;
549         }
550     }
551
552     /**
553      * Determine whether persistence is enabled for a specific container.
554      *
555      * @param container container to be checked
556      * @param sessionName name of the session to be checked
557      * @return {@code true} if persistence is enabled for this container, and {@code false} if not
558      */
559     private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
560         Properties properties = getProperties(container);
561         boolean rval = false;
562
563         if (properties != null) {
564             // fetch the 'type' property
565             String type = getProperty(properties, sessionName, "type");
566             rval = "auto".equals(type) || "native".equals(type);
567         }
568
569         return rval;
570     }
571
572     /**
573      * Determine the controller properties associated with the policy container.
574      *
575      * @param container container whose properties are to be retrieved
576      * @return the container's properties, or {@code null} if not found
577      */
578     private Properties getProperties(PolicyContainer container) {
579         try {
580             return fact.getPolicyController(container).getProperties();
581         } catch (IllegalArgumentException e) {
582             logger.error("getProperties exception: ", e);
583             return null;
584         }
585     }
586
587     /**
588      * Fetch the persistence property associated with a session. The name may have the form:
589      *
590      * <ul>
591      *   <li>persistence.SESSION-NAME.PROPERTY
592      *   <li>persistence.PROPERTY
593      * </ul>
594      *
595      * @param properties properties from which the value is to be retrieved
596      * @param sessionName session name of interest
597      * @param property property name of interest
598      * @return the property value, or {@code null} if not found
599      */
600     private String getProperty(Properties properties, String sessionName, String property) {
601         String value = properties.getProperty("persistence." + sessionName + "." + property);
602         if (value == null) {
603             value = properties.getProperty("persistence." + property);
604         }
605
606         return value;
607     }
608
609     /* ============================================================ */
610
611     /**
612      * This 'ThreadModel' variant periodically calls 'KieSession.fireAllRules()', because the
613      * 'fireUntilHalt' method isn't compatible with persistence.
614      */
615     public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
616
617         /** Session associated with this persistent thread. */
618         private final PolicySession session;
619
620         /** The session thread. */
621         private final Thread thread;
622
623         /** Used to indicate that processing should stop. */
624         private final CountDownLatch stopped = new CountDownLatch(1);
625
626         /** Minimum time, in milli-seconds, that the thread should sleep before firing rules again. */
627         long minSleepTime = 100;
628
629         /**
630          * Maximum time, in milli-seconds, that the thread should sleep before firing rules again. This
631          * is a "half" time, so that we can multiply it by two without overflowing the word size.
632          */
633         long halfMaxSleepTime = 5000L / 2L;
634
635         /**
636          * Constructor - initialize variables and create thread.
637          *
638          * @param session the 'PolicySession' instance
639          * @param properties may contain additional session properties
640          */
641         public PersistentThreadModel(PolicySession session, Properties properties) {
642             this.session = session;
643             this.thread = new Thread(this, getThreadName());
644
645             if (properties == null) {
646                 return;
647             }
648
649             // extract 'minSleepTime' and/or 'maxSleepTime'
650             String name = session.getName();
651
652             // fetch 'minSleepTime' value, and update if defined
653             String sleepTimeString = getProperty(properties, name, "minSleepTime");
654             if (sleepTimeString != null) {
655                 try {
656                     minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
657                 } catch (Exception e) {
658                     logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
659                 }
660             }
661
662             // fetch 'maxSleepTime' value, and update if defined
663             long maxSleepTime = 2 * halfMaxSleepTime;
664             sleepTimeString = getProperty(properties, name, "maxSleepTime");
665             if (sleepTimeString != null) {
666                 try {
667                     maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
668                 } catch (Exception e) {
669                     logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
670                 }
671             }
672
673             // swap values if needed
674             if (minSleepTime > maxSleepTime) {
675                 logger.error(
676                         "minSleepTime("
677                                 + minSleepTime
678                                 + ") is greater than maxSleepTime("
679                                 + maxSleepTime
680                                 + ") -- swapping");
681                 long tmp = minSleepTime;
682                 minSleepTime = maxSleepTime;
683                 maxSleepTime = tmp;
684             }
685
686             halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
687         }
688
689         /**
690          * Get thread name.
691          *  
692          * @return the String to use as the thread name */
693         private String getThreadName() {
694             return "Session " + session.getFullName() + " (persistent)";
695         }
696
697         /*=========================*/
698         /* 'ThreadModel' interface */
699         /*=========================*/
700
701         /** 
702          * {@inheritDoc}.
703          **/
704         @Override
705         public void start() {
706             thread.start();
707         }
708
709         /** 
710          * {@inheritDoc}.
711          **/
712         @Override
713         public void stop() {
714             // tell the thread to stop
715             stopped.countDown();
716
717             // wait up to 10 seconds for the thread to stop
718             try {
719                 thread.join(10000);
720
721             } catch (InterruptedException e) {
722                 logger.error("stopThread exception: ", e);
723                 Thread.currentThread().interrupt();
724             }
725
726             // verify that it's done
727             if (thread.isAlive()) {
728                 logger.error("stopThread: still running");
729             }
730         }
731
732         /** 
733          * {@inheritDoc}.
734          **/
735         @Override
736         public void updated() {
737             // the container artifact has been updated -- adjust the thread name
738             thread.setName(getThreadName());
739         }
740
741         /*======================*/
742         /* 'Runnable' interface */
743         /*======================*/
744
745         /** 
746          * {@inheritDoc}.
747          **/
748         @Override
749         public void run() {
750             logger.info("PersistentThreadModel running");
751
752             // set thread local variable
753             session.setPolicySession();
754
755             KieSession kieSession = session.getKieSession();
756             long sleepTime = 2 * halfMaxSleepTime;
757
758             // We want to continue, despite any exceptions that occur
759             // while rules are fired.
760
761             boolean cont = true;
762             while (cont) {
763
764                 try {
765                     if (kieSession.fireAllRules() > 0) {
766                         // some rules fired -- reduce poll delay
767                         sleepTime = Math.max(minSleepTime, sleepTime / 2);
768                     } else {
769                         // no rules fired -- increase poll delay
770                         sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
771                     }
772
773                 } catch (Exception | LinkageError e) {
774                     logger.error("Exception during kieSession.fireAllRules", e);
775                 }
776
777                 try {
778                     if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
779                         cont = false;
780                     }
781
782                 } catch (InterruptedException e) {
783                     logger.error("startThread exception: ", e);
784                     Thread.currentThread().interrupt();
785                     cont = false;
786                 }
787             }
788
789             logger.info("PersistentThreadModel completed");
790         }
791     }
792
793     /* ============================================================ */
794
795     /** DataSource-EntityManagerFactory pair. */
796     private class DsEmf {
797         private BasicDataSource bds;
798         private EntityManagerFactory emf;
799
800         /**
801          * Makes an entity manager factory for the given data source.
802          *
803          * @param bds pooled data source
804          */
805         public DsEmf(BasicDataSource bds) {
806             try {
807                 Map<String, Object> props = new HashMap<>();
808                 props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds);
809
810                 this.bds = bds;
811                 this.emf = fact.makeEntMgrFact(props);
812
813             } catch (RuntimeException e) {
814                 closeDataSource();
815                 throw e;
816             }
817         }
818
819         /** Closes the entity manager factory and the data source. */
820         public void close() {
821             try {
822                 emf.close();
823
824             } catch (RuntimeException e) {
825                 closeDataSource();
826                 throw e;
827             }
828
829             closeDataSource();
830         }
831
832         /** Closes the data source only. */
833         private void closeDataSource() {
834             try {
835                 bds.close();
836
837             } catch (SQLException e) {
838                 throw new PersistenceFeatureException(e);
839             }
840         }
841     }
842
843     private static class SingletonRegistry {
844         private static final TransactionSynchronizationRegistry transreg =
845                 new com.arjuna.ats.internal.jta.transaction.arjunacore
846                 .TransactionSynchronizationRegistryImple();
847
848         private SingletonRegistry() {
849             super();
850         }
851     }
852
853     /** Factory for various items. Methods can be overridden for junit testing. */
854     protected static class Factory {
855
856         /**
857          * Gets the transaction manager.
858          *
859          * @return the transaction manager
860          */
861         public TransactionManager getTransMgr() {
862             return com.arjuna.ats.jta.TransactionManager.transactionManager();
863         }
864
865         /**
866          * Gets the user transaction.
867          *
868          * @return the user transaction
869          */
870         public UserTransaction getUserTrans() {
871             return com.arjuna.ats.jta.UserTransaction.userTransaction();
872         }
873
874         /**
875          * Gets the transaction synchronization registry.
876          *
877          * @return the transaction synchronization registry
878          */
879         public TransactionSynchronizationRegistry getTransSyncReg() {
880             return SingletonRegistry.transreg;
881         }
882
883         /**
884          * Gets the KIE services.
885          *
886          * @return the KIE services
887          */
888         public KieServices getKieServices() {
889             return KieServices.Factory.get();
890         }
891
892         /**
893          * Loads properties from a file.
894          *
895          * @param filenm name of the file to load
896          * @return properties, as loaded from the file
897          * @throws IOException if an error occurs reading from the file
898          */
899         public Properties loadProperties(String filenm) throws IOException {
900             return PropertyUtil.getProperties(filenm);
901         }
902
903         /**
904          * Makes a Data Source.
905          *
906          * @param dsProps data source properties
907          * @return a new data source
908          */
909         public BasicDataSource makeDataSource(Properties dsProps) {
910             try {
911                 return BasicDataSourceFactory.createDataSource(dsProps);
912
913             } catch (Exception e) {
914                 throw new PersistenceFeatureException(e);
915             }
916         }
917
918         /**
919          * Makes a new JPA connector for drools sessions.
920          *
921          * @param emf entity manager factory
922          * @return a new JPA connector for drools sessions
923          */
924         public DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
925             return new JpaDroolsSessionConnector(emf);
926         }
927
928         /**
929          * Makes a new entity manager factory.
930          *
931          * @param props properties with which the factory should be configured
932          * @return a new entity manager factory
933          */
934         public EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
935             return Persistence.createEntityManagerFactory("onapsessionsPU", props);
936         }
937
938         /**
939          * Gets the policy controller associated with a given policy container.
940          *
941          * @param container container whose controller is to be retrieved
942          * @return the container's controller
943          */
944         public PolicyController getPolicyController(PolicyContainer container) {
945             return PolicyController.factory.get(container.getGroupId(), container.getArtifactId());
946         }
947     }
948
949     /**
950      * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations,
951      * particularly when they are not, themselves, Runtime exceptions.
952      */
953     public static class PersistenceFeatureException extends RuntimeException {
954         private static final long serialVersionUID = 1L;
955
956         /** 
957          * Constructor.
958          * */
959         public PersistenceFeatureException(Exception ex) {
960             super(ex);
961         }
962     }
963 }