ce038d25267b87353a592aa10b15685ce3321d97
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-session-persistence
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.persistence;
22
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.SQLException;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.persistence.EntityManagerFactory;
34 import javax.persistence.Persistence;
35 import javax.transaction.TransactionManager;
36 import javax.transaction.TransactionSynchronizationRegistry;
37 import javax.transaction.UserTransaction;
38
39 import org.apache.commons.dbcp2.BasicDataSource;
40 import org.apache.commons.dbcp2.BasicDataSourceFactory;
41 import org.kie.api.KieServices;
42 import org.kie.api.runtime.Environment;
43 import org.kie.api.runtime.EnvironmentName;
44 import org.kie.api.runtime.KieSession;
45 import org.kie.api.runtime.KieSessionConfiguration;
46 import org.onap.policy.drools.core.PolicyContainer;
47 import org.onap.policy.drools.core.PolicySession;
48 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
49 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.onap.policy.drools.system.PolicyEngine;
52 import org.onap.policy.drools.utils.PropertyUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * If this feature is supported, there is a single instance of it. It adds persistence to Drools
58  * sessions. In addition, if an active-standby feature exists, then that is used to determine the
59  * active and last-active PDP. If it does not exist, then the current host name is used as the PDP
60  * id.
61  *
62  * <p>The bulk of the code here was once in other classes, such as 'PolicyContainer' and 'Main'. It
63  * was moved here as part of making this a separate optional feature.
64  */
65 public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI {
66
67     private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
68     
69     /** KieService factory. */
70     private KieServices kieSvcFact;
71
72     /** Persistence properties. */
73     private Properties persistProps;
74
75     /** Whether or not the SessionInfo records should be cleaned out. */
76     private boolean sessInfoCleaned;
77
78     /** SessionInfo timeout, in milli-seconds, as read from 
79      * {@link #persistProps}. */
80     private long sessionInfoTimeoutMs;
81
82     /** Object used to serialize cleanup of sessioninfo table. */
83     private Object cleanupLock = new Object();
84
85     /**
86      * Lookup the adjunct for this feature that is associated with the specified PolicyContainer. If
87      * not found, create one.
88      *
89      * @param policyContainer the container whose adjunct we are looking up, and possibly creating
90      * @return the associated 'ContainerAdjunct' instance, which may be new
91      */
92     private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
93
94         Object rval = policyContainer.getAdjunct(this);
95
96         if (rval == null || !(rval instanceof ContainerAdjunct)) {
97             // adjunct does not exist, or has the wrong type (should never
98             // happen)
99             rval = new ContainerAdjunct(policyContainer);
100             policyContainer.setAdjunct(this, rval);
101         }
102
103         return (ContainerAdjunct) rval;
104     }
105
106     /** 
107      * {@inheritDoc}.
108      **/
109     @Override
110     public int getSequenceNumber() {
111         return 1;
112     }
113
114     /** 
115      * {@inheritDoc}.
116      **/
117     @Override
118     public void globalInit(String[] args, String configDir) {
119
120         kieSvcFact = getKieServices();
121
122         try {
123             persistProps = loadProperties(configDir + "/feature-session-persistence.properties");
124
125         } catch (IOException e1) {
126             logger.error("initializePersistence: ", e1);
127         }
128
129         sessionInfoTimeoutMs = getPersistenceTimeout();
130     }
131
132     /**
133      * Creates a persistent KieSession, loading it from the persistent store, or creating one, if it
134      * does not exist yet.
135      */
136     @Override
137     public KieSession activatePolicySession(
138             PolicyContainer policyContainer, String name, String kieBaseName) {
139
140         if (isPersistenceEnabled(policyContainer, name)) {
141             cleanUpSessionInfo();
142
143             return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
144         }
145
146         return null;
147     }
148
149     /** 
150      * {@inheritDoc}.
151      **/
152     @Override
153     public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
154
155         PolicyContainer policyContainer = session.getPolicyContainer();
156         if (isPersistenceEnabled(policyContainer, session.getName())) {
157             return new PersistentThreadModel(session, getProperties(policyContainer));
158         }
159         return null;
160     }
161
162     /** 
163      * {@inheritDoc}.
164      **/
165     @Override
166     public void disposeKieSession(PolicySession policySession) {
167
168         ContainerAdjunct contAdj =
169                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
170         if (contAdj != null) {
171             contAdj.disposeKieSession(policySession.getName());
172         }
173     }
174
175     /** 
176      * {@inheritDoc}.
177      **/
178     @Override
179     public void destroyKieSession(PolicySession policySession) {
180
181         ContainerAdjunct contAdj =
182                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
183         if (contAdj != null) {
184             contAdj.destroyKieSession(policySession.getName());
185         }
186     }
187
188     /** 
189      * {@inheritDoc}.
190      **/
191     @Override
192     public boolean afterStart(PolicyEngine engine) {
193         return false;
194     }
195
196     /** 
197      * {@inheritDoc}.
198      **/
199     @Override
200     public boolean beforeStart(PolicyEngine engine) {
201         synchronized (cleanupLock) {
202             sessInfoCleaned = false;
203         }
204
205         return false;
206     }
207
208     /** 
209      * {@inheritDoc}.
210      **/
211     @Override
212     public boolean beforeActivate(PolicyEngine engine) {
213         synchronized (cleanupLock) {
214             sessInfoCleaned = false;
215         }
216
217         return false;
218     }
219
220     /** 
221      * {@inheritDoc}.
222      **/
223     @Override
224     public boolean afterActivate(PolicyEngine engine) {
225         return false;
226     }
227
228     /* ============================================================ */
229
230     /**
231      * Gets the persistence timeout value for sessioninfo records.
232      *
233      * @return the timeout value, in milli-seconds, or {@code -1} if it is unspecified or invalid
234      */
235     private long getPersistenceTimeout() {
236         String timeoutString = null;
237
238         try {
239             timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
240
241             if (timeoutString != null) {
242                 // timeout parameter is specified
243                 return Long.valueOf(timeoutString) * 1000;
244             }
245
246         } catch (NumberFormatException e) {
247             logger.error(
248                     "Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
249                     timeoutString,
250                     e);
251         }
252
253         return -1;
254     }
255
256     /* ============================================================ */
257
258     /**
259      * Each instance of this class is a logical extension of a 'PolicyContainer' instance. Its
260      * reference is stored in the 'adjuncts' table within the 'PolicyContainer', and will be
261      * garbage-collected with the container.
262      */
263     protected class ContainerAdjunct {
264         /** 'PolicyContainer' instance that this adjunct is extending. */
265         private PolicyContainer policyContainer;
266
267         /** Maps a KIE session name to its data source. */
268         private Map<String, DsEmf> name2ds = new HashMap<>();
269
270         /**
271          * Constructor - initialize a new 'ContainerAdjunct'.
272          *
273          * @param policyContainer the 'PolicyContainer' instance this adjunct is extending
274          */
275         private ContainerAdjunct(PolicyContainer policyContainer) {
276             this.policyContainer = policyContainer;
277         }
278
279         /**
280          * Create a new persistent KieSession. If there is already a corresponding entry in the
281          * database, it is used to initialize the KieSession. If not, a completely new session is
282          * created.
283          *
284          * @param name the name of the KieSession (which is also the name of the associated
285          *     PolicySession)
286          * @param kieBaseName the name of the 'KieBase' instance containing this session
287          * @return a new KieSession with persistence enabled
288          */
289         private KieSession newPersistentKieSession(String name, String kieBaseName) {
290
291             configureSysProps();
292
293             BasicDataSource ds = makeDataSource(getDataSourceProperties());
294             DsEmf dsemf = new DsEmf(ds);
295
296             try {
297                 EntityManagerFactory emf = dsemf.emf;
298                 DroolsSessionConnector conn = makeJpaConnector(emf);
299
300                 long desiredSessionId = getSessionId(conn, name);
301
302                 logger.info(
303                         "\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
304
305                 // session does not exist -- attempt to create one
306                 logger.info(
307                         "getPolicySession:session does not exist -- attempt to create one with name {}", name);
308
309                 Environment env = kieSvcFact.newEnvironment();
310
311                 configureKieEnv(env, emf);
312
313                 KieSessionConfiguration kieConf = kieSvcFact.newKieSessionConfiguration();
314
315                 KieSession kieSession =
316                         (desiredSessionId >= 0
317                         ? loadKieSession(kieBaseName, desiredSessionId, env, kieConf)
318                                 : null);
319
320                 if (kieSession == null) {
321                     // loadKieSession() returned null or desiredSessionId < 0
322                     logger.info(
323                             "LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
324
325                     kieSession = newKieSession(kieBaseName, env);
326                 }
327
328                 replaceSession(conn, name, kieSession);
329
330                 name2ds.put(name, dsemf);
331
332                 return kieSession;
333
334             } catch (RuntimeException e) {
335                 dsemf.close();
336                 throw e;
337             }
338         }
339
340         /**
341          * Loads an existing KieSession from the persistent store.
342          *
343          * @param kieBaseName the name of the 'KieBase' instance containing this session
344          * @param desiredSessionId id of the desired KieSession
345          * @param env Kie Environment for the session
346          * @param kConf Kie Configuration for the session
347          * @return the persistent session, or {@code null} if it could not be loaded
348          */
349         private KieSession loadKieSession(
350                 String kieBaseName, long desiredSessionId, Environment env, KieSessionConfiguration kieConf) {
351             try {
352                 KieSession kieSession =
353                         kieSvcFact
354                         .getStoreServices()
355                         .loadKieSession(
356                                 desiredSessionId,
357                                 policyContainer.getKieContainer().getKieBase(kieBaseName),
358                                 kieConf,
359                                 env);
360
361                 logger.info("LOADING Loaded session {}", desiredSessionId);
362
363                 return kieSession;
364
365             } catch (Exception e) {
366                 logger.error("loadKieSession error: ", e);
367                 return null;
368             }
369         }
370
371         /**
372          * Creates a new, persistent KieSession.
373          *
374          * @param kieBaseName the name of the 'KieBase' instance containing this session
375          * @param env Kie Environment for the session
376          * @return a new, persistent session
377          */
378         private KieSession newKieSession(String kieBaseName, Environment env) {
379             KieSession kieSession =
380                     kieSvcFact
381                     .getStoreServices()
382                     .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
383
384             logger.info("LOADING CREATED {}", kieSession.getIdentifier());
385
386             return kieSession;
387         }
388
389         /**
390          * Closes the data source associated with a session.
391          *
392          * @param name name of the session being destroyed
393          */
394         private void destroyKieSession(String name) {
395             closeDataSource(name);
396         }
397
398         /**
399          * Closes the data source associated with a session.
400          *
401          * @param name name of the session being disposed of
402          */
403         private void disposeKieSession(String name) {
404             closeDataSource(name);
405         }
406
407         /**
408          * Closes the data source associated with a session.
409          *
410          * @param name name of the session whose data source is to be closed
411          */
412         private void closeDataSource(String name) {
413             DsEmf ds = name2ds.remove(name);
414             if (ds != null) {
415                 ds.close();
416             }
417         }
418
419         /** Configures java system properties for JPA/JTA. */
420         private void configureSysProps() {
421             System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
422             System.setProperty(
423                     "com.arjuna.ats.arjuna.objectstore.objectStoreDir",
424                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
425             System.setProperty(
426                     "ObjectStoreEnvironmentBean.objectStoreDir",
427                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
428         }
429
430         /**
431          * Configures a Kie Environment.
432          *
433          * @param env environment to be configured
434          * @param emf entity manager factory
435          */
436         private void configureKieEnv(Environment env, EntityManagerFactory emf) {
437             env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
438             env.set(EnvironmentName.TRANSACTION, getUserTrans());
439             env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, getTransSyncReg());
440             env.set(EnvironmentName.TRANSACTION_MANAGER, getTransMgr());
441         }
442
443         /**
444          * Gets a session's ID from the persistent store.
445          *
446          * @param conn persistence connector
447          * @param sessnm name of the session
448          * @return the session's id, or {@code -1} if the session is not found
449          */
450         private long getSessionId(DroolsSessionConnector conn, String sessnm) {
451             DroolsSession sess = conn.get(sessnm);
452             return sess != null ? sess.getSessionId() : -1;
453         }
454
455         /**
456          * Replaces a session within the persistent store, if it exists. Adds it otherwise.
457          *
458          * @param conn persistence connector
459          * @param sessnm name of session to be updated
460          * @param kieSession new session information
461          */
462         private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
463
464             DroolsSessionEntity sess = new DroolsSessionEntity();
465
466             sess.setSessionName(sessnm);
467             sess.setSessionId(kieSession.getIdentifier());
468
469             conn.replace(sess);
470         }
471     }
472
473     /* ============================================================ */
474
475     /**
476      * Gets the data source properties.
477      *
478      * @return the data source properties
479      */
480     private Properties getDataSourceProperties() {
481         Properties props = new Properties();
482         props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
483         props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
484         props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
485         props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
486         props.put("maxActive", "3");
487         props.put("maxIdle", "1");
488         props.put("maxWait", "120000");
489         props.put("whenExhaustedAction", "2");
490         props.put("testOnBorrow", "false");
491         props.put("poolPreparedStatements", "true");
492
493         return props;
494     }
495
496     /**
497      * Removes "old" Drools 'sessioninfo' records, so they aren't used to restore data to Drools
498      * sessions. This also has the useful side-effect of removing abandoned records as well.
499      */
500     private void cleanUpSessionInfo() {
501
502         synchronized (cleanupLock) {
503             if (sessInfoCleaned) {
504                 logger.info("Clean up of sessioninfo table: already done");
505                 return;
506             }
507
508             if (sessionInfoTimeoutMs < 0) {
509                 logger.info("Clean up of sessioninfo table: no timeout specified");
510                 return;
511             }
512
513             // now do the record deletion
514             try (BasicDataSource ds = makeDataSource(getDataSourceProperties());
515                     Connection connection = ds.getConnection();
516                     PreparedStatement statement =
517                             connection.prepareStatement(
518                                 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
519
520                 connection.setAutoCommit(true);
521
522                 statement.setLong(1, sessionInfoTimeoutMs / 1000);
523
524                 int count = statement.executeUpdate();
525                 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
526
527             } catch (SQLException e) {
528                 logger.error("Clean up of sessioninfo table failed", e);
529             }
530
531             // delete DroolsSessionEntity where sessionId not in (sessinfo.xxx)?
532
533             sessInfoCleaned = true;
534         }
535     }
536
537     /**
538      * Determine whether persistence is enabled for a specific container.
539      *
540      * @param container container to be checked
541      * @param sessionName name of the session to be checked
542      * @return {@code true} if persistence is enabled for this container, and {@code false} if not
543      */
544     private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
545         Properties properties = getProperties(container);
546         boolean rval = false;
547
548         if (properties != null) {
549             // fetch the 'type' property
550             String type = getProperty(properties, sessionName, "type");
551             rval = "auto".equals(type) || "native".equals(type);
552         }
553
554         return rval;
555     }
556
557     /**
558      * Determine the controller properties associated with the policy container.
559      *
560      * @param container container whose properties are to be retrieved
561      * @return the container's properties, or {@code null} if not found
562      */
563     private Properties getProperties(PolicyContainer container) {
564         try {
565             return getPolicyController(container).getProperties();
566         } catch (IllegalArgumentException e) {
567             logger.error("getProperties exception: ", e);
568             return null;
569         }
570     }
571
572     /**
573      * Fetch the persistence property associated with a session. The name may have the form:
574      *
575      * <ul>
576      *   <li>persistence.SESSION-NAME.PROPERTY
577      *   <li>persistence.PROPERTY
578      * </ul>
579      *
580      * @param properties properties from which the value is to be retrieved
581      * @param sessionName session name of interest
582      * @param property property name of interest
583      * @return the property value, or {@code null} if not found
584      */
585     private String getProperty(Properties properties, String sessionName, String property) {
586         String value = properties.getProperty("persistence." + sessionName + "." + property);
587         if (value == null) {
588             value = properties.getProperty("persistence." + property);
589         }
590
591         return value;
592     }
593
594     /* ============================================================ */
595
596     /**
597      * This 'ThreadModel' variant periodically calls 'KieSession.fireAllRules()', because the
598      * 'fireUntilHalt' method isn't compatible with persistence.
599      */
600     public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
601
602         /** Session associated with this persistent thread. */
603         private final PolicySession session;
604
605         /** The session thread. */
606         private final Thread thread;
607
608         /** Used to indicate that processing should stop. */
609         private final CountDownLatch stopped = new CountDownLatch(1);
610
611         /** Minimum time, in milli-seconds, that the thread should sleep before firing rules again. */
612         long minSleepTime = 100;
613
614         /**
615          * Maximum time, in milli-seconds, that the thread should sleep before firing rules again. This
616          * is a "half" time, so that we can multiply it by two without overflowing the word size.
617          */
618         long halfMaxSleepTime = 5000L / 2L;
619
620         /**
621          * Constructor - initialize variables and create thread.
622          *
623          * @param session the 'PolicySession' instance
624          * @param properties may contain additional session properties
625          */
626         public PersistentThreadModel(PolicySession session, Properties properties) {
627             this.session = session;
628             this.thread = new Thread(this, getThreadName());
629
630             if (properties == null) {
631                 return;
632             }
633
634             // extract 'minSleepTime' and/or 'maxSleepTime'
635             String name = session.getName();
636
637             // fetch 'minSleepTime' value, and update if defined
638             String sleepTimeString = getProperty(properties, name, "minSleepTime");
639             if (sleepTimeString != null) {
640                 try {
641                     minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
642                 } catch (Exception e) {
643                     logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
644                 }
645             }
646
647             // fetch 'maxSleepTime' value, and update if defined
648             long maxSleepTime = 2 * halfMaxSleepTime;
649             sleepTimeString = getProperty(properties, name, "maxSleepTime");
650             if (sleepTimeString != null) {
651                 try {
652                     maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
653                 } catch (Exception e) {
654                     logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
655                 }
656             }
657
658             // swap values if needed
659             if (minSleepTime > maxSleepTime) {
660                 logger.error(
661                         "minSleepTime("
662                                 + minSleepTime
663                                 + ") is greater than maxSleepTime("
664                                 + maxSleepTime
665                                 + ") -- swapping");
666                 long tmp = minSleepTime;
667                 minSleepTime = maxSleepTime;
668                 maxSleepTime = tmp;
669             }
670
671             halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
672         }
673
674         /**
675          * Get thread name.
676          *  
677          * @return the String to use as the thread name */
678         private String getThreadName() {
679             return "Session " + session.getFullName() + " (persistent)";
680         }
681
682         /*=========================*/
683         /* 'ThreadModel' interface */
684         /*=========================*/
685
686         /** 
687          * {@inheritDoc}.
688          **/
689         @Override
690         public void start() {
691             thread.start();
692         }
693
694         /** 
695          * {@inheritDoc}.
696          **/
697         @Override
698         public void stop() {
699             // tell the thread to stop
700             stopped.countDown();
701
702             // wait up to 10 seconds for the thread to stop
703             try {
704                 thread.join(10000);
705
706             } catch (InterruptedException e) {
707                 logger.error("stopThread exception: ", e);
708                 Thread.currentThread().interrupt();
709             }
710
711             // verify that it's done
712             if (thread.isAlive()) {
713                 logger.error("stopThread: still running");
714             }
715         }
716
717         /** 
718          * {@inheritDoc}.
719          **/
720         @Override
721         public void updated() {
722             // the container artifact has been updated -- adjust the thread name
723             thread.setName(getThreadName());
724         }
725
726         /*======================*/
727         /* 'Runnable' interface */
728         /*======================*/
729
730         /** 
731          * {@inheritDoc}.
732          **/
733         @Override
734         public void run() {
735             logger.info("PersistentThreadModel running");
736
737             // set thread local variable
738             session.setPolicySession();
739
740             KieSession kieSession = session.getKieSession();
741             long sleepTime = 2 * halfMaxSleepTime;
742
743             // We want to continue, despite any exceptions that occur
744             // while rules are fired.
745
746             boolean cont = true;
747             while (cont) {
748
749                 try {
750                     if (kieSession.fireAllRules() > 0) {
751                         // some rules fired -- reduce poll delay
752                         sleepTime = Math.max(minSleepTime, sleepTime / 2);
753                     } else {
754                         // no rules fired -- increase poll delay
755                         sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
756                     }
757
758                 } catch (Exception | LinkageError e) {
759                     logger.error("Exception during kieSession.fireAllRules", e);
760                 }
761
762                 try {
763                     if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
764                         cont = false;
765                     }
766
767                 } catch (InterruptedException e) {
768                     logger.error("startThread exception: ", e);
769                     Thread.currentThread().interrupt();
770                     cont = false;
771                 }
772             }
773
774             logger.info("PersistentThreadModel completed");
775         }
776     }
777
778     /* ============================================================ */
779
780     /** DataSource-EntityManagerFactory pair. */
781     private class DsEmf {
782         private BasicDataSource bds;
783         private EntityManagerFactory emf;
784
785         /**
786          * Makes an entity manager factory for the given data source.
787          *
788          * @param bds pooled data source
789          */
790         public DsEmf(BasicDataSource bds) {
791             try {
792                 Map<String, Object> props = new HashMap<>();
793                 props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds);
794
795                 this.bds = bds;
796                 this.emf = makeEntMgrFact(props);
797
798             } catch (RuntimeException e) {
799                 closeDataSource();
800                 throw e;
801             }
802         }
803
804         /** Closes the entity manager factory and the data source. */
805         public void close() {
806             try {
807                 emf.close();
808
809             } catch (RuntimeException e) {
810                 closeDataSource();
811                 throw e;
812             }
813
814             closeDataSource();
815         }
816
817         /** Closes the data source only. */
818         private void closeDataSource() {
819             try {
820                 bds.close();
821
822             } catch (SQLException e) {
823                 throw new PersistenceFeatureException(e);
824             }
825         }
826     }
827
828     private static class SingletonRegistry {
829         private static final TransactionSynchronizationRegistry transreg =
830                 new com.arjuna.ats.internal.jta.transaction.arjunacore
831                 .TransactionSynchronizationRegistryImple();
832
833         private SingletonRegistry() {
834             super();
835         }
836     }
837
838     /** Factory for various items. Methods can be overridden for junit testing. */
839
840     /**
841      * Gets the transaction manager.
842      *
843      * @return the transaction manager
844      */
845     protected TransactionManager getTransMgr() {
846         return com.arjuna.ats.jta.TransactionManager.transactionManager();
847     }
848
849     /**
850      * Gets the user transaction.
851      *
852      * @return the user transaction
853      */
854     protected UserTransaction getUserTrans() {
855         return com.arjuna.ats.jta.UserTransaction.userTransaction();
856     }
857
858     /**
859      * Gets the transaction synchronization registry.
860      *
861      * @return the transaction synchronization registry
862      */
863     protected TransactionSynchronizationRegistry getTransSyncReg() {
864         return SingletonRegistry.transreg;
865     }
866
867     /**
868      * Gets the KIE services.
869      *
870      * @return the KIE services
871      */
872     protected KieServices getKieServices() {
873         return KieServices.Factory.get();
874     }
875
876     /**
877      * Loads properties from a file.
878      *
879      * @param filenm name of the file to load
880      * @return properties, as loaded from the file
881      * @throws IOException if an error occurs reading from the file
882      */
883     protected Properties loadProperties(String filenm) throws IOException {
884         return PropertyUtil.getProperties(filenm);
885     }
886
887     /**
888      * Makes a Data Source.
889      *
890      * @param dsProps data source properties
891      * @return a new data source
892      */
893     protected BasicDataSource makeDataSource(Properties dsProps) {
894         try {
895             return BasicDataSourceFactory.createDataSource(dsProps);
896
897         } catch (Exception e) {
898             throw new PersistenceFeatureException(e);
899         }
900     }
901
902     /**
903      * Makes a new JPA connector for drools sessions.
904      *
905      * @param emf entity manager factory
906      * @return a new JPA connector for drools sessions
907      */
908     protected DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
909         return new JpaDroolsSessionConnector(emf);
910     }
911
912     /**
913      * Makes a new entity manager factory.
914      *
915      * @param props properties with which the factory should be configured
916      * @return a new entity manager factory
917      */
918     protected EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
919         return Persistence.createEntityManagerFactory("onapsessionsPU", props);
920     }
921
922     /**
923      * Gets the policy controller associated with a given policy container.
924      *
925      * @param container container whose controller is to be retrieved
926      * @return the container's controller
927      */
928     protected PolicyController getPolicyController(PolicyContainer container) {
929         return PolicyController.factory.get(container.getGroupId(), container.getArtifactId());
930     }
931
932     /**
933      * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations,
934      * particularly when they are not, themselves, Runtime exceptions.
935      */
936     public static class PersistenceFeatureException extends RuntimeException {
937         private static final long serialVersionUID = 1L;
938
939         /** 
940          * Constructor.
941          * */
942         public PersistenceFeatureException(Exception ex) {
943             super(ex);
944         }
945     }
946 }