7b17de6c72ee05c620815b3d25f78b57cc1d208f
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-session-persistence
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.persistence;
22
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.SQLException;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.persistence.EntityManagerFactory;
34 import javax.persistence.Persistence;
35 import javax.transaction.TransactionManager;
36 import javax.transaction.TransactionSynchronizationRegistry;
37 import javax.transaction.UserTransaction;
38
39 import org.apache.commons.dbcp2.BasicDataSource;
40 import org.apache.commons.dbcp2.BasicDataSourceFactory;
41 import org.kie.api.KieServices;
42 import org.kie.api.runtime.Environment;
43 import org.kie.api.runtime.EnvironmentName;
44 import org.kie.api.runtime.KieSession;
45 import org.kie.api.runtime.KieSessionConfiguration;
46 import org.onap.policy.drools.core.PolicyContainer;
47 import org.onap.policy.drools.core.PolicySession;
48 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
49 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.onap.policy.drools.system.PolicyEngine;
52 import org.onap.policy.drools.utils.PropertyUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * If this feature is supported, there is a single instance of it. It adds persistence to Drools
58  * sessions. In addition, if an active-standby feature exists, then that is used to determine the
59  * active and last-active PDP. If it does not exist, then the current host name is used as the PDP
60  * id.
61  *
62  * <p>The bulk of the code here was once in other classes, such as 'PolicyContainer' and 'Main'. It
63  * was moved here as part of making this a separate optional feature.
64  */
65 public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI {
66
67     private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
68
69     /** Standard factory used to get various items. */
70     private static Factory stdFactory = new Factory();
71
72     /** Factory used to get various items. */
73     private Factory fact = stdFactory;
74
75     /** KieService factory. */
76     private KieServices kieSvcFact;
77
78     /** Persistence properties. */
79     private Properties persistProps;
80
81     /** Whether or not the SessionInfo records should be cleaned out. */
82     private boolean sessInfoCleaned;
83
84     /** SessionInfo timeout, in milli-seconds, as read from 
85      * {@link #persistProps}. */
86     private long sessionInfoTimeoutMs;
87
88     /** Object used to serialize cleanup of sessioninfo table. */
89     private Object cleanupLock = new Object();
90
91     /**
92      * Sets the factory to be used during junit testing.
93      *
94      * @param fact factory to be used
95      */
96     protected void setFactory(Factory fact) {
97         this.fact = fact;
98     }
99
100     /**
101      * Lookup the adjunct for this feature that is associated with the specified PolicyContainer. If
102      * not found, create one.
103      *
104      * @param policyContainer the container whose adjunct we are looking up, and possibly creating
105      * @return the associated 'ContainerAdjunct' instance, which may be new
106      */
107     private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
108
109         Object rval = policyContainer.getAdjunct(this);
110
111         if (rval == null || !(rval instanceof ContainerAdjunct)) {
112             // adjunct does not exist, or has the wrong type (should never
113             // happen)
114             rval = new ContainerAdjunct(policyContainer);
115             policyContainer.setAdjunct(this, rval);
116         }
117
118         return (ContainerAdjunct) rval;
119     }
120
121     /** 
122      * {@inheritDoc} */
123     @Override
124     public int getSequenceNumber() {
125         return 1;
126     }
127
128     /** 
129      * {@inheritDoc} */
130     @Override
131     public void globalInit(String[] args, String configDir) {
132
133         kieSvcFact = fact.getKieServices();
134
135         try {
136             persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties");
137
138         } catch (IOException e1) {
139             logger.error("initializePersistence: ", e1);
140         }
141
142         sessionInfoTimeoutMs = getPersistenceTimeout();
143     }
144
145     /**
146      * Creates a persistent KieSession, loading it from the persistent store, or creating one, if it
147      * does not exist yet.
148      */
149     @Override
150     public KieSession activatePolicySession(
151             PolicyContainer policyContainer, String name, String kieBaseName) {
152
153         if (isPersistenceEnabled(policyContainer, name)) {
154             cleanUpSessionInfo();
155
156             return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
157         }
158
159         return null;
160     }
161
162     /** 
163      * {@inheritDoc} */
164     @Override
165     public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
166
167         PolicyContainer policyContainer = session.getPolicyContainer();
168         if (isPersistenceEnabled(policyContainer, session.getName())) {
169             return new PersistentThreadModel(session, getProperties(policyContainer));
170         }
171         return null;
172     }
173
174     /** 
175      * {@inheritDoc} */
176     @Override
177     public void disposeKieSession(PolicySession policySession) {
178
179         ContainerAdjunct contAdj =
180                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
181         if (contAdj != null) {
182             contAdj.disposeKieSession(policySession.getName());
183         }
184     }
185
186     /** 
187      * {@inheritDoc} */
188     @Override
189     public void destroyKieSession(PolicySession policySession) {
190
191         ContainerAdjunct contAdj =
192                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
193         if (contAdj != null) {
194             contAdj.destroyKieSession(policySession.getName());
195         }
196     }
197
198     /** 
199      * {@inheritDoc} */
200     @Override
201     public boolean afterStart(PolicyEngine engine) {
202         return false;
203     }
204
205     /** 
206      * {@inheritDoc} */
207     @Override
208     public boolean beforeStart(PolicyEngine engine) {
209         synchronized (cleanupLock) {
210             sessInfoCleaned = false;
211         }
212
213         return false;
214     }
215
216     /** 
217      * {@inheritDoc} */
218     @Override
219     public boolean beforeActivate(PolicyEngine engine) {
220         synchronized (cleanupLock) {
221             sessInfoCleaned = false;
222         }
223
224         return false;
225     }
226
227     /** 
228      * {@inheritDoc} */
229     @Override
230     public boolean afterActivate(PolicyEngine engine) {
231         return false;
232     }
233
234     /* ============================================================ */
235
236     /**
237      * Gets the persistence timeout value for sessioninfo records.
238      *
239      * @return the timeout value, in milli-seconds, or {@code -1} if it is unspecified or invalid
240      */
241     private long getPersistenceTimeout() {
242         String timeoutString = null;
243
244         try {
245             timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
246
247             if (timeoutString != null) {
248                 // timeout parameter is specified
249                 return Long.valueOf(timeoutString) * 1000;
250             }
251
252         } catch (NumberFormatException e) {
253             logger.error(
254                     "Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
255                     timeoutString,
256                     e);
257         }
258
259         return -1;
260     }
261
262     /* ============================================================ */
263
264     /**
265      * Each instance of this class is a logical extension of a 'PolicyContainer' instance. Its
266      * reference is stored in the 'adjuncts' table within the 'PolicyContainer', and will be
267      * garbage-collected with the container.
268      */
269     protected class ContainerAdjunct {
270         /** 'PolicyContainer' instance that this adjunct is extending. */
271         private PolicyContainer policyContainer;
272
273         /** Maps a KIE session name to its data source. */
274         private Map<String, DsEmf> name2ds = new HashMap<>();
275
276         /**
277          * Constructor - initialize a new 'ContainerAdjunct'.
278          *
279          * @param policyContainer the 'PolicyContainer' instance this adjunct is extending
280          */
281         private ContainerAdjunct(PolicyContainer policyContainer) {
282             this.policyContainer = policyContainer;
283         }
284
285         /**
286          * Create a new persistent KieSession. If there is already a corresponding entry in the
287          * database, it is used to initialize the KieSession. If not, a completely new session is
288          * created.
289          *
290          * @param name the name of the KieSession (which is also the name of the associated
291          *     PolicySession)
292          * @param kieBaseName the name of the 'KieBase' instance containing this session
293          * @return a new KieSession with persistence enabled
294          */
295         private KieSession newPersistentKieSession(String name, String kieBaseName) {
296
297             configureSysProps();
298
299             BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
300             DsEmf dsemf = new DsEmf(ds);
301
302             try {
303                 EntityManagerFactory emf = dsemf.emf;
304                 DroolsSessionConnector conn = fact.makeJpaConnector(emf);
305
306                 long desiredSessionId = getSessionId(conn, name);
307
308                 logger.info(
309                         "\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
310
311                 // session does not exist -- attempt to create one
312                 logger.info(
313                         "getPolicySession:session does not exist -- attempt to create one with name {}", name);
314
315                 Environment env = kieSvcFact.newEnvironment();
316
317                 configureKieEnv(env, emf);
318
319                 KieSessionConfiguration kieConf = kieSvcFact.newKieSessionConfiguration();
320
321                 KieSession kieSession =
322                         (desiredSessionId >= 0
323                         ? loadKieSession(kieBaseName, desiredSessionId, env, kieConf)
324                                 : null);
325
326                 if (kieSession == null) {
327                     // loadKieSession() returned null or desiredSessionId < 0
328                     logger.info(
329                             "LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
330
331                     kieSession = newKieSession(kieBaseName, env);
332                 }
333
334                 replaceSession(conn, name, kieSession);
335
336                 name2ds.put(name, dsemf);
337
338                 return kieSession;
339
340             } catch (RuntimeException e) {
341                 dsemf.close();
342                 throw e;
343             }
344         }
345
346         /**
347          * Loads an existing KieSession from the persistent store.
348          *
349          * @param kieBaseName the name of the 'KieBase' instance containing this session
350          * @param desiredSessionId id of the desired KieSession
351          * @param env Kie Environment for the session
352          * @param kConf Kie Configuration for the session
353          * @return the persistent session, or {@code null} if it could not be loaded
354          */
355         private KieSession loadKieSession(
356                 String kieBaseName, long desiredSessionId, Environment env, KieSessionConfiguration kieConf) {
357             try {
358                 KieSession kieSession =
359                         kieSvcFact
360                         .getStoreServices()
361                         .loadKieSession(
362                                 desiredSessionId,
363                                 policyContainer.getKieContainer().getKieBase(kieBaseName),
364                                 kieConf,
365                                 env);
366
367                 logger.info("LOADING Loaded session {}", desiredSessionId);
368
369                 return kieSession;
370
371             } catch (Exception e) {
372                 logger.error("loadKieSession error: ", e);
373                 return null;
374             }
375         }
376
377         /**
378          * Creates a new, persistent KieSession.
379          *
380          * @param kieBaseName the name of the 'KieBase' instance containing this session
381          * @param env Kie Environment for the session
382          * @return a new, persistent session
383          */
384         private KieSession newKieSession(String kieBaseName, Environment env) {
385             KieSession kieSession =
386                     kieSvcFact
387                     .getStoreServices()
388                     .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
389
390             logger.info("LOADING CREATED {}", kieSession.getIdentifier());
391
392             return kieSession;
393         }
394
395         /**
396          * Closes the data source associated with a session.
397          *
398          * @param name name of the session being destroyed
399          */
400         private void destroyKieSession(String name) {
401             closeDataSource(name);
402         }
403
404         /**
405          * Closes the data source associated with a session.
406          *
407          * @param name name of the session being disposed of
408          */
409         private void disposeKieSession(String name) {
410             closeDataSource(name);
411         }
412
413         /**
414          * Closes the data source associated with a session.
415          *
416          * @param name name of the session whose data source is to be closed
417          */
418         private void closeDataSource(String name) {
419             DsEmf ds = name2ds.remove(name);
420             if (ds != null) {
421                 ds.close();
422             }
423         }
424
425         /** Configures java system properties for JPA/JTA. */
426         private void configureSysProps() {
427             System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
428             System.setProperty(
429                     "com.arjuna.ats.arjuna.objectstore.objectStoreDir",
430                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
431             System.setProperty(
432                     "ObjectStoreEnvironmentBean.objectStoreDir",
433                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
434         }
435
436         /**
437          * Configures a Kie Environment.
438          *
439          * @param env environment to be configured
440          * @param emf entity manager factory
441          */
442         private void configureKieEnv(Environment env, EntityManagerFactory emf) {
443             env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
444             env.set(EnvironmentName.TRANSACTION, fact.getUserTrans());
445             env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, fact.getTransSyncReg());
446             env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr());
447         }
448
449         /**
450          * Gets a session's ID from the persistent store.
451          *
452          * @param conn persistence connector
453          * @param sessnm name of the session
454          * @return the session's id, or {@code -1} if the session is not found
455          */
456         private long getSessionId(DroolsSessionConnector conn, String sessnm) {
457             DroolsSession sess = conn.get(sessnm);
458             return sess != null ? sess.getSessionId() : -1;
459         }
460
461         /**
462          * Replaces a session within the persistent store, if it exists. Adds it otherwise.
463          *
464          * @param conn persistence connector
465          * @param sessnm name of session to be updated
466          * @param kieSession new session information
467          */
468         private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
469
470             DroolsSessionEntity sess = new DroolsSessionEntity();
471
472             sess.setSessionName(sessnm);
473             sess.setSessionId(kieSession.getIdentifier());
474
475             conn.replace(sess);
476         }
477     }
478
479     /* ============================================================ */
480
481     /**
482      * Gets the data source properties.
483      *
484      * @return the data source properties
485      */
486     private Properties getDataSourceProperties() {
487         Properties props = new Properties();
488         props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
489         props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
490         props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
491         props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
492         props.put("maxActive", "3");
493         props.put("maxIdle", "1");
494         props.put("maxWait", "120000");
495         props.put("whenExhaustedAction", "2");
496         props.put("testOnBorrow", "false");
497         props.put("poolPreparedStatements", "true");
498
499         return props;
500     }
501
502     /**
503      * Removes "old" Drools 'sessioninfo' records, so they aren't used to restore data to Drools
504      * sessions. This also has the useful side-effect of removing abandoned records as well.
505      */
506     private void cleanUpSessionInfo() {
507
508         synchronized (cleanupLock) {
509             if (sessInfoCleaned) {
510                 logger.info("Clean up of sessioninfo table: already done");
511                 return;
512             }
513
514             if (sessionInfoTimeoutMs < 0) {
515                 logger.info("Clean up of sessioninfo table: no timeout specified");
516                 return;
517             }
518
519             // now do the record deletion
520             try (BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
521                     Connection connection = ds.getConnection();
522                     PreparedStatement statement =
523                             connection.prepareStatement(
524                                 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
525
526                 connection.setAutoCommit(true);
527
528                 statement.setLong(1, sessionInfoTimeoutMs / 1000);
529
530                 int count = statement.executeUpdate();
531                 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
532
533             } catch (SQLException e) {
534                 logger.error("Clean up of sessioninfo table failed", e);
535             }
536
537             // TODO: delete DroolsSessionEntity where sessionId not in
538             // (sessinfo.xxx)
539
540             sessInfoCleaned = true;
541         }
542     }
543
544     /**
545      * Determine whether persistence is enabled for a specific container.
546      *
547      * @param container container to be checked
548      * @param sessionName name of the session to be checked
549      * @return {@code true} if persistence is enabled for this container, and {@code false} if not
550      */
551     private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
552         Properties properties = getProperties(container);
553         boolean rval = false;
554
555         if (properties != null) {
556             // fetch the 'type' property
557             String type = getProperty(properties, sessionName, "type");
558             rval = "auto".equals(type) || "native".equals(type);
559         }
560
561         return rval;
562     }
563
564     /**
565      * Determine the controller properties associated with the policy container.
566      *
567      * @param container container whose properties are to be retrieved
568      * @return the container's properties, or {@code null} if not found
569      */
570     private Properties getProperties(PolicyContainer container) {
571         try {
572             return fact.getPolicyController(container).getProperties();
573         } catch (IllegalArgumentException e) {
574             logger.error("getProperties exception: ", e);
575             return null;
576         }
577     }
578
579     /**
580      * Fetch the persistence property associated with a session. The name may have the form:
581      *
582      * <ul>
583      *   <li>persistence.SESSION-NAME.PROPERTY
584      *   <li>persistence.PROPERTY
585      * </ul>
586      *
587      * @param properties properties from which the value is to be retrieved
588      * @param sessionName session name of interest
589      * @param property property name of interest
590      * @return the property value, or {@code null} if not found
591      */
592     private String getProperty(Properties properties, String sessionName, String property) {
593         String value = properties.getProperty("persistence." + sessionName + "." + property);
594         if (value == null) {
595             value = properties.getProperty("persistence." + property);
596         }
597
598         return value;
599     }
600
601     /* ============================================================ */
602
603     /**
604      * This 'ThreadModel' variant periodically calls 'KieSession.fireAllRules()', because the
605      * 'fireUntilHalt' method isn't compatible with persistence.
606      */
607     public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
608
609         /** Session associated with this persistent thread. */
610         private final PolicySession session;
611
612         /** The session thread. */
613         private final Thread thread;
614
615         /** Used to indicate that processing should stop. */
616         private final CountDownLatch stopped = new CountDownLatch(1);
617
618         /** Minimum time, in milli-seconds, that the thread should sleep before firing rules again. */
619         long minSleepTime = 100;
620
621         /**
622          * Maximum time, in milli-seconds, that the thread should sleep before firing rules again. This
623          * is a "half" time, so that we can multiply it by two without overflowing the word size.
624          */
625         long halfMaxSleepTime = 5000L / 2L;
626
627         /**
628          * Constructor - initialize variables and create thread.
629          *
630          * @param session the 'PolicySession' instance
631          * @param properties may contain additional session properties
632          */
633         public PersistentThreadModel(PolicySession session, Properties properties) {
634             this.session = session;
635             this.thread = new Thread(this, getThreadName());
636
637             if (properties == null) {
638                 return;
639             }
640
641             // extract 'minSleepTime' and/or 'maxSleepTime'
642             String name = session.getName();
643
644             // fetch 'minSleepTime' value, and update if defined
645             String sleepTimeString = getProperty(properties, name, "minSleepTime");
646             if (sleepTimeString != null) {
647                 try {
648                     minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
649                 } catch (Exception e) {
650                     logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
651                 }
652             }
653
654             // fetch 'maxSleepTime' value, and update if defined
655             long maxSleepTime = 2 * halfMaxSleepTime;
656             sleepTimeString = getProperty(properties, name, "maxSleepTime");
657             if (sleepTimeString != null) {
658                 try {
659                     maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
660                 } catch (Exception e) {
661                     logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
662                 }
663             }
664
665             // swap values if needed
666             if (minSleepTime > maxSleepTime) {
667                 logger.error(
668                         "minSleepTime("
669                                 + minSleepTime
670                                 + ") is greater than maxSleepTime("
671                                 + maxSleepTime
672                                 + ") -- swapping");
673                 long tmp = minSleepTime;
674                 minSleepTime = maxSleepTime;
675                 maxSleepTime = tmp;
676             }
677
678             halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
679         }
680
681         /**
682          * Get thread name.
683          *  
684          * @return the String to use as the thread name */
685         private String getThreadName() {
686             return "Session " + session.getFullName() + " (persistent)";
687         }
688
689         /** ************************ */
690         /* 'ThreadModel' interface */
691         /** ************************ */
692
693         /** 
694          * {@inheritDoc} */
695         @Override
696         public void start() {
697             thread.start();
698         }
699
700         /** 
701          * {@inheritDoc} */
702         @Override
703         public void stop() {
704             // tell the thread to stop
705             stopped.countDown();
706
707             // wait up to 10 seconds for the thread to stop
708             try {
709                 thread.join(10000);
710
711             } catch (InterruptedException e) {
712                 logger.error("stopThread exception: ", e);
713                 Thread.currentThread().interrupt();
714             }
715
716             // verify that it's done
717             if (thread.isAlive()) {
718                 logger.error("stopThread: still running");
719             }
720         }
721
722         /** 
723          * {@inheritDoc} */
724         @Override
725         public void updated() {
726             // the container artifact has been updated -- adjust the thread name
727             thread.setName(getThreadName());
728         }
729
730         /** ********************* */
731         /* 'Runnable' interface */
732         /** ********************* */
733
734         /** 
735          * {@inheritDoc} */
736         @Override
737         public void run() {
738             logger.info("PersistentThreadModel running");
739
740             // set thread local variable
741             session.setPolicySession();
742
743             KieSession kieSession = session.getKieSession();
744             long sleepTime = 2 * halfMaxSleepTime;
745
746             // We want to continue, despite any exceptions that occur
747             // while rules are fired.
748
749             boolean cont = true;
750             while (cont) {
751
752                 try {
753                     if (kieSession.fireAllRules() > 0) {
754                         // some rules fired -- reduce poll delay
755                         sleepTime = Math.max(minSleepTime, sleepTime / 2);
756                     } else {
757                         // no rules fired -- increase poll delay
758                         sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
759                     }
760
761                 } catch (Exception | LinkageError e) {
762                     logger.error("Exception during kieSession.fireAllRules", e);
763                 }
764
765                 try {
766                     if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
767                         cont = false;
768                     }
769
770                 } catch (InterruptedException e) {
771                     logger.error("startThread exception: ", e);
772                     Thread.currentThread().interrupt();
773                     cont = false;
774                 }
775             }
776
777             logger.info("PersistentThreadModel completed");
778         }
779     }
780
781     /* ============================================================ */
782
783     /** DataSource-EntityManagerFactory pair. */
784     private class DsEmf {
785         private BasicDataSource bds;
786         private EntityManagerFactory emf;
787
788         /**
789          * Makes an entity manager factory for the given data source.
790          *
791          * @param bds pooled data source
792          */
793         public DsEmf(BasicDataSource bds) {
794             try {
795                 Map<String, Object> props = new HashMap<>();
796                 props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds);
797
798                 this.bds = bds;
799                 this.emf = fact.makeEntMgrFact(props);
800
801             } catch (RuntimeException e) {
802                 closeDataSource();
803                 throw e;
804             }
805         }
806
807         /** Closes the entity manager factory and the data source. */
808         public void close() {
809             try {
810                 emf.close();
811
812             } catch (RuntimeException e) {
813                 closeDataSource();
814                 throw e;
815             }
816
817             closeDataSource();
818         }
819
820         /** Closes the data source only. */
821         private void closeDataSource() {
822             try {
823                 bds.close();
824
825             } catch (SQLException e) {
826                 throw new PersistenceFeatureException(e);
827             }
828         }
829     }
830
831     private static class SingletonRegistry {
832         private static final TransactionSynchronizationRegistry transreg =
833                 new com.arjuna.ats.internal.jta.transaction.arjunacore
834                 .TransactionSynchronizationRegistryImple();
835
836         private SingletonRegistry() {
837             super();
838         }
839     }
840
841     /** Factory for various items. Methods can be overridden for junit testing. */
842     protected static class Factory {
843
844         /**
845          * Gets the transaction manager.
846          *
847          * @return the transaction manager
848          */
849         public TransactionManager getTransMgr() {
850             return com.arjuna.ats.jta.TransactionManager.transactionManager();
851         }
852
853         /**
854          * Gets the user transaction.
855          *
856          * @return the user transaction
857          */
858         public UserTransaction getUserTrans() {
859             return com.arjuna.ats.jta.UserTransaction.userTransaction();
860         }
861
862         /**
863          * Gets the transaction synchronization registry.
864          *
865          * @return the transaction synchronization registry
866          */
867         public TransactionSynchronizationRegistry getTransSyncReg() {
868             return SingletonRegistry.transreg;
869         }
870
871         /**
872          * Gets the KIE services.
873          *
874          * @return the KIE services
875          */
876         public KieServices getKieServices() {
877             return KieServices.Factory.get();
878         }
879
880         /**
881          * Loads properties from a file.
882          *
883          * @param filenm name of the file to load
884          * @return properties, as loaded from the file
885          * @throws IOException if an error occurs reading from the file
886          */
887         public Properties loadProperties(String filenm) throws IOException {
888             return PropertyUtil.getProperties(filenm);
889         }
890
891         /**
892          * Makes a Data Source.
893          *
894          * @param dsProps data source properties
895          * @return a new data source
896          */
897         public BasicDataSource makeDataSource(Properties dsProps) {
898             try {
899                 return BasicDataSourceFactory.createDataSource(dsProps);
900
901             } catch (Exception e) {
902                 throw new PersistenceFeatureException(e);
903             }
904         }
905
906         /**
907          * Makes a new JPA connector for drools sessions.
908          *
909          * @param emf entity manager factory
910          * @return a new JPA connector for drools sessions
911          */
912         public DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
913             return new JpaDroolsSessionConnector(emf);
914         }
915
916         /**
917          * Makes a new entity manager factory.
918          *
919          * @param props properties with which the factory should be configured
920          * @return a new entity manager factory
921          */
922         public EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
923             return Persistence.createEntityManagerFactory("onapsessionsPU", props);
924         }
925
926         /**
927          * Gets the policy controller associated with a given policy container.
928          *
929          * @param container container whose controller is to be retrieved
930          * @return the container's controller
931          */
932         public PolicyController getPolicyController(PolicyContainer container) {
933             return PolicyController.factory.get(container.getGroupId(), container.getArtifactId());
934         }
935     }
936
937     /**
938      * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations,
939      * particularly when they are not, themselves, Runtime exceptions.
940      */
941     public static class PersistenceFeatureException extends RuntimeException {
942         private static final long serialVersionUID = 1L;
943
944         /** 
945          * Constructor.
946          * */
947         public PersistenceFeatureException(Exception ex) {
948             super(ex);
949         }
950     }
951 }