cc826905c209170564e8a4227128eea027aa6f38
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-session-persistence
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.persistence;
22
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.SQLException;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.persistence.EntityManagerFactory;
34 import javax.persistence.Persistence;
35 import javax.transaction.TransactionManager;
36 import javax.transaction.TransactionSynchronizationRegistry;
37 import javax.transaction.UserTransaction;
38
39 import org.apache.commons.dbcp2.BasicDataSource;
40 import org.apache.commons.dbcp2.BasicDataSourceFactory;
41 import org.kie.api.KieServices;
42 import org.kie.api.runtime.Environment;
43 import org.kie.api.runtime.EnvironmentName;
44 import org.kie.api.runtime.KieSession;
45 import org.kie.api.runtime.KieSessionConfiguration;
46 import org.onap.policy.drools.core.PolicyContainer;
47 import org.onap.policy.drools.core.PolicySession;
48 import org.onap.policy.drools.core.PolicySessionFeatureApi;
49 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.onap.policy.drools.system.PolicyControllerConstants;
52 import org.onap.policy.drools.system.PolicyEngine;
53 import org.onap.policy.drools.utils.PropertyUtil;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 /**
58  * If this feature is supported, there is a single instance of it. It adds persistence to Drools
59  * sessions. In addition, if an active-standby feature exists, then that is used to determine the
60  * active and last-active PDP. If it does not exist, then the current host name is used as the PDP
61  * id.
62  *
63  * <p>The bulk of the code here was once in other classes, such as 'PolicyContainer' and 'Main'. It
64  * was moved here as part of making this a separate optional feature.
65  */
66 public class PersistenceFeature implements PolicySessionFeatureApi, PolicyEngineFeatureApi {
67
68     private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
69
70     /** KieService factory. */
71     private KieServices kieSvcFact;
72
73     /** Persistence properties. */
74     private Properties persistProps;
75
76     /** Whether or not the SessionInfo records should be cleaned out. */
77     private boolean sessInfoCleaned;
78
79     /** SessionInfo timeout, in milli-seconds, as read from
80      * {@link #persistProps}. */
81     private long sessionInfoTimeoutMs;
82
83     /** Object used to serialize cleanup of sessioninfo table. */
84     private Object cleanupLock = new Object();
85
86     /**
87      * Lookup the adjunct for this feature that is associated with the specified PolicyContainer. If
88      * not found, create one.
89      *
90      * @param policyContainer the container whose adjunct we are looking up, and possibly creating
91      * @return the associated 'ContainerAdjunct' instance, which may be new
92      */
93     private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
94
95         Object rval = policyContainer.getAdjunct(this);
96
97         if (rval == null || !(rval instanceof ContainerAdjunct)) {
98             // adjunct does not exist, or has the wrong type (should never
99             // happen)
100             rval = new ContainerAdjunct(policyContainer);
101             policyContainer.setAdjunct(this, rval);
102         }
103
104         return (ContainerAdjunct) rval;
105     }
106
107     /**
108      * {@inheritDoc}.
109      **/
110     @Override
111     public int getSequenceNumber() {
112         return 1;
113     }
114
115     /**
116      * {@inheritDoc}.
117      **/
118     @Override
119     public void globalInit(String[] args, String configDir) {
120
121         kieSvcFact = getKieServices();
122
123         try {
124             persistProps = loadProperties(configDir + "/feature-session-persistence.properties");
125
126         } catch (IOException e1) {
127             logger.error("initializePersistence: ", e1);
128         }
129
130         sessionInfoTimeoutMs = getPersistenceTimeout();
131     }
132
133     /**
134      * Creates a persistent KieSession, loading it from the persistent store, or creating one, if it
135      * does not exist yet.
136      */
137     @Override
138     public KieSession activatePolicySession(
139             PolicyContainer policyContainer, String name, String kieBaseName) {
140
141         if (isPersistenceEnabled(policyContainer, name)) {
142             cleanUpSessionInfo();
143
144             return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
145         }
146
147         return null;
148     }
149
150     /**
151      * {@inheritDoc}.
152      **/
153     @Override
154     public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
155
156         PolicyContainer policyContainer = session.getPolicyContainer();
157         if (isPersistenceEnabled(policyContainer, session.getName())) {
158             return new PersistentThreadModel(session, getProperties(policyContainer));
159         }
160         return null;
161     }
162
163     /**
164      * {@inheritDoc}.
165      **/
166     @Override
167     public void disposeKieSession(PolicySession policySession) {
168
169         ContainerAdjunct contAdj =
170                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
171         if (contAdj != null) {
172             contAdj.disposeKieSession(policySession.getName());
173         }
174     }
175
176     /**
177      * {@inheritDoc}.
178      **/
179     @Override
180     public void destroyKieSession(PolicySession policySession) {
181
182         ContainerAdjunct contAdj =
183                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
184         if (contAdj != null) {
185             contAdj.destroyKieSession(policySession.getName());
186         }
187     }
188
189     /**
190      * {@inheritDoc}.
191      **/
192     @Override
193     public boolean afterStart(PolicyEngine engine) {
194         return false;
195     }
196
197     /**
198      * {@inheritDoc}.
199      **/
200     @Override
201     public boolean beforeStart(PolicyEngine engine) {
202         synchronized (cleanupLock) {
203             sessInfoCleaned = false;
204         }
205
206         return false;
207     }
208
209     /**
210      * {@inheritDoc}.
211      **/
212     @Override
213     public boolean beforeActivate(PolicyEngine engine) {
214         synchronized (cleanupLock) {
215             sessInfoCleaned = false;
216         }
217
218         return false;
219     }
220
221     /**
222      * {@inheritDoc}.
223      **/
224     @Override
225     public boolean afterActivate(PolicyEngine engine) {
226         return false;
227     }
228
229     /* ============================================================ */
230
231     /**
232      * Gets the persistence timeout value for sessioninfo records.
233      *
234      * @return the timeout value, in milli-seconds, or {@code -1} if it is unspecified or invalid
235      */
236     private long getPersistenceTimeout() {
237         String timeoutString = null;
238
239         try {
240             timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
241
242             if (timeoutString != null) {
243                 // timeout parameter is specified
244                 return Long.valueOf(timeoutString) * 1000;
245             }
246
247         } catch (NumberFormatException e) {
248             logger.error(
249                     "Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
250                     timeoutString,
251                     e);
252         }
253
254         return -1;
255     }
256
257     /* ============================================================ */
258
259     /**
260      * Each instance of this class is a logical extension of a 'PolicyContainer' instance. Its
261      * reference is stored in the 'adjuncts' table within the 'PolicyContainer', and will be
262      * garbage-collected with the container.
263      */
264     protected class ContainerAdjunct {
265         /** 'PolicyContainer' instance that this adjunct is extending. */
266         private PolicyContainer policyContainer;
267
268         /** Maps a KIE session name to its data source. */
269         private Map<String, DsEmf> name2ds = new HashMap<>();
270
271         /**
272          * Constructor - initialize a new 'ContainerAdjunct'.
273          *
274          * @param policyContainer the 'PolicyContainer' instance this adjunct is extending
275          */
276         private ContainerAdjunct(PolicyContainer policyContainer) {
277             this.policyContainer = policyContainer;
278         }
279
280         /**
281          * Create a new persistent KieSession. If there is already a corresponding entry in the
282          * database, it is used to initialize the KieSession. If not, a completely new session is
283          * created.
284          *
285          * @param name the name of the KieSession (which is also the name of the associated
286          *     PolicySession)
287          * @param kieBaseName the name of the 'KieBase' instance containing this session
288          * @return a new KieSession with persistence enabled
289          */
290         private KieSession newPersistentKieSession(String name, String kieBaseName) {
291
292             configureSysProps();
293
294             BasicDataSource ds = makeDataSource(getDataSourceProperties());
295             DsEmf dsemf = new DsEmf(ds);
296
297             try {
298                 EntityManagerFactory emf = dsemf.emf;
299                 DroolsSessionConnector conn = makeJpaConnector(emf);
300
301                 long desiredSessionId = getSessionId(conn, name);
302
303                 logger.info(
304                         "\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
305
306                 // session does not exist -- attempt to create one
307                 logger.info(
308                         "getPolicySession:session does not exist -- attempt to create one with name {}", name);
309
310                 Environment env = kieSvcFact.newEnvironment();
311
312                 configureKieEnv(env, emf);
313
314                 KieSessionConfiguration kieConf = kieSvcFact.newKieSessionConfiguration();
315
316                 KieSession kieSession =
317                         (desiredSessionId >= 0
318                         ? loadKieSession(kieBaseName, desiredSessionId, env, kieConf)
319                                 : null);
320
321                 if (kieSession == null) {
322                     // loadKieSession() returned null or desiredSessionId < 0
323                     logger.info(
324                             "LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
325
326                     kieSession = newKieSession(kieBaseName, env);
327                 }
328
329                 replaceSession(conn, name, kieSession);
330
331                 name2ds.put(name, dsemf);
332
333                 return kieSession;
334
335             } catch (RuntimeException e) {
336                 dsemf.close();
337                 throw e;
338             }
339         }
340
341         /**
342          * Loads an existing KieSession from the persistent store.
343          *
344          * @param kieBaseName the name of the 'KieBase' instance containing this session
345          * @param desiredSessionId id of the desired KieSession
346          * @param env Kie Environment for the session
347          * @param kConf Kie Configuration for the session
348          * @return the persistent session, or {@code null} if it could not be loaded
349          */
350         private KieSession loadKieSession(
351                 String kieBaseName, long desiredSessionId, Environment env, KieSessionConfiguration kieConf) {
352             try {
353                 KieSession kieSession =
354                         kieSvcFact
355                         .getStoreServices()
356                         .loadKieSession(
357                                 desiredSessionId,
358                                 policyContainer.getKieContainer().getKieBase(kieBaseName),
359                                 kieConf,
360                                 env);
361
362                 logger.info("LOADING Loaded session {}", desiredSessionId);
363
364                 return kieSession;
365
366             } catch (Exception e) {
367                 logger.error("loadKieSession error: ", e);
368                 return null;
369             }
370         }
371
372         /**
373          * Creates a new, persistent KieSession.
374          *
375          * @param kieBaseName the name of the 'KieBase' instance containing this session
376          * @param env Kie Environment for the session
377          * @return a new, persistent session
378          */
379         private KieSession newKieSession(String kieBaseName, Environment env) {
380             KieSession kieSession =
381                     kieSvcFact
382                     .getStoreServices()
383                     .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
384
385             logger.info("LOADING CREATED {}", kieSession.getIdentifier());
386
387             return kieSession;
388         }
389
390         /**
391          * Closes the data source associated with a session.
392          *
393          * @param name name of the session being destroyed
394          */
395         private void destroyKieSession(String name) {
396             closeDataSource(name);
397         }
398
399         /**
400          * Closes the data source associated with a session.
401          *
402          * @param name name of the session being disposed of
403          */
404         private void disposeKieSession(String name) {
405             closeDataSource(name);
406         }
407
408         /**
409          * Closes the data source associated with a session.
410          *
411          * @param name name of the session whose data source is to be closed
412          */
413         private void closeDataSource(String name) {
414             DsEmf ds = name2ds.remove(name);
415             if (ds != null) {
416                 ds.close();
417             }
418         }
419
420         /** Configures java system properties for JPA/JTA. */
421         private void configureSysProps() {
422             System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
423             System.setProperty(
424                     "com.arjuna.ats.arjuna.objectstore.objectStoreDir",
425                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
426             System.setProperty(
427                     "ObjectStoreEnvironmentBean.objectStoreDir",
428                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
429         }
430
431         /**
432          * Configures a Kie Environment.
433          *
434          * @param env environment to be configured
435          * @param emf entity manager factory
436          */
437         private void configureKieEnv(Environment env, EntityManagerFactory emf) {
438             env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
439             env.set(EnvironmentName.TRANSACTION, getUserTrans());
440             env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, getTransSyncReg());
441             env.set(EnvironmentName.TRANSACTION_MANAGER, getTransMgr());
442         }
443
444         /**
445          * Gets a session's ID from the persistent store.
446          *
447          * @param conn persistence connector
448          * @param sessnm name of the session
449          * @return the session's id, or {@code -1} if the session is not found
450          */
451         private long getSessionId(DroolsSessionConnector conn, String sessnm) {
452             DroolsSession sess = conn.get(sessnm);
453             return sess != null ? sess.getSessionId() : -1;
454         }
455
456         /**
457          * Replaces a session within the persistent store, if it exists. Adds it otherwise.
458          *
459          * @param conn persistence connector
460          * @param sessnm name of session to be updated
461          * @param kieSession new session information
462          */
463         private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
464
465             DroolsSessionEntity sess = new DroolsSessionEntity();
466
467             sess.setSessionName(sessnm);
468             sess.setSessionId(kieSession.getIdentifier());
469
470             conn.replace(sess);
471         }
472     }
473
474     /* ============================================================ */
475
476     /**
477      * Gets the data source properties.
478      *
479      * @return the data source properties
480      */
481     private Properties getDataSourceProperties() {
482         Properties props = new Properties();
483         props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
484         props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
485         props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
486         props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
487         props.put("maxActive", "3");
488         props.put("maxIdle", "1");
489         props.put("maxWait", "120000");
490         props.put("whenExhaustedAction", "2");
491         props.put("testOnBorrow", "false");
492         props.put("poolPreparedStatements", "true");
493
494         return props;
495     }
496
497     /**
498      * Removes "old" Drools 'sessioninfo' records, so they aren't used to restore data to Drools
499      * sessions. This also has the useful side-effect of removing abandoned records as well.
500      */
501     private void cleanUpSessionInfo() {
502
503         synchronized (cleanupLock) {
504             if (sessInfoCleaned) {
505                 logger.info("Clean up of sessioninfo table: already done");
506                 return;
507             }
508
509             if (sessionInfoTimeoutMs < 0) {
510                 logger.info("Clean up of sessioninfo table: no timeout specified");
511                 return;
512             }
513
514             // now do the record deletion
515             try (BasicDataSource ds = makeDataSource(getDataSourceProperties());
516                     Connection connection = ds.getConnection();
517                     PreparedStatement statement =
518                             connection.prepareStatement(
519                                 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
520
521                 connection.setAutoCommit(true);
522
523                 statement.setLong(1, sessionInfoTimeoutMs / 1000);
524
525                 int count = statement.executeUpdate();
526                 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
527
528             } catch (SQLException e) {
529                 logger.error("Clean up of sessioninfo table failed", e);
530             }
531
532             // delete DroolsSessionEntity where sessionId not in (sessinfo.xxx)?
533
534             sessInfoCleaned = true;
535         }
536     }
537
538     /**
539      * Determine whether persistence is enabled for a specific container.
540      *
541      * @param container container to be checked
542      * @param sessionName name of the session to be checked
543      * @return {@code true} if persistence is enabled for this container, and {@code false} if not
544      */
545     private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
546         Properties properties = getProperties(container);
547         boolean rval = false;
548
549         if (properties != null) {
550             // fetch the 'type' property
551             String type = getProperty(properties, sessionName, "type");
552             rval = "auto".equals(type) || "native".equals(type);
553         }
554
555         return rval;
556     }
557
558     /**
559      * Determine the controller properties associated with the policy container.
560      *
561      * @param container container whose properties are to be retrieved
562      * @return the container's properties, or {@code null} if not found
563      */
564     private Properties getProperties(PolicyContainer container) {
565         try {
566             return getPolicyController(container).getProperties();
567         } catch (IllegalArgumentException e) {
568             logger.error("getProperties exception: ", e);
569             return null;
570         }
571     }
572
573     /**
574      * Fetch the persistence property associated with a session. The name may have the form:
575      *
576      * <ul>
577      *   <li>persistence.SESSION-NAME.PROPERTY
578      *   <li>persistence.PROPERTY
579      * </ul>
580      *
581      * @param properties properties from which the value is to be retrieved
582      * @param sessionName session name of interest
583      * @param property property name of interest
584      * @return the property value, or {@code null} if not found
585      */
586     private String getProperty(Properties properties, String sessionName, String property) {
587         String value = properties.getProperty("persistence." + sessionName + "." + property);
588         if (value == null) {
589             value = properties.getProperty("persistence." + property);
590         }
591
592         return value;
593     }
594
595     /* ============================================================ */
596
597     /**
598      * This 'ThreadModel' variant periodically calls 'KieSession.fireAllRules()', because the
599      * 'fireUntilHalt' method isn't compatible with persistence.
600      */
601     public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
602
603         /** Session associated with this persistent thread. */
604         private final PolicySession session;
605
606         /** The session thread. */
607         private final Thread thread;
608
609         /** Used to indicate that processing should stop. */
610         private final CountDownLatch stopped = new CountDownLatch(1);
611
612         /** Minimum time, in milli-seconds, that the thread should sleep before firing rules again. */
613         long minSleepTime = 100;
614
615         /**
616          * Maximum time, in milli-seconds, that the thread should sleep before firing rules again. This
617          * is a "half" time, so that we can multiply it by two without overflowing the word size.
618          */
619         long halfMaxSleepTime = 5000L / 2L;
620
621         /**
622          * Constructor - initialize variables and create thread.
623          *
624          * @param session the 'PolicySession' instance
625          * @param properties may contain additional session properties
626          */
627         public PersistentThreadModel(PolicySession session, Properties properties) {
628             this.session = session;
629             this.thread = new Thread(this, getThreadName());
630
631             if (properties == null) {
632                 return;
633             }
634
635             // extract 'minSleepTime' and/or 'maxSleepTime'
636             String name = session.getName();
637
638             // fetch 'minSleepTime' value, and update if defined
639             String sleepTimeString = getProperty(properties, name, "minSleepTime");
640             if (sleepTimeString != null) {
641                 try {
642                     minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
643                 } catch (Exception e) {
644                     logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
645                 }
646             }
647
648             // fetch 'maxSleepTime' value, and update if defined
649             long maxSleepTime = 2 * halfMaxSleepTime;
650             sleepTimeString = getProperty(properties, name, "maxSleepTime");
651             if (sleepTimeString != null) {
652                 try {
653                     maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
654                 } catch (Exception e) {
655                     logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
656                 }
657             }
658
659             // swap values if needed
660             if (minSleepTime > maxSleepTime) {
661                 logger.error(
662                         "minSleepTime("
663                                 + minSleepTime
664                                 + ") is greater than maxSleepTime("
665                                 + maxSleepTime
666                                 + ") -- swapping");
667                 long tmp = minSleepTime;
668                 minSleepTime = maxSleepTime;
669                 maxSleepTime = tmp;
670             }
671
672             halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
673         }
674
675         /**
676          * Get thread name.
677          *
678          * @return the String to use as the thread name */
679         private String getThreadName() {
680             return "Session " + session.getFullName() + " (persistent)";
681         }
682
683         /*=========================*/
684         /* 'ThreadModel' interface */
685         /*=========================*/
686
687         /**
688          * {@inheritDoc}.
689          **/
690         @Override
691         public void start() {
692             thread.start();
693         }
694
695         /**
696          * {@inheritDoc}.
697          **/
698         @Override
699         public void stop() {
700             // tell the thread to stop
701             stopped.countDown();
702
703             // wait up to 10 seconds for the thread to stop
704             try {
705                 thread.join(10000);
706
707             } catch (InterruptedException e) {
708                 logger.error("stopThread exception: ", e);
709                 Thread.currentThread().interrupt();
710             }
711
712             // verify that it's done
713             if (thread.isAlive()) {
714                 logger.error("stopThread: still running");
715             }
716         }
717
718         /**
719          * {@inheritDoc}.
720          **/
721         @Override
722         public void updated() {
723             // the container artifact has been updated -- adjust the thread name
724             thread.setName(getThreadName());
725         }
726
727         /*======================*/
728         /* 'Runnable' interface */
729         /*======================*/
730
731         /**
732          * {@inheritDoc}.
733          **/
734         @Override
735         public void run() {
736             logger.info("PersistentThreadModel running");
737
738             // set thread local variable
739             session.setPolicySession();
740
741             KieSession kieSession = session.getKieSession();
742             long sleepTime = 2 * halfMaxSleepTime;
743
744             // We want to continue, despite any exceptions that occur
745             // while rules are fired.
746
747             boolean cont = true;
748             while (cont) {
749
750                 try {
751                     if (kieSession.fireAllRules() > 0) {
752                         // some rules fired -- reduce poll delay
753                         sleepTime = Math.max(minSleepTime, sleepTime / 2);
754                     } else {
755                         // no rules fired -- increase poll delay
756                         sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
757                     }
758
759                 } catch (Exception | LinkageError e) {
760                     logger.error("Exception during kieSession.fireAllRules", e);
761                 }
762
763                 try {
764                     if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
765                         cont = false;
766                     }
767
768                 } catch (InterruptedException e) {
769                     logger.error("startThread exception: ", e);
770                     Thread.currentThread().interrupt();
771                     cont = false;
772                 }
773             }
774
775             logger.info("PersistentThreadModel completed");
776         }
777     }
778
779     /* ============================================================ */
780
781     /** DataSource-EntityManagerFactory pair. */
782     private class DsEmf {
783         private BasicDataSource bds;
784         private EntityManagerFactory emf;
785
786         /**
787          * Makes an entity manager factory for the given data source.
788          *
789          * @param bds pooled data source
790          */
791         public DsEmf(BasicDataSource bds) {
792             try {
793                 Map<String, Object> props = new HashMap<>();
794                 props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds);
795
796                 this.bds = bds;
797                 this.emf = makeEntMgrFact(props);
798
799             } catch (RuntimeException e) {
800                 closeDataSource();
801                 throw e;
802             }
803         }
804
805         /** Closes the entity manager factory and the data source. */
806         public void close() {
807             try {
808                 emf.close();
809
810             } catch (RuntimeException e) {
811                 closeDataSource();
812                 throw e;
813             }
814
815             closeDataSource();
816         }
817
818         /** Closes the data source only. */
819         private void closeDataSource() {
820             try {
821                 bds.close();
822
823             } catch (SQLException e) {
824                 throw new PersistenceFeatureException(e);
825             }
826         }
827     }
828
829     private static class SingletonRegistry {
830         private static final TransactionSynchronizationRegistry transreg =
831                 new com.arjuna.ats.internal.jta.transaction.arjunacore
832                 .TransactionSynchronizationRegistryImple();
833
834         private SingletonRegistry() {
835             super();
836         }
837     }
838
839     /** Factory for various items. Methods can be overridden for junit testing. */
840
841     /**
842      * Gets the transaction manager.
843      *
844      * @return the transaction manager
845      */
846     protected TransactionManager getTransMgr() {
847         return com.arjuna.ats.jta.TransactionManager.transactionManager();
848     }
849
850     /**
851      * Gets the user transaction.
852      *
853      * @return the user transaction
854      */
855     protected UserTransaction getUserTrans() {
856         return com.arjuna.ats.jta.UserTransaction.userTransaction();
857     }
858
859     /**
860      * Gets the transaction synchronization registry.
861      *
862      * @return the transaction synchronization registry
863      */
864     protected TransactionSynchronizationRegistry getTransSyncReg() {
865         return SingletonRegistry.transreg;
866     }
867
868     /**
869      * Gets the KIE services.
870      *
871      * @return the KIE services
872      */
873     protected KieServices getKieServices() {
874         return KieServices.Factory.get();
875     }
876
877     /**
878      * Loads properties from a file.
879      *
880      * @param filenm name of the file to load
881      * @return properties, as loaded from the file
882      * @throws IOException if an error occurs reading from the file
883      */
884     protected Properties loadProperties(String filenm) throws IOException {
885         return PropertyUtil.getProperties(filenm);
886     }
887
888     /**
889      * Makes a Data Source.
890      *
891      * @param dsProps data source properties
892      * @return a new data source
893      */
894     protected BasicDataSource makeDataSource(Properties dsProps) {
895         try {
896             return BasicDataSourceFactory.createDataSource(dsProps);
897
898         } catch (Exception e) {
899             throw new PersistenceFeatureException(e);
900         }
901     }
902
903     /**
904      * Makes a new JPA connector for drools sessions.
905      *
906      * @param emf entity manager factory
907      * @return a new JPA connector for drools sessions
908      */
909     protected DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
910         return new JpaDroolsSessionConnector(emf);
911     }
912
913     /**
914      * Makes a new entity manager factory.
915      *
916      * @param props properties with which the factory should be configured
917      * @return a new entity manager factory
918      */
919     protected EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
920         return Persistence.createEntityManagerFactory("onapsessionsPU", props);
921     }
922
923     /**
924      * Gets the policy controller associated with a given policy container.
925      *
926      * @param container container whose controller is to be retrieved
927      * @return the container's controller
928      */
929     protected PolicyController getPolicyController(PolicyContainer container) {
930         return PolicyControllerConstants.getFactory().get(container.getGroupId(), container.getArtifactId());
931     }
932
933     /**
934      * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations,
935      * particularly when they are not, themselves, Runtime exceptions.
936      */
937     public static class PersistenceFeatureException extends RuntimeException {
938         private static final long serialVersionUID = 1L;
939
940         /**
941          * Constructor.
942          * */
943         public PersistenceFeatureException(Exception ex) {
944             super(ex);
945         }
946     }
947 }