a4965888adde9f94545bf209629fe39e249cb68f
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-session-persistence
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.persistence;
22
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.SQLException;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.persistence.EntityManagerFactory;
34 import javax.persistence.Persistence;
35 import javax.transaction.TransactionManager;
36 import javax.transaction.TransactionSynchronizationRegistry;
37 import javax.transaction.UserTransaction;
38
39 import org.apache.commons.dbcp2.BasicDataSource;
40 import org.apache.commons.dbcp2.BasicDataSourceFactory;
41 import org.hibernate.cfg.AvailableSettings;
42 import org.kie.api.KieServices;
43 import org.kie.api.runtime.Environment;
44 import org.kie.api.runtime.EnvironmentName;
45 import org.kie.api.runtime.KieSession;
46 import org.kie.api.runtime.KieSessionConfiguration;
47 import org.onap.policy.drools.core.PolicyContainer;
48 import org.onap.policy.drools.core.PolicySession;
49 import org.onap.policy.drools.core.PolicySessionFeatureApi;
50 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
51 import org.onap.policy.drools.system.PolicyController;
52 import org.onap.policy.drools.system.PolicyControllerConstants;
53 import org.onap.policy.drools.system.PolicyEngine;
54 import org.onap.policy.drools.utils.PropertyUtil;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * If this feature is supported, there is a single instance of it. It adds persistence to Drools
60  * sessions. In addition, if an active-standby feature exists, then that is used to determine the
61  * active and last-active PDP. If it does not exist, then the current host name is used as the PDP
62  * id.
63  *
64  * <p>The bulk of the code here was once in other classes, such as 'PolicyContainer' and 'Main'. It
65  * was moved here as part of making this a separate optional feature.
66  */
67 public class PersistenceFeature implements PolicySessionFeatureApi, PolicyEngineFeatureApi {
68
69     private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
70
71     /** KieService factory. */
72     private KieServices kieSvcFact;
73
74     /** Persistence properties. */
75     private Properties persistProps;
76
77     /** Whether or not the SessionInfo records should be cleaned out. */
78     private boolean sessInfoCleaned;
79
80     /** SessionInfo timeout, in milli-seconds, as read from
81      * {@link #persistProps}. */
82     private long sessionInfoTimeoutMs;
83
84     /** Object used to serialize cleanup of sessioninfo table. */
85     private Object cleanupLock = new Object();
86
87     /**
88      * Lookup the adjunct for this feature that is associated with the specified PolicyContainer. If
89      * not found, create one.
90      *
91      * @param policyContainer the container whose adjunct we are looking up, and possibly creating
92      * @return the associated 'ContainerAdjunct' instance, which may be new
93      */
94     private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
95
96         Object rval = policyContainer.getAdjunct(this);
97
98         if (!(rval instanceof ContainerAdjunct)) {
99             // adjunct does not exist, or has the wrong type (should never
100             // happen)
101             rval = new ContainerAdjunct(policyContainer);
102             policyContainer.setAdjunct(this, rval);
103         }
104
105         return (ContainerAdjunct) rval;
106     }
107
108     /**
109      * {@inheritDoc}.
110      **/
111     @Override
112     public int getSequenceNumber() {
113         return 1;
114     }
115
116     /**
117      * {@inheritDoc}.
118      **/
119     @Override
120     public void globalInit(String[] args, String configDir) {
121
122         kieSvcFact = getKieServices();
123
124         try {
125             persistProps = loadProperties(configDir + "/feature-session-persistence.properties");
126
127         } catch (IOException e1) {
128             logger.error("initializePersistence: ", e1);
129         }
130
131         sessionInfoTimeoutMs = getPersistenceTimeout();
132     }
133
134     /**
135      * Creates a persistent KieSession, loading it from the persistent store, or creating one, if it
136      * does not exist yet.
137      */
138     @Override
139     public KieSession activatePolicySession(
140             PolicyContainer policyContainer, String name, String kieBaseName) {
141
142         if (isPersistenceEnabled(policyContainer, name)) {
143             cleanUpSessionInfo();
144
145             return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
146         }
147
148         return null;
149     }
150
151     /**
152      * {@inheritDoc}.
153      **/
154     @Override
155     public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
156
157         PolicyContainer policyContainer = session.getPolicyContainer();
158         if (isPersistenceEnabled(policyContainer, session.getName())) {
159             return new PersistentThreadModel(session, getProperties(policyContainer));
160         }
161         return null;
162     }
163
164     /**
165      * {@inheritDoc}.
166      **/
167     @Override
168     public void disposeKieSession(PolicySession policySession) {
169
170         ContainerAdjunct contAdj =
171                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
172         if (contAdj != null) {
173             contAdj.disposeKieSession(policySession.getName());
174         }
175     }
176
177     /**
178      * {@inheritDoc}.
179      **/
180     @Override
181     public void destroyKieSession(PolicySession policySession) {
182
183         ContainerAdjunct contAdj =
184                 (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
185         if (contAdj != null) {
186             contAdj.destroyKieSession(policySession.getName());
187         }
188     }
189
190     /**
191      * {@inheritDoc}.
192      **/
193     @Override
194     public boolean afterStart(PolicyEngine engine) {
195         return false;
196     }
197
198     /**
199      * {@inheritDoc}.
200      **/
201     @Override
202     public boolean beforeStart(PolicyEngine engine) {
203         return cleanup();
204     }
205
206     /**
207      * {@inheritDoc}.
208      **/
209     @Override
210     public boolean beforeActivate(PolicyEngine engine) {
211         return cleanup();
212     }
213
214     private boolean cleanup() {
215         synchronized (cleanupLock) {
216             sessInfoCleaned = false;
217         }
218
219         return false;
220     }
221
222     /**
223      * {@inheritDoc}.
224      **/
225     @Override
226     public boolean afterActivate(PolicyEngine engine) {
227         return false;
228     }
229
230     /* ============================================================ */
231
232     /**
233      * Gets the persistence timeout value for sessioninfo records.
234      *
235      * @return the timeout value, in milli-seconds, or {@code -1} if it is unspecified or invalid
236      */
237     private long getPersistenceTimeout() {
238         String timeoutString = null;
239
240         try {
241             timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
242
243             if (timeoutString != null) {
244                 // timeout parameter is specified
245                 return Long.valueOf(timeoutString) * 1000;
246             }
247
248         } catch (NumberFormatException e) {
249             logger.error(
250                     "Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
251                     timeoutString,
252                     e);
253         }
254
255         return -1;
256     }
257
258     /* ============================================================ */
259
260     /**
261      * Each instance of this class is a logical extension of a 'PolicyContainer' instance. Its
262      * reference is stored in the 'adjuncts' table within the 'PolicyContainer', and will be
263      * garbage-collected with the container.
264      */
265     protected class ContainerAdjunct {
266         /** 'PolicyContainer' instance that this adjunct is extending. */
267         private PolicyContainer policyContainer;
268
269         /** Maps a KIE session name to its data source. */
270         private Map<String, DsEmf> name2ds = new HashMap<>();
271
272         /**
273          * Constructor - initialize a new 'ContainerAdjunct'.
274          *
275          * @param policyContainer the 'PolicyContainer' instance this adjunct is extending
276          */
277         private ContainerAdjunct(PolicyContainer policyContainer) {
278             this.policyContainer = policyContainer;
279         }
280
281         /**
282          * Create a new persistent KieSession. If there is already a corresponding entry in the
283          * database, it is used to initialize the KieSession. If not, a completely new session is
284          * created.
285          *
286          * @param name the name of the KieSession (which is also the name of the associated
287          *     PolicySession)
288          * @param kieBaseName the name of the 'KieBase' instance containing this session
289          * @return a new KieSession with persistence enabled
290          */
291         private KieSession newPersistentKieSession(String name, String kieBaseName) {
292
293             configureSysProps();
294
295             BasicDataSource ds = makeDataSource(getDataSourceProperties());
296             DsEmf dsemf = new DsEmf(ds);
297
298             try {
299                 EntityManagerFactory emf = dsemf.emf;
300                 DroolsSessionConnector conn = makeJpaConnector(emf);
301
302                 long desiredSessionId = getSessionId(conn, name);
303
304                 logger.info(
305                         "\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
306
307                 // session does not exist -- attempt to create one
308                 logger.info(
309                         "getPolicySession:session does not exist -- attempt to create one with name {}", name);
310
311                 Environment env = kieSvcFact.newEnvironment();
312
313                 configureKieEnv(env, emf);
314
315                 KieSessionConfiguration kieConf = kieSvcFact.newKieSessionConfiguration();
316
317                 KieSession kieSession =
318                         (desiredSessionId >= 0
319                         ? loadKieSession(kieBaseName, desiredSessionId, env, kieConf)
320                                 : null);
321
322                 if (kieSession == null) {
323                     // loadKieSession() returned null or desiredSessionId < 0
324                     logger.info(
325                             "LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
326
327                     kieSession = newKieSession(kieBaseName, env);
328                 }
329
330                 replaceSession(conn, name, kieSession);
331
332                 name2ds.put(name, dsemf);
333
334                 return kieSession;
335
336             } catch (RuntimeException e) {
337                 dsemf.close();
338                 throw e;
339             }
340         }
341
342         /**
343          * Loads an existing KieSession from the persistent store.
344          *
345          * @param kieBaseName the name of the 'KieBase' instance containing this session
346          * @param desiredSessionId id of the desired KieSession
347          * @param env Kie Environment for the session
348          * @param kConf Kie Configuration for the session
349          * @return the persistent session, or {@code null} if it could not be loaded
350          */
351         private KieSession loadKieSession(
352                 String kieBaseName, long desiredSessionId, Environment env, KieSessionConfiguration kieConf) {
353             try {
354                 KieSession kieSession =
355                         kieSvcFact
356                         .getStoreServices()
357                         .loadKieSession(
358                                 desiredSessionId,
359                                 policyContainer.getKieContainer().getKieBase(kieBaseName),
360                                 kieConf,
361                                 env);
362
363                 logger.info("LOADING Loaded session {}", desiredSessionId);
364
365                 return kieSession;
366
367             } catch (Exception e) {
368                 logger.error("loadKieSession error: ", e);
369                 return null;
370             }
371         }
372
373         /**
374          * Creates a new, persistent KieSession.
375          *
376          * @param kieBaseName the name of the 'KieBase' instance containing this session
377          * @param env Kie Environment for the session
378          * @return a new, persistent session
379          */
380         private KieSession newKieSession(String kieBaseName, Environment env) {
381             KieSession kieSession =
382                     kieSvcFact
383                     .getStoreServices()
384                     .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
385
386             logger.info("LOADING CREATED {}", kieSession.getIdentifier());
387
388             return kieSession;
389         }
390
391         /**
392          * Closes the data source associated with a session.
393          *
394          * @param name name of the session being destroyed
395          */
396         private void destroyKieSession(String name) {
397             closeDataSource(name);
398         }
399
400         /**
401          * Closes the data source associated with a session.
402          *
403          * @param name name of the session being disposed of
404          */
405         private void disposeKieSession(String name) {
406             closeDataSource(name);
407         }
408
409         /**
410          * Closes the data source associated with a session.
411          *
412          * @param name name of the session whose data source is to be closed
413          */
414         private void closeDataSource(String name) {
415             DsEmf ds = name2ds.remove(name);
416             if (ds != null) {
417                 ds.close();
418             }
419         }
420
421         /** Configures java system properties for JPA/JTA. */
422         private void configureSysProps() {
423             System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
424             System.setProperty(
425                     "com.arjuna.ats.arjuna.objectstore.objectStoreDir",
426                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
427             System.setProperty(
428                     "ObjectStoreEnvironmentBean.objectStoreDir",
429                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
430         }
431
432         /**
433          * Configures a Kie Environment.
434          *
435          * @param env environment to be configured
436          * @param emf entity manager factory
437          */
438         private void configureKieEnv(Environment env, EntityManagerFactory emf) {
439             env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
440             env.set(EnvironmentName.TRANSACTION, getUserTrans());
441             env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, getTransSyncReg());
442             env.set(EnvironmentName.TRANSACTION_MANAGER, getTransMgr());
443         }
444
445         /**
446          * Gets a session's ID from the persistent store.
447          *
448          * @param conn persistence connector
449          * @param sessnm name of the session
450          * @return the session's id, or {@code -1} if the session is not found
451          */
452         private long getSessionId(DroolsSessionConnector conn, String sessnm) {
453             DroolsSession sess = conn.get(sessnm);
454             return sess != null ? sess.getSessionId() : -1;
455         }
456
457         /**
458          * Replaces a session within the persistent store, if it exists. Adds it otherwise.
459          *
460          * @param conn persistence connector
461          * @param sessnm name of session to be updated
462          * @param kieSession new session information
463          */
464         private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
465
466             DroolsSessionEntity sess = new DroolsSessionEntity();
467
468             sess.setSessionName(sessnm);
469             sess.setSessionId(kieSession.getIdentifier());
470
471             conn.replace(sess);
472         }
473     }
474
475     /* ============================================================ */
476
477     /**
478      * Gets the data source properties.
479      *
480      * @return the data source properties
481      */
482     private Properties getDataSourceProperties() {
483         Properties props = new Properties();
484         props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
485         props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
486         props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
487         props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
488         props.put("maxActive", "3");
489         props.put("maxIdle", "1");
490         props.put("maxWait", "120000");
491         props.put("whenExhaustedAction", "2");
492         props.put("testOnBorrow", "false");
493         props.put("poolPreparedStatements", "true");
494
495         return props;
496     }
497
498     /**
499      * Removes "old" Drools 'sessioninfo' records, so they aren't used to restore data to Drools
500      * sessions. This also has the useful side-effect of removing abandoned records as well.
501      */
502     private void cleanUpSessionInfo() {
503
504         synchronized (cleanupLock) {
505             if (sessInfoCleaned) {
506                 logger.info("Clean up of sessioninfo table: already done");
507                 return;
508             }
509
510             if (sessionInfoTimeoutMs < 0) {
511                 logger.info("Clean up of sessioninfo table: no timeout specified");
512                 return;
513             }
514
515             // now do the record deletion
516             try (BasicDataSource ds = makeDataSource(getDataSourceProperties());
517                     Connection connection = ds.getConnection();
518                     PreparedStatement statement =
519                             connection.prepareStatement(
520                                 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
521
522                 connection.setAutoCommit(true);
523
524                 statement.setLong(1, sessionInfoTimeoutMs / 1000);
525
526                 int count = statement.executeUpdate();
527                 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
528
529             } catch (SQLException e) {
530                 logger.error("Clean up of sessioninfo table failed", e);
531             }
532
533             // delete DroolsSessionEntity where sessionId not in (sessinfo.xxx)?
534
535             sessInfoCleaned = true;
536         }
537     }
538
539     /**
540      * Determine whether persistence is enabled for a specific container.
541      *
542      * @param container container to be checked
543      * @param sessionName name of the session to be checked
544      * @return {@code true} if persistence is enabled for this container, and {@code false} if not
545      */
546     private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
547         Properties properties = getProperties(container);
548         boolean rval = false;
549
550         if (properties != null) {
551             // fetch the 'type' property
552             String type = getProperty(properties, sessionName, "type");
553             rval = "auto".equals(type) || "native".equals(type);
554         }
555
556         return rval;
557     }
558
559     /**
560      * Determine the controller properties associated with the policy container.
561      *
562      * @param container container whose properties are to be retrieved
563      * @return the container's properties, or {@code null} if not found
564      */
565     private Properties getProperties(PolicyContainer container) {
566         try {
567             return getPolicyController(container).getProperties();
568         } catch (IllegalArgumentException e) {
569             logger.error("getProperties exception: ", e);
570             return null;
571         }
572     }
573
574     /**
575      * Fetch the persistence property associated with a session. The name may have the form:
576      *
577      * <ul>
578      *   <li>persistence.SESSION-NAME.PROPERTY
579      *   <li>persistence.PROPERTY
580      * </ul>
581      *
582      * @param properties properties from which the value is to be retrieved
583      * @param sessionName session name of interest
584      * @param property property name of interest
585      * @return the property value, or {@code null} if not found
586      */
587     private String getProperty(Properties properties, String sessionName, String property) {
588         String value = properties.getProperty("persistence." + sessionName + "." + property);
589         if (value == null) {
590             value = properties.getProperty("persistence." + property);
591         }
592
593         return value;
594     }
595
596     /* ============================================================ */
597
598     /**
599      * This 'ThreadModel' variant periodically calls 'KieSession.fireAllRules()', because the
600      * 'fireUntilHalt' method isn't compatible with persistence.
601      */
602     public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
603
604         /** Session associated with this persistent thread. */
605         private final PolicySession session;
606
607         /** The session thread. */
608         private final Thread thread;
609
610         /** Used to indicate that processing should stop. */
611         private final CountDownLatch stopped = new CountDownLatch(1);
612
613         /** Minimum time, in milli-seconds, that the thread should sleep before firing rules again. */
614         long minSleepTime = 100;
615
616         /**
617          * Maximum time, in milli-seconds, that the thread should sleep before firing rules again. This
618          * is a "half" time, so that we can multiply it by two without overflowing the word size.
619          */
620         long halfMaxSleepTime = 5000L / 2L;
621
622         /**
623          * Constructor - initialize variables and create thread.
624          *
625          * @param session the 'PolicySession' instance
626          * @param properties may contain additional session properties
627          */
628         public PersistentThreadModel(PolicySession session, Properties properties) {
629             this.session = session;
630             this.thread = new Thread(this, getThreadName());
631
632             if (properties == null) {
633                 return;
634             }
635
636             // extract 'minSleepTime' and/or 'maxSleepTime'
637             String name = session.getName();
638
639             // fetch 'minSleepTime' value, and update if defined
640             String sleepTimeString = getProperty(properties, name, "minSleepTime");
641             if (sleepTimeString != null) {
642                 try {
643                     minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
644                 } catch (Exception e) {
645                     logger.error("{}: Illegal value for 'minSleepTime'", sleepTimeString, e);
646                 }
647             }
648
649             // fetch 'maxSleepTime' value, and update if defined
650             long maxSleepTime = 2 * halfMaxSleepTime;
651             sleepTimeString = getProperty(properties, name, "maxSleepTime");
652             if (sleepTimeString != null) {
653                 try {
654                     maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
655                 } catch (Exception e) {
656                     logger.error("{}: Illegal value for 'maxSleepTime'", sleepTimeString, e);
657                 }
658             }
659
660             // swap values if needed
661             if (minSleepTime > maxSleepTime) {
662                 logger.error("minSleepTime({}) is greater than maxSleepTime({}) -- swapping", minSleepTime,
663                                 maxSleepTime);
664                 long tmp = minSleepTime;
665                 minSleepTime = maxSleepTime;
666                 maxSleepTime = tmp;
667             }
668
669             halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
670         }
671
672         /**
673          * Get thread name.
674          *
675          * @return the String to use as the thread name */
676         private String getThreadName() {
677             return "Session " + session.getFullName() + " (persistent)";
678         }
679
680         /*=========================*/
681         /* 'ThreadModel' interface */
682         /*=========================*/
683
684         /**
685          * {@inheritDoc}.
686          **/
687         @Override
688         public void start() {
689             thread.start();
690         }
691
692         /**
693          * {@inheritDoc}.
694          **/
695         @Override
696         public void stop() {
697             // tell the thread to stop
698             stopped.countDown();
699
700             // wait up to 10 seconds for the thread to stop
701             try {
702                 thread.join(10000);
703
704             } catch (InterruptedException e) {
705                 logger.error("stopThread exception: ", e);
706                 Thread.currentThread().interrupt();
707             }
708
709             // verify that it's done
710             if (thread.isAlive()) {
711                 logger.error("stopThread: still running");
712             }
713         }
714
715         /**
716          * {@inheritDoc}.
717          **/
718         @Override
719         public void updated() {
720             // the container artifact has been updated -- adjust the thread name
721             thread.setName(getThreadName());
722         }
723
724         /*======================*/
725         /* 'Runnable' interface */
726         /*======================*/
727
728         /**
729          * {@inheritDoc}.
730          **/
731         @Override
732         public void run() {
733             logger.info("PersistentThreadModel running");
734
735             // set thread local variable
736             session.setPolicySession();
737
738             KieSession kieSession = session.getKieSession();
739             long sleepTime = 2 * halfMaxSleepTime;
740
741             // We want to continue, despite any exceptions that occur
742             // while rules are fired.
743
744             boolean cont = true;
745             while (cont) {
746
747                 try {
748                     if (kieSession.fireAllRules() > 0) {
749                         // some rules fired -- reduce poll delay
750                         sleepTime = Math.max(minSleepTime, sleepTime / 2);
751                     } else {
752                         // no rules fired -- increase poll delay
753                         sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
754                     }
755
756                 } catch (Exception | LinkageError e) {
757                     logger.error("Exception during kieSession.fireAllRules", e);
758                 }
759
760                 try {
761                     if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
762                         cont = false;
763                     }
764
765                 } catch (InterruptedException e) {
766                     logger.error("startThread exception: ", e);
767                     Thread.currentThread().interrupt();
768                     cont = false;
769                 }
770             }
771
772             logger.info("PersistentThreadModel completed");
773         }
774     }
775
776     /* ============================================================ */
777
778     /** DataSource-EntityManagerFactory pair. */
779     private class DsEmf {
780         private BasicDataSource bds;
781         private EntityManagerFactory emf;
782
783         /**
784          * Makes an entity manager factory for the given data source.
785          *
786          * @param bds pooled data source
787          */
788         public DsEmf(BasicDataSource bds) {
789             try {
790                 Map<String, Object> props = new HashMap<>();
791                 props.put(AvailableSettings.JPA_JTA_DATASOURCE, bds);
792
793                 this.bds = bds;
794                 this.emf = makeEntMgrFact(props);
795
796             } catch (RuntimeException e) {
797                 closeDataSource();
798                 throw e;
799             }
800         }
801
802         /** Closes the entity manager factory and the data source. */
803         public void close() {
804             try {
805                 emf.close();
806
807             } catch (RuntimeException e) {
808                 closeDataSource();
809                 throw e;
810             }
811
812             closeDataSource();
813         }
814
815         /** Closes the data source only. */
816         private void closeDataSource() {
817             try {
818                 bds.close();
819
820             } catch (SQLException e) {
821                 throw new PersistenceFeatureException(e);
822             }
823         }
824     }
825
826     private static class SingletonRegistry {
827         private static final TransactionSynchronizationRegistry transreg =
828                 new com.arjuna.ats.internal.jta.transaction.arjunacore
829                 .TransactionSynchronizationRegistryImple();
830
831         private SingletonRegistry() {
832             super();
833         }
834     }
835
836     /** Factory for various items. Methods can be overridden for junit testing. */
837
838     /**
839      * Gets the transaction manager.
840      *
841      * @return the transaction manager
842      */
843     protected TransactionManager getTransMgr() {
844         return com.arjuna.ats.jta.TransactionManager.transactionManager();
845     }
846
847     /**
848      * Gets the user transaction.
849      *
850      * @return the user transaction
851      */
852     protected UserTransaction getUserTrans() {
853         return com.arjuna.ats.jta.UserTransaction.userTransaction();
854     }
855
856     /**
857      * Gets the transaction synchronization registry.
858      *
859      * @return the transaction synchronization registry
860      */
861     protected TransactionSynchronizationRegistry getTransSyncReg() {
862         return SingletonRegistry.transreg;
863     }
864
865     /**
866      * Gets the KIE services.
867      *
868      * @return the KIE services
869      */
870     protected KieServices getKieServices() {
871         return KieServices.Factory.get();
872     }
873
874     /**
875      * Loads properties from a file.
876      *
877      * @param filenm name of the file to load
878      * @return properties, as loaded from the file
879      * @throws IOException if an error occurs reading from the file
880      */
881     protected Properties loadProperties(String filenm) throws IOException {
882         return PropertyUtil.getProperties(filenm);
883     }
884
885     /**
886      * Makes a Data Source.
887      *
888      * @param dsProps data source properties
889      * @return a new data source
890      */
891     protected BasicDataSource makeDataSource(Properties dsProps) {
892         try {
893             return BasicDataSourceFactory.createDataSource(dsProps);
894
895         } catch (Exception e) {
896             throw new PersistenceFeatureException(e);
897         }
898     }
899
900     /**
901      * Makes a new JPA connector for drools sessions.
902      *
903      * @param emf entity manager factory
904      * @return a new JPA connector for drools sessions
905      */
906     protected DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
907         return new JpaDroolsSessionConnector(emf);
908     }
909
910     /**
911      * Makes a new entity manager factory.
912      *
913      * @param props properties with which the factory should be configured
914      * @return a new entity manager factory
915      */
916     protected EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
917         return Persistence.createEntityManagerFactory("onapsessionsPU", props);
918     }
919
920     /**
921      * Gets the policy controller associated with a given policy container.
922      *
923      * @param container container whose controller is to be retrieved
924      * @return the container's controller
925      */
926     protected PolicyController getPolicyController(PolicyContainer container) {
927         return PolicyControllerConstants.getFactory().get(container.getGroupId(), container.getArtifactId());
928     }
929
930     /**
931      * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations,
932      * particularly when they are not, themselves, Runtime exceptions.
933      */
934     public static class PersistenceFeatureException extends RuntimeException {
935         private static final long serialVersionUID = 1L;
936
937         /**
938          * Constructor.
939          * */
940         public PersistenceFeatureException(Exception ex) {
941             super(ex);
942         }
943     }
944 }