a929c7120f71808ffdfe55531e122a1652cc326f
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-session-persistence
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.persistence;
22
23 import java.io.IOException;
24 import java.sql.PreparedStatement;
25 import java.sql.SQLException;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.Properties;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import javax.persistence.EntityManagerFactory;
32 import javax.persistence.Persistence;
33 import javax.transaction.TransactionManager;
34 import javax.transaction.TransactionSynchronizationRegistry;
35 import javax.transaction.UserTransaction;
36 import org.apache.commons.dbcp2.BasicDataSource;
37 import org.apache.commons.dbcp2.BasicDataSourceFactory;
38 import org.hibernate.cfg.AvailableSettings;
39 import org.kie.api.KieServices;
40 import org.kie.api.runtime.Environment;
41 import org.kie.api.runtime.EnvironmentName;
42 import org.kie.api.runtime.KieSession;
43 import org.kie.api.runtime.KieSessionConfiguration;
44 import org.onap.policy.drools.core.PolicyContainer;
45 import org.onap.policy.drools.core.PolicySession;
46 import org.onap.policy.drools.core.PolicySessionFeatureApi;
47 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
48 import org.onap.policy.drools.system.PolicyController;
49 import org.onap.policy.drools.system.PolicyControllerConstants;
50 import org.onap.policy.drools.system.PolicyEngine;
51 import org.onap.policy.drools.utils.PropertyUtil;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * If this feature is supported, there is a single instance of it. It adds persistence to Drools
57  * sessions. In addition, if an active-standby feature exists, then that is used to determine the
58  * active and last-active PDP. If it does not exist, then the current host name is used as the PDP
59  * id.
60  *
61  * <p>The bulk of the code here was once in other classes, such as 'PolicyContainer' and 'Main'. It
62  * was moved here as part of making this a separate optional feature.
63  */
64 public class PersistenceFeature implements PolicySessionFeatureApi, PolicyEngineFeatureApi {
65
66     private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
67
68     /** KieService factory. */
69     private KieServices kieSvcFact;
70
71     /** Persistence properties. */
72     private Properties persistProps;
73
74     /** Whether or not the SessionInfo records should be cleaned out. */
75     private boolean sessInfoCleaned;
76
77     /** SessionInfo timeout, in milli-seconds, as read from
78      * {@link #persistProps}. */
79     private long sessionInfoTimeoutMs;
80
81     /** Object used to serialize cleanup of sessioninfo table. */
82     private Object cleanupLock = new Object();
83
84     /**
85      * Lookup the adjunct for this feature that is associated with the specified PolicyContainer. If
86      * not found, create one.
87      *
88      * @param policyContainer the container whose adjunct we are looking up, and possibly creating
89      * @return the associated 'ContainerAdjunct' instance, which may be new
90      */
91     private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
92
93         Object rval = policyContainer.getAdjunct(this);
94
95         if (!(rval instanceof ContainerAdjunct)) {
96             // adjunct does not exist, or has the wrong type (should never
97             // happen)
98             rval = new ContainerAdjunct(policyContainer);
99             policyContainer.setAdjunct(this, rval);
100         }
101
102         return (ContainerAdjunct) rval;
103     }
104
105     /**
106      * {@inheritDoc}.
107      **/
108     @Override
109     public int getSequenceNumber() {
110         return 1;
111     }
112
113     /**
114      * {@inheritDoc}.
115      **/
116     @Override
117     public void globalInit(String[] args, String configDir) {
118
119         kieSvcFact = getKieServices();
120
121         try {
122             persistProps = loadProperties(configDir + "/feature-session-persistence.properties");
123
124         } catch (IOException e1) {
125             logger.error("initializePersistence: ", e1);
126         }
127
128         sessionInfoTimeoutMs = getPersistenceTimeout();
129     }
130
131     /**
132      * Creates a persistent KieSession, loading it from the persistent store, or creating one, if it
133      * does not exist yet.
134      */
135     @Override
136     public KieSession activatePolicySession(
137             PolicyContainer policyContainer, String name, String kieBaseName) {
138
139         if (isPersistenceEnabled(policyContainer, name)) {
140             cleanUpSessionInfo();
141
142             return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
143         }
144
145         return null;
146     }
147
148     /**
149      * {@inheritDoc}.
150      **/
151     @Override
152     public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
153
154         var policyContainer = session.getContainer();
155         if (isPersistenceEnabled(policyContainer, session.getName())) {
156             return new PersistentThreadModel(session, getProperties(policyContainer));
157         }
158         return null;
159     }
160
161     /**
162      * {@inheritDoc}.
163      **/
164     @Override
165     public void disposeKieSession(PolicySession policySession) {
166
167         ContainerAdjunct contAdj =
168                 (ContainerAdjunct) policySession.getContainer().getAdjunct(this);
169         if (contAdj != null) {
170             contAdj.disposeKieSession(policySession.getName());
171         }
172     }
173
174     /**
175      * {@inheritDoc}.
176      **/
177     @Override
178     public void destroyKieSession(PolicySession policySession) {
179
180         ContainerAdjunct contAdj =
181                 (ContainerAdjunct) policySession.getContainer().getAdjunct(this);
182         if (contAdj != null) {
183             contAdj.destroyKieSession(policySession.getName());
184         }
185     }
186
187     /**
188      * {@inheritDoc}.
189      **/
190     @Override
191     public boolean afterStart(PolicyEngine engine) {
192         return false;
193     }
194
195     /**
196      * {@inheritDoc}.
197      **/
198     @Override
199     public boolean beforeStart(PolicyEngine engine) {
200         return cleanup();
201     }
202
203     /**
204      * {@inheritDoc}.
205      **/
206     @Override
207     public boolean beforeActivate(PolicyEngine engine) {
208         return cleanup();
209     }
210
211     private boolean cleanup() {
212         synchronized (cleanupLock) {
213             sessInfoCleaned = false;
214         }
215
216         return false;
217     }
218
219     /**
220      * {@inheritDoc}.
221      **/
222     @Override
223     public boolean afterActivate(PolicyEngine engine) {
224         return false;
225     }
226
227     /* ============================================================ */
228
229     /**
230      * Gets the persistence timeout value for sessioninfo records.
231      *
232      * @return the timeout value, in milli-seconds, or {@code -1} if it is unspecified or invalid
233      */
234     private long getPersistenceTimeout() {
235         String timeoutString = null;
236
237         try {
238             timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
239
240             if (timeoutString != null) {
241                 // timeout parameter is specified
242                 return Long.valueOf(timeoutString) * 1000;
243             }
244
245         } catch (NumberFormatException e) {
246             logger.error(
247                     "Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
248                     timeoutString,
249                     e);
250         }
251
252         return -1;
253     }
254
255     /* ============================================================ */
256
257     /**
258      * Each instance of this class is a logical extension of a 'PolicyContainer' instance. Its
259      * reference is stored in the 'adjuncts' table within the 'PolicyContainer', and will be
260      * garbage-collected with the container.
261      */
262     protected class ContainerAdjunct {
263         /** 'PolicyContainer' instance that this adjunct is extending. */
264         private PolicyContainer policyContainer;
265
266         /** Maps a KIE session name to its data source. */
267         private Map<String, DsEmf> name2ds = new HashMap<>();
268
269         /**
270          * Constructor - initialize a new 'ContainerAdjunct'.
271          *
272          * @param policyContainer the 'PolicyContainer' instance this adjunct is extending
273          */
274         private ContainerAdjunct(PolicyContainer policyContainer) {
275             this.policyContainer = policyContainer;
276         }
277
278         /**
279          * Create a new persistent KieSession. If there is already a corresponding entry in the
280          * database, it is used to initialize the KieSession. If not, a completely new session is
281          * created.
282          *
283          * @param name the name of the KieSession (which is also the name of the associated
284          *     PolicySession)
285          * @param kieBaseName the name of the 'KieBase' instance containing this session
286          * @return a new KieSession with persistence enabled
287          */
288         private KieSession newPersistentKieSession(String name, String kieBaseName) {
289
290             configureSysProps();
291
292             BasicDataSource ds = makeDataSource(getDataSourceProperties());
293             var dsemf = new DsEmf(ds);
294
295             try {
296                 EntityManagerFactory emf = dsemf.emf;
297                 DroolsSessionConnector conn = makeJpaConnector(emf);
298
299                 long desiredSessionId = getSessionId(conn, name);
300
301                 logger.info(
302                         "\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
303
304                 // session does not exist -- attempt to create one
305                 logger.info(
306                         "getPolicySession:session does not exist -- attempt to create one with name {}", name);
307
308                 var env = kieSvcFact.newEnvironment();
309
310                 configureKieEnv(env, emf);
311
312                 var kieConf = kieSvcFact.newKieSessionConfiguration();
313
314                 KieSession kieSession =
315                         (desiredSessionId >= 0
316                         ? loadKieSession(kieBaseName, desiredSessionId, env, kieConf)
317                                 : null);
318
319                 if (kieSession == null) {
320                     // loadKieSession() returned null or desiredSessionId < 0
321                     logger.info(
322                             "LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
323
324                     kieSession = newKieSession(kieBaseName, env);
325                 }
326
327                 replaceSession(conn, name, kieSession);
328
329                 name2ds.put(name, dsemf);
330
331                 return kieSession;
332
333             } catch (RuntimeException e) {
334                 dsemf.close();
335                 throw e;
336             }
337         }
338
339         /**
340          * Loads an existing KieSession from the persistent store.
341          *
342          * @param kieBaseName the name of the 'KieBase' instance containing this session
343          * @param desiredSessionId id of the desired KieSession
344          * @param env Kie Environment for the session
345          * @param kieConf Kie Configuration for the session
346          * @return the persistent session, or {@code null} if it could not be loaded
347          */
348         private KieSession loadKieSession(
349                 String kieBaseName, long desiredSessionId, Environment env, KieSessionConfiguration kieConf) {
350             try {
351                 var kieSession =
352                         kieSvcFact
353                         .getStoreServices()
354                         .loadKieSession(
355                                 desiredSessionId,
356                                 policyContainer.getKieContainer().getKieBase(kieBaseName),
357                                 kieConf,
358                                 env);
359
360                 logger.info("LOADING Loaded session {}", desiredSessionId);
361
362                 return kieSession;
363
364             } catch (Exception e) {
365                 logger.error("loadKieSession error: ", e);
366                 return null;
367             }
368         }
369
370         /**
371          * Creates a new, persistent KieSession.
372          *
373          * @param kieBaseName the name of the 'KieBase' instance containing this session
374          * @param env Kie Environment for the session
375          * @return a new, persistent session
376          */
377         private KieSession newKieSession(String kieBaseName, Environment env) {
378             var kieSession =
379                     kieSvcFact
380                     .getStoreServices()
381                     .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
382
383             logger.info("LOADING CREATED {}", kieSession.getIdentifier());
384
385             return kieSession;
386         }
387
388         /**
389          * Closes the data source associated with a session.
390          *
391          * @param name name of the session being destroyed
392          */
393         private void destroyKieSession(String name) {
394             closeDataSource(name);
395         }
396
397         /**
398          * Closes the data source associated with a session.
399          *
400          * @param name name of the session being disposed of
401          */
402         private void disposeKieSession(String name) {
403             closeDataSource(name);
404         }
405
406         /**
407          * Closes the data source associated with a session.
408          *
409          * @param name name of the session whose data source is to be closed
410          */
411         private void closeDataSource(String name) {
412             DsEmf ds = name2ds.remove(name);
413             if (ds != null) {
414                 ds.close();
415             }
416         }
417
418         /** Configures java system properties for JPA/JTA. */
419         private void configureSysProps() {
420             System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
421             System.setProperty(
422                     "com.arjuna.ats.arjuna.objectstore.objectStoreDir",
423                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
424             System.setProperty(
425                     "ObjectStoreEnvironmentBean.objectStoreDir",
426                     persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
427         }
428
429         /**
430          * Configures a Kie Environment.
431          *
432          * @param env environment to be configured
433          * @param emf entity manager factory
434          */
435         private void configureKieEnv(Environment env, EntityManagerFactory emf) {
436             env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
437             env.set(EnvironmentName.TRANSACTION, getUserTrans());
438             env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, getTransSyncReg());
439             env.set(EnvironmentName.TRANSACTION_MANAGER, getTransMgr());
440         }
441
442         /**
443          * Gets a session's ID from the persistent store.
444          *
445          * @param conn persistence connector
446          * @param sessnm name of the session
447          * @return the session's id, or {@code -1} if the session is not found
448          */
449         private long getSessionId(DroolsSessionConnector conn, String sessnm) {
450             DroolsSession sess = conn.get(sessnm);
451             return sess != null ? sess.getSessionId() : -1;
452         }
453
454         /**
455          * Replaces a session within the persistent store, if it exists. Adds it otherwise.
456          *
457          * @param conn persistence connector
458          * @param sessnm name of session to be updated
459          * @param kieSession new session information
460          */
461         private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
462
463             var sess = new DroolsSessionEntity();
464
465             sess.setSessionName(sessnm);
466             sess.setSessionId(kieSession.getIdentifier());
467
468             conn.replace(sess);
469         }
470     }
471
472     /* ============================================================ */
473
474     /**
475      * Gets the data source properties.
476      *
477      * @return the data source properties
478      */
479     private Properties getDataSourceProperties() {
480         var props = new Properties();
481         props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
482         props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
483         props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
484         props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
485         props.put("maxActive", "3");
486         props.put("maxIdle", "1");
487         props.put("maxWait", "120000");
488         props.put("whenExhaustedAction", "2");
489         props.put("testOnBorrow", "false");
490         props.put("poolPreparedStatements", "true");
491
492         return props;
493     }
494
495     /**
496      * Removes "old" Drools 'sessioninfo' records, so they aren't used to restore data to Drools
497      * sessions. This also has the useful side-effect of removing abandoned records as well.
498      */
499     private void cleanUpSessionInfo() {
500
501         synchronized (cleanupLock) {
502             if (sessInfoCleaned) {
503                 logger.info("Clean up of sessioninfo table: already done");
504                 return;
505             }
506
507             if (sessionInfoTimeoutMs < 0) {
508                 logger.info("Clean up of sessioninfo table: no timeout specified");
509                 return;
510             }
511
512             // now do the record deletion
513             try (BasicDataSource ds = makeDataSource(getDataSourceProperties());
514                     var connection = ds.getConnection();
515                     PreparedStatement statement =
516                             connection.prepareStatement(
517                                 "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
518
519                 connection.setAutoCommit(true);
520
521                 statement.setLong(1, sessionInfoTimeoutMs / 1000);
522
523                 int count = statement.executeUpdate();
524                 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
525
526             } catch (SQLException e) {
527                 logger.error("Clean up of sessioninfo table failed", e);
528             }
529
530             // delete DroolsSessionEntity where sessionId not in (sessinfo.xxx)?
531
532             sessInfoCleaned = true;
533         }
534     }
535
536     /**
537      * Determine whether persistence is enabled for a specific container.
538      *
539      * @param container container to be checked
540      * @param sessionName name of the session to be checked
541      * @return {@code true} if persistence is enabled for this container, and {@code false} if not
542      */
543     private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
544         var properties = getProperties(container);
545         var rval = false;
546
547         if (properties != null) {
548             // fetch the 'type' property
549             String type = getProperty(properties, sessionName, "type");
550             rval = "auto".equals(type) || "native".equals(type);
551         }
552
553         return rval;
554     }
555
556     /**
557      * Determine the controller properties associated with the policy container.
558      *
559      * @param container container whose properties are to be retrieved
560      * @return the container's properties, or {@code null} if not found
561      */
562     private Properties getProperties(PolicyContainer container) {
563         try {
564             return getPolicyController(container).getProperties();
565         } catch (IllegalArgumentException e) {
566             logger.error("getProperties exception: ", e);
567             return null;
568         }
569     }
570
571     /**
572      * Fetch the persistence property associated with a session. The name may have the form:
573      *
574      * <ul>
575      *   <li>persistence.SESSION-NAME.PROPERTY
576      *   <li>persistence.PROPERTY
577      * </ul>
578      *
579      * @param properties properties from which the value is to be retrieved
580      * @param sessionName session name of interest
581      * @param property property name of interest
582      * @return the property value, or {@code null} if not found
583      */
584     private String getProperty(Properties properties, String sessionName, String property) {
585         String value = properties.getProperty("persistence." + sessionName + "." + property);
586         if (value == null) {
587             value = properties.getProperty("persistence." + property);
588         }
589
590         return value;
591     }
592
593     /* ============================================================ */
594
595     /**
596      * This 'ThreadModel' variant periodically calls 'KieSession.fireAllRules()', because the
597      * 'fireUntilHalt' method isn't compatible with persistence.
598      */
599     public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
600
601         /** Session associated with this persistent thread. */
602         private final PolicySession session;
603
604         /** The session thread. */
605         private final Thread thread;
606
607         /** Used to indicate that processing should stop. */
608         private final CountDownLatch stopped = new CountDownLatch(1);
609
610         /** Minimum time, in milli-seconds, that the thread should sleep before firing rules again. */
611         long minSleepTime = 100;
612
613         /**
614          * Maximum time, in milli-seconds, that the thread should sleep before firing rules again. This
615          * is a "half" time, so that we can multiply it by two without overflowing the word size.
616          */
617         long halfMaxSleepTime = 5000L / 2L;
618
619         /**
620          * Constructor - initialize variables and create thread.
621          *
622          * @param session the 'PolicySession' instance
623          * @param properties may contain additional session properties
624          */
625         public PersistentThreadModel(PolicySession session, Properties properties) {
626             this.session = session;
627             this.thread = new Thread(this, getThreadName());
628
629             if (properties == null) {
630                 return;
631             }
632
633             // extract 'minSleepTime' and/or 'maxSleepTime'
634             String name = session.getName();
635
636             // fetch 'minSleepTime' value, and update if defined
637             var sleepTimeString = getProperty(properties, name, "minSleepTime");
638             if (sleepTimeString != null) {
639                 try {
640                     minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
641                 } catch (Exception e) {
642                     logger.error("{}: Illegal value for 'minSleepTime'", sleepTimeString, e);
643                 }
644             }
645
646             // fetch 'maxSleepTime' value, and update if defined
647             long maxSleepTime = 2 * halfMaxSleepTime;
648             sleepTimeString = getProperty(properties, name, "maxSleepTime");
649             if (sleepTimeString != null) {
650                 try {
651                     maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
652                 } catch (Exception e) {
653                     logger.error("{}: Illegal value for 'maxSleepTime'", sleepTimeString, e);
654                 }
655             }
656
657             // swap values if needed
658             if (minSleepTime > maxSleepTime) {
659                 logger.error("minSleepTime({}) is greater than maxSleepTime({}) -- swapping", minSleepTime,
660                                 maxSleepTime);
661                 long tmp = minSleepTime;
662                 minSleepTime = maxSleepTime;
663                 maxSleepTime = tmp;
664             }
665
666             halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
667         }
668
669         /**
670          * Get thread name.
671          *
672          * @return the String to use as the thread name */
673         private String getThreadName() {
674             return "Session " + session.getFullName() + " (persistent)";
675         }
676
677         /*=========================*/
678         /* 'ThreadModel' interface */
679         /*=========================*/
680
681         /**
682          * {@inheritDoc}.
683          **/
684         @Override
685         public void start() {
686             thread.start();
687         }
688
689         /**
690          * {@inheritDoc}.
691          **/
692         @Override
693         public void stop() {
694             // tell the thread to stop
695             stopped.countDown();
696
697             // wait up to 10 seconds for the thread to stop
698             try {
699                 thread.join(10000);
700
701             } catch (InterruptedException e) {
702                 logger.error("stopThread exception: ", e);
703                 Thread.currentThread().interrupt();
704             }
705
706             // verify that it's done
707             if (thread.isAlive()) {
708                 logger.error("stopThread: still running");
709             }
710         }
711
712         /**
713          * {@inheritDoc}.
714          **/
715         @Override
716         public void updated() {
717             // the container artifact has been updated -- adjust the thread name
718             thread.setName(getThreadName());
719         }
720
721         /*======================*/
722         /* 'Runnable' interface */
723         /*======================*/
724
725         /**
726          * {@inheritDoc}.
727          **/
728         @Override
729         public void run() {
730             logger.info("PersistentThreadModel running");
731
732             // set thread local variable
733             session.setPolicySession();
734
735             var kieSession = session.getKieSession();
736             long sleepTime = 2 * halfMaxSleepTime;
737
738             // We want to continue, despite any exceptions that occur
739             // while rules are fired.
740
741             var cont = true;
742             while (cont) {
743
744                 try {
745                     if (kieSession.fireAllRules() > 0) {
746                         // some rules fired -- reduce poll delay
747                         sleepTime = Math.max(minSleepTime, sleepTime / 2);
748                     } else {
749                         // no rules fired -- increase poll delay
750                         sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
751                     }
752
753                 } catch (Exception | LinkageError e) {
754                     logger.error("Exception during kieSession.fireAllRules", e);
755                 }
756
757                 try {
758                     if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
759                         cont = false;
760                     }
761
762                 } catch (InterruptedException e) {
763                     logger.error("startThread exception: ", e);
764                     Thread.currentThread().interrupt();
765                     cont = false;
766                 }
767             }
768
769             session.removePolicySession();
770             logger.info("PersistentThreadModel completed");
771         }
772     }
773
774     /* ============================================================ */
775
776     /** DataSource-EntityManagerFactory pair. */
777     private class DsEmf {
778         private BasicDataSource bds;
779         private EntityManagerFactory emf;
780
781         /**
782          * Makes an entity manager factory for the given data source.
783          *
784          * @param bds pooled data source
785          */
786         public DsEmf(BasicDataSource bds) {
787             try {
788                 Map<String, Object> props = new HashMap<>();
789                 props.put(AvailableSettings.JPA_JTA_DATASOURCE, bds);
790
791                 this.bds = bds;
792                 this.emf = makeEntMgrFact(props);
793
794             } catch (RuntimeException e) {
795                 closeDataSource();
796                 throw e;
797             }
798         }
799
800         /** Closes the entity manager factory and the data source. */
801         public void close() {
802             try {
803                 emf.close();
804
805             } catch (RuntimeException e) {
806                 closeDataSource();
807                 throw e;
808             }
809
810             closeDataSource();
811         }
812
813         /** Closes the data source only. */
814         private void closeDataSource() {
815             try {
816                 bds.close();
817
818             } catch (SQLException e) {
819                 throw new PersistenceFeatureException(e);
820             }
821         }
822     }
823
824     private static class SingletonRegistry {
825         private static final TransactionSynchronizationRegistry transreg =
826                 new com.arjuna.ats.internal.jta.transaction.arjunacore
827                 .TransactionSynchronizationRegistryImple();
828
829         private SingletonRegistry() {
830             super();
831         }
832     }
833
834     /* Factory for various items. Methods can be overridden for junit testing. */
835
836     /**
837      * Gets the transaction manager.
838      *
839      * @return the transaction manager
840      */
841     protected TransactionManager getTransMgr() {
842         return com.arjuna.ats.jta.TransactionManager.transactionManager();
843     }
844
845     /**
846      * Gets the user transaction.
847      *
848      * @return the user transaction
849      */
850     protected UserTransaction getUserTrans() {
851         return com.arjuna.ats.jta.UserTransaction.userTransaction();
852     }
853
854     /**
855      * Gets the transaction synchronization registry.
856      *
857      * @return the transaction synchronization registry
858      */
859     protected TransactionSynchronizationRegistry getTransSyncReg() {
860         return SingletonRegistry.transreg;
861     }
862
863     /**
864      * Gets the KIE services.
865      *
866      * @return the KIE services
867      */
868     protected KieServices getKieServices() {
869         return KieServices.Factory.get();
870     }
871
872     /**
873      * Loads properties from a file.
874      *
875      * @param filenm name of the file to load
876      * @return properties, as loaded from the file
877      * @throws IOException if an error occurs reading from the file
878      */
879     protected Properties loadProperties(String filenm) throws IOException {
880         return PropertyUtil.getProperties(filenm);
881     }
882
883     /**
884      * Makes a Data Source.
885      *
886      * @param dsProps data source properties
887      * @return a new data source
888      */
889     protected BasicDataSource makeDataSource(Properties dsProps) {
890         try {
891             return BasicDataSourceFactory.createDataSource(dsProps);
892
893         } catch (Exception e) {
894             throw new PersistenceFeatureException(e);
895         }
896     }
897
898     /**
899      * Makes a new JPA connector for drools sessions.
900      *
901      * @param emf entity manager factory
902      * @return a new JPA connector for drools sessions
903      */
904     protected DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
905         return new JpaDroolsSessionConnector(emf);
906     }
907
908     /**
909      * Makes a new entity manager factory.
910      *
911      * @param props properties with which the factory should be configured
912      * @return a new entity manager factory
913      */
914     protected EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
915         return Persistence.createEntityManagerFactory("onapsessionsPU", props);
916     }
917
918     /**
919      * Gets the policy controller associated with a given policy container.
920      *
921      * @param container container whose controller is to be retrieved
922      * @return the container's controller
923      */
924     protected PolicyController getPolicyController(PolicyContainer container) {
925         return PolicyControllerConstants.getFactory().get(container.getGroupId(), container.getArtifactId());
926     }
927
928     /**
929      * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations,
930      * particularly when they are not, themselves, Runtime exceptions.
931      */
932     public static class PersistenceFeatureException extends RuntimeException {
933         private static final long serialVersionUID = 1L;
934
935         /**
936          * Constructor.
937          * */
938         public PersistenceFeatureException(Exception ex) {
939             super(ex);
940         }
941     }
942 }