129be8a7f325aa2092ecaab7ca79750dde29d2f0
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-session-persistence
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.persistence;
22
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.SQLException;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.persistence.EntityManagerFactory;
34 import javax.persistence.Persistence;
35 import javax.transaction.TransactionManager;
36 import javax.transaction.TransactionSynchronizationRegistry;
37 import javax.transaction.UserTransaction;
38
39 import org.apache.commons.dbcp2.BasicDataSource;
40 import org.apache.commons.dbcp2.BasicDataSourceFactory;
41 import org.kie.api.KieServices;
42 import org.kie.api.runtime.Environment;
43 import org.kie.api.runtime.EnvironmentName;
44 import org.kie.api.runtime.KieSession;
45 import org.kie.api.runtime.KieSessionConfiguration;
46 import org.onap.policy.drools.core.PolicyContainer;
47 import org.onap.policy.drools.core.PolicySession;
48 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
49 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.onap.policy.drools.system.PolicyEngine;
52 import org.onap.policy.drools.utils.PropertyUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * If this feature is supported, there is a single instance of it. It adds
58  * persistence to Drools sessions. In addition, if an active-standby feature
59  * exists, then that is used to determine the active and last-active PDP. If it
60  * does not exist, then the current host name is used as the PDP id.
61  *
62  * The bulk of the code here was once in other classes, such as
63  * 'PolicyContainer' and 'Main'. It was moved here as part of making this a
64  * separate optional feature.
65  */
66 public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI {
67
68         private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
69
70         /**
71          * Standard factory used to get various items.
72          */
73         private static Factory stdFactory = new Factory();
74
75         /**
76          * Factory used to get various items.
77          */
78         private Factory fact = stdFactory;
79
80         /**
81          * KieService factory.
82          */
83         private KieServices kieSvcFact;
84
85         /**
86          * Persistence properties.
87          */
88         private Properties persistProps;
89
90         /**
91          * Whether or not the SessionInfo records should be cleaned out.
92          */
93         private boolean sessInfoCleaned;
94
95         /**
96          * SessionInfo timeout, in milli-seconds, as read from
97          * {@link #persistProps}.
98          */
99         private long sessionInfoTimeoutMs;
100
101         /**
102          * Object used to serialize cleanup of sessioninfo table.
103          */
104         private Object cleanupLock = new Object();
105
106         /**
107          * Sets the factory to be used during junit testing.
108          * 
109          * @param fact
110          *            factory to be used
111          */
112         protected void setFactory(Factory fact) {
113                 this.fact = fact;
114         }
115
116         /**
117          * Lookup the adjunct for this feature that is associated with the specified
118          * PolicyContainer. If not found, create one.
119          * 
120          * @param policyContainer
121          *            the container whose adjunct we are looking up, and possibly
122          *            creating
123          * @return the associated 'ContainerAdjunct' instance, which may be new
124          */
125         private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
126
127                 Object rval = policyContainer.getAdjunct(this);
128
129                 if (rval == null || !(rval instanceof ContainerAdjunct)) {
130                         // adjunct does not exist, or has the wrong type (should never
131                         // happen)
132                         rval = new ContainerAdjunct(policyContainer);
133                         policyContainer.setAdjunct(this, rval);
134                 }
135
136                 return (ContainerAdjunct) rval;
137         }
138
139         /**
140          * {@inheritDoc}
141          */
142         @Override
143         public int getSequenceNumber() {
144                 return 1;
145         }
146
147         /**
148          * {@inheritDoc}
149          */
150         @Override
151         public void globalInit(String[] args, String configDir) {
152
153                 kieSvcFact = fact.getKieServices();
154
155                 try {
156                         persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties");
157
158                 } catch (IOException e1) {
159                         logger.error("initializePersistence: ", e1);
160                 }
161
162                 sessionInfoTimeoutMs = getPersistenceTimeout();
163         }
164
165         /**
166          * Creates a persistent KieSession, loading it from the persistent store, or
167          * creating one, if it does not exist yet.
168          */
169         @Override
170         public KieSession activatePolicySession(PolicyContainer policyContainer, String name, String kieBaseName) {
171
172                 if (isPersistenceEnabled(policyContainer, name)) {
173                         cleanUpSessionInfo();
174
175                         return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
176                 }
177
178                 return null;
179         }
180
181         /**
182          * {@inheritDoc}
183          */
184         @Override
185         public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
186
187                 PolicyContainer policyContainer = session.getPolicyContainer();
188                 if (isPersistenceEnabled(policyContainer, session.getName())) {
189                         return new PersistentThreadModel(session, getProperties(policyContainer));
190                 }
191                 return null;
192         }
193
194         /**
195          * {@inheritDoc}
196          */
197         @Override
198         public void disposeKieSession(PolicySession policySession) {
199
200                 ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
201                 if (contAdj != null) {
202                         contAdj.disposeKieSession(policySession.getName());
203                 }
204         }
205
206         /**
207          * {@inheritDoc}
208          */
209         @Override
210         public void destroyKieSession(PolicySession policySession) {
211
212                 ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
213                 if (contAdj != null) {
214                         contAdj.destroyKieSession(policySession.getName());
215                 }
216         }
217
218         /**
219          * {@inheritDoc}
220          */
221         @Override
222         public boolean afterStart(PolicyEngine engine) {
223                 return false;
224         }
225
226         /**
227          * {@inheritDoc}
228          */
229         @Override
230         public boolean beforeStart(PolicyEngine engine) {
231                 synchronized (cleanupLock) {
232                         sessInfoCleaned = false;
233                 }
234
235                 return false;
236         }
237
238         /**
239          * {@inheritDoc}
240          */
241         @Override
242         public boolean beforeActivate(PolicyEngine engine) {
243                 synchronized (cleanupLock) {
244                         sessInfoCleaned = false;
245                 }
246
247                 return false;
248         }
249
250         /**
251          * {@inheritDoc}
252          */
253         @Override
254         public boolean afterActivate(PolicyEngine engine) {
255                 return false;
256         }
257
258         /* ============================================================ */
259
260         /**
261          * Gets the persistence timeout value for sessioninfo records.
262          * 
263          * @return the timeout value, in milli-seconds, or {@code -1} if it is
264          *         unspecified or invalid
265          */
266         private long getPersistenceTimeout() {
267                 String timeoutString = null;
268
269                 try {
270                         timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
271
272                         if (timeoutString != null) {
273                                 // timeout parameter is specified
274                                 return Long.valueOf(timeoutString) * 1000;
275                         }
276
277                 } catch (NumberFormatException e) {
278                         logger.error("Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
279                                         timeoutString, e);
280                 }
281
282                 return -1;
283         }
284
285         /* ============================================================ */
286
287         /**
288          * Each instance of this class is a logical extension of a 'PolicyContainer'
289          * instance. Its reference is stored in the 'adjuncts' table within the
290          * 'PolicyContainer', and will be garbage-collected with the container.
291          */
292         protected class ContainerAdjunct {
293                 /**
294                  * 'PolicyContainer' instance that this adjunct is extending.
295                  */
296                 private PolicyContainer policyContainer;
297
298                 /**
299                  * Maps a KIE session name to its data source.
300                  */
301                 private Map<String, DsEmf> name2ds = new HashMap<>();
302
303                 /**
304                  * Constructor - initialize a new 'ContainerAdjunct'
305                  *
306                  * @param policyContainer
307                  *            the 'PolicyContainer' instance this adjunct is extending
308                  */
309                 private ContainerAdjunct(PolicyContainer policyContainer) {
310                         this.policyContainer = policyContainer;
311                 }
312
313                 /**
314                  * Create a new persistent KieSession. If there is already a
315                  * corresponding entry in the database, it is used to initialize the
316                  * KieSession. If not, a completely new session is created.
317                  * 
318                  * @param name
319                  *            the name of the KieSession (which is also the name of the
320                  *            associated PolicySession)
321                  * @param kieBaseName
322                  *            the name of the 'KieBase' instance containing this session
323                  * @return a new KieSession with persistence enabled
324                  */
325                 private KieSession newPersistentKieSession(String name, String kieBaseName) {
326
327                         configureSysProps();
328
329                         BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
330                         DsEmf dsemf = new DsEmf(ds);
331
332                         try {
333                                 EntityManagerFactory emf = dsemf.emf;
334                                 DroolsSessionConnector conn = fact.makeJpaConnector(emf);
335
336                                 long desiredSessionId = getSessionId(conn, name);
337
338                                 logger.info("\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
339
340                                 // session does not exist -- attempt to create one
341                                 logger.info("getPolicySession:session does not exist -- attempt to create one with name {}", name);
342
343                                 Environment env = kieSvcFact.newEnvironment();
344
345                                 configureKieEnv(env, emf);
346
347                                 KieSessionConfiguration kConf = kieSvcFact.newKieSessionConfiguration();
348
349                                 KieSession kieSession = (desiredSessionId >= 0
350                                                 ? loadKieSession(kieBaseName, desiredSessionId, env, kConf) : null);
351
352                                 if (kieSession == null) {
353                                         // loadKieSession() returned null or desiredSessionId < 0
354                                         logger.info("LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
355
356                                         kieSession = newKieSession(kieBaseName, env);
357                                 }
358
359                                 replaceSession(conn, name, kieSession);
360
361                                 name2ds.put(name, dsemf);
362
363                                 return kieSession;
364
365                         } catch (RuntimeException e) {
366                                 dsemf.close();
367                                 throw e;
368                         }
369                 }
370
371                 /**
372                  * Loads an existing KieSession from the persistent store.
373                  * 
374                  * @param kieBaseName
375                  *            the name of the 'KieBase' instance containing this session
376                  * @param desiredSessionId
377                  *            id of the desired KieSession
378                  * @param env
379                  *            Kie Environment for the session
380                  * @param kConf
381                  *            Kie Configuration for the session
382                  * @return the persistent session, or {@code null} if it could not be
383                  *         loaded
384                  */
385                 private KieSession loadKieSession(String kieBaseName, long desiredSessionId, Environment env,
386                                 KieSessionConfiguration kConf) {
387                         try {
388                                 KieSession kieSession = kieSvcFact.getStoreServices().loadKieSession(desiredSessionId,
389                                                 policyContainer.getKieContainer().getKieBase(kieBaseName), kConf, env);
390
391                                 logger.info("LOADING Loaded session {}", desiredSessionId);
392
393                                 return kieSession;
394
395                         } catch (Exception e) {
396                                 logger.error("loadKieSession error: ", e);
397                                 return null;
398                         }
399                 }
400
401                 /**
402                  * Creates a new, persistent KieSession.
403                  * 
404                  * @param kieBaseName
405                  *            the name of the 'KieBase' instance containing this session
406                  * @param env
407                  *            Kie Environment for the session
408                  * @return a new, persistent session
409                  */
410                 private KieSession newKieSession(String kieBaseName, Environment env) {
411                         KieSession kieSession = kieSvcFact.getStoreServices()
412                                         .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
413
414                         logger.info("LOADING CREATED {}", kieSession.getIdentifier());
415
416                         return kieSession;
417                 }
418
419                 /**
420                  * Closes the data source associated with a session.
421                  * 
422                  * @param name
423                  *            name of the session being destroyed
424                  */
425                 private void destroyKieSession(String name) {
426                         closeDataSource(name);
427                 }
428
429                 /**
430                  * Closes the data source associated with a session.
431                  * 
432                  * @param name
433                  *            name of the session being disposed of
434                  */
435                 private void disposeKieSession(String name) {
436                         closeDataSource(name);
437                 }
438
439                 /**
440                  * Closes the data source associated with a session.
441                  * 
442                  * @param name
443                  *            name of the session whose data source is to be closed
444                  */
445                 private void closeDataSource(String name) {
446                         DsEmf ds = name2ds.remove(name);
447                         if (ds != null) {
448                                 ds.close();
449                         }
450                 }
451
452                 /**
453                  * Configures java system properties for JPA/JTA.
454                  */
455                 private void configureSysProps() {
456                         System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
457                         System.setProperty("com.arjuna.ats.arjuna.objectstore.objectStoreDir",
458                                         persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
459                         System.setProperty("ObjectStoreEnvironmentBean.objectStoreDir",
460                                         persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
461                 }
462
463                 /**
464                  * Configures a Kie Environment
465                  * 
466                  * @param env
467                  *            environment to be configured
468                  * @param emf
469                  *            entity manager factory
470                  */
471                 private void configureKieEnv(Environment env, EntityManagerFactory emf) {
472                         env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
473                         env.set(EnvironmentName.TRANSACTION, fact.getUserTrans());
474                         env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, fact.getTransSyncReg());
475                         env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr());
476                 }
477
478                 /**
479                  * Gets a session's ID from the persistent store.
480                  * 
481                  * @param conn
482                  *            persistence connector
483                  * @param sessnm
484                  *            name of the session
485                  * @return the session's id, or {@code -1} if the session is not found
486                  */
487                 private long getSessionId(DroolsSessionConnector conn, String sessnm) {
488                         DroolsSession sess = conn.get(sessnm);
489                         return sess != null ? sess.getSessionId() : -1;
490                 }
491
492                 /**
493                  * Replaces a session within the persistent store, if it exists. Adds it
494                  * otherwise.
495                  * 
496                  * @param conn
497                  *            persistence connector
498                  * @param sessnm
499                  *            name of session to be updated
500                  * @param kieSession
501                  *            new session information
502                  */
503                 private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
504
505                         DroolsSessionEntity sess = new DroolsSessionEntity();
506
507                         sess.setSessionName(sessnm);
508                         sess.setSessionId(kieSession.getIdentifier());
509
510                         conn.replace(sess);
511                 }
512         }
513
514         /* ============================================================ */
515
516         /**
517          * Gets the data source properties.
518          * 
519          * @return the data source properties
520          */
521         private Properties getDataSourceProperties() {
522                 Properties props = new Properties();
523                 props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
524                 props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
525                 props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
526                 props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
527                 props.put("maxActive", "3");
528                 props.put("maxIdle", "1");
529                 props.put("maxWait", "120000");
530                 props.put("whenExhaustedAction", "2");
531                 props.put("testOnBorrow", "false");
532                 props.put("poolPreparedStatements", "true");
533
534                 return props;
535         }
536
537         /**
538          * Removes "old" Drools 'sessioninfo' records, so they aren't used to
539          * restore data to Drools sessions. This also has the useful side-effect of
540          * removing abandoned records as well.
541          */
542         private void cleanUpSessionInfo() {
543
544                 synchronized (cleanupLock) {
545
546                         if (sessInfoCleaned) {
547                                 logger.info("Clean up of sessioninfo table: already done");
548                                 return;
549                         }
550
551                         if (sessionInfoTimeoutMs < 0) {
552                                 logger.info("Clean up of sessioninfo table: no timeout specified");
553                                 return;
554                         }
555
556                         // now do the record deletion
557                         try (BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
558                                         Connection connection = ds.getConnection();
559                                         PreparedStatement statement = connection.prepareStatement(
560                                                         "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
561
562                                 connection.setAutoCommit(true);
563
564                                 statement.setLong(1, sessionInfoTimeoutMs / 1000);
565
566                                 int count = statement.executeUpdate();
567                                 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
568
569                         } catch (SQLException e) {
570                                 logger.error("Clean up of sessioninfo table failed", e);
571                         }
572
573                         // TODO: delete DroolsSessionEntity where sessionId not in
574                         // (sessinfo.xxx)
575
576                         sessInfoCleaned = true;
577                 }
578         }
579
580         /**
581          * Determine whether persistence is enabled for a specific container
582          * 
583          * @param container
584          *            container to be checked
585          * @param sessionName
586          *            name of the session to be checked
587          * @return {@code true} if persistence is enabled for this container, and
588          *         {@code false} if not
589          */
590         private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
591                 Properties properties = getProperties(container);
592                 boolean rval = false;
593
594                 if (properties != null) {
595                         // fetch the 'type' property
596                         String type = getProperty(properties, sessionName, "type");
597                         rval = "auto".equals(type) || "native".equals(type);
598                 }
599
600                 return rval;
601         }
602
603         /**
604          * Determine the controller properties associated with the policy container.
605          * 
606          * @param container
607          *            container whose properties are to be retrieved
608          * @return the container's properties, or {@code null} if not found
609          */
610         private Properties getProperties(PolicyContainer container) {
611                 try {
612                         return fact.getPolicyController(container).getProperties();
613                 } catch (IllegalArgumentException e) {
614                         logger.error("getProperties exception: ", e);
615                         return null;
616                 }
617         }
618
619         /**
620          * Fetch the persistence property associated with a session. The name may
621          * have the form:
622          * <ul>
623          * <li>persistence.SESSION-NAME.PROPERTY</li>
624          * <li>persistence.PROPERTY</li>
625          * </ul>
626          * 
627          * @param properties
628          *            properties from which the value is to be retrieved
629          * @param sessionName
630          *            session name of interest
631          * @param property
632          *            property name of interest
633          * @return the property value, or {@code null} if not found
634          */
635         private String getProperty(Properties properties, String sessionName, String property) {
636                 String value = properties.getProperty("persistence." + sessionName + "." + property);
637                 if (value == null) {
638                         value = properties.getProperty("persistence." + property);
639                 }
640
641                 return value;
642         }
643
644         /* ============================================================ */
645
646         /**
647          * This 'ThreadModel' variant periodically calls
648          * 'KieSession.fireAllRules()', because the 'fireUntilHalt' method isn't
649          * compatible with persistence.
650          */
651         public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
652
653                 /**
654                  * Session associated with this persistent thread.
655                  */
656                 private final PolicySession session;
657
658                 /**
659                  * The session thread.
660                  */
661                 private final Thread thread;
662
663                 /**
664                  * Used to indicate that processing should stop.
665                  */
666                 private final CountDownLatch stopped = new CountDownLatch(1);
667
668                 /**
669                  * Minimum time, in milli-seconds, that the thread should sleep before
670                  * firing rules again.
671                  */
672                 long minSleepTime = 100;
673
674                 /**
675                  * Maximum time, in milli-seconds, that the thread should sleep before
676                  * firing rules again. This is a "half" time, so that we can multiply it
677                  * by two without overflowing the word size.
678                  */
679                 long halfMaxSleepTime = 5000L / 2L;
680
681                 /**
682                  * Constructor - initialize variables and create thread
683                  *
684                  * @param session
685                  *            the 'PolicySession' instance
686                  * @param properties
687                  *            may contain additional session properties
688                  */
689                 public PersistentThreadModel(PolicySession session, Properties properties) {
690                         this.session = session;
691                         this.thread = new Thread(this, getThreadName());
692
693                         if (properties == null) {
694                                 return;
695                         }
696
697                         // extract 'minSleepTime' and/or 'maxSleepTime'
698                         String name = session.getName();
699
700                         // fetch 'minSleepTime' value, and update if defined
701                         String sleepTimeString = getProperty(properties, name, "minSleepTime");
702                         if (sleepTimeString != null) {
703                                 try {
704                                         minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
705                                 } catch (Exception e) {
706                                         logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
707                                 }
708                         }
709
710                         // fetch 'maxSleepTime' value, and update if defined
711                         long maxSleepTime = 2 * halfMaxSleepTime;
712                         sleepTimeString = getProperty(properties, name, "maxSleepTime");
713                         if (sleepTimeString != null) {
714                                 try {
715                                         maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
716                                 } catch (Exception e) {
717                                         logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
718                                 }
719                         }
720
721                         // swap values if needed
722                         if (minSleepTime > maxSleepTime) {
723                                 logger.error("minSleepTime(" + minSleepTime + ") is greater than maxSleepTime(" + maxSleepTime
724                                                 + ") -- swapping");
725                                 long tmp = minSleepTime;
726                                 minSleepTime = maxSleepTime;
727                                 maxSleepTime = tmp;
728                         }
729
730                         halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
731                 }
732
733                 /**
734                  * @return the String to use as the thread name
735                  */
736                 private String getThreadName() {
737                         return "Session " + session.getFullName() + " (persistent)";
738                 }
739
740                 /***************************/
741                 /* 'ThreadModel' interface */
742                 /***************************/
743
744                 /**
745                  * {@inheritDoc}
746                  */
747                 @Override
748                 public void start() {
749                         thread.start();
750                 }
751
752                 /**
753                  * {@inheritDoc}
754                  */
755                 @Override
756                 public void stop() {
757                         // tell the thread to stop
758                         stopped.countDown();
759
760                         // wait up to 10 seconds for the thread to stop
761                         try {
762                                 thread.join(10000);
763
764                         } catch (InterruptedException e) {
765                                 logger.error("stopThread exception: ", e);
766                                 Thread.currentThread().interrupt();
767                         }
768
769                         // verify that it's done
770                         if (thread.isAlive()) {
771                                 logger.error("stopThread: still running");
772                         }
773                 }
774
775                 /**
776                  * {@inheritDoc}
777                  */
778                 @Override
779                 public void updated() {
780                         // the container artifact has been updated -- adjust the thread name
781                         thread.setName(getThreadName());
782                 }
783
784                 /************************/
785                 /* 'Runnable' interface */
786                 /************************/
787
788                 /**
789                  * {@inheritDoc}
790                  */
791                 @Override
792                 public void run() {
793                         logger.info("PersistentThreadModel running");
794
795                         // set thread local variable
796                         session.setPolicySession();
797
798                         KieSession kieSession = session.getKieSession();
799                         long sleepTime = 2 * halfMaxSleepTime;
800
801                         // We want to continue, despite any exceptions that occur
802                         // while rules are fired.
803
804                         boolean cont = true;
805                         while(cont) {
806
807                                 try {
808                                         if (kieSession.fireAllRules() > 0) {
809                                                 // some rules fired -- reduce poll delay
810                                                 sleepTime = Math.max(minSleepTime, sleepTime / 2);
811                                         } else {
812                                                 // no rules fired -- increase poll delay
813                                                 sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
814                                         }
815
816                                 } catch (Exception | LinkageError e) {
817                                         logger.error("Exception during kieSession.fireAllRules", e);
818                                 }
819                                 
820
821                                 try {
822                                         if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
823                                                 cont = false;
824                                         }
825
826                                 } catch (InterruptedException e) {
827                                         logger.error("startThread exception: ", e);
828                                         Thread.currentThread().interrupt();
829                                         cont = false;
830                                 }
831                         }
832
833                         logger.info("PersistentThreadModel completed");
834                 }
835         }
836
837         /* ============================================================ */
838
839         /**
840          * DataSource-EntityManagerFactory pair.
841          */
842         private class DsEmf {
843                 private BasicDataSource bds;
844                 private EntityManagerFactory emf;
845
846                 /**
847                  * Makes an entity manager factory for the given data source.
848                  * 
849                  * @param bds
850                  *            pooled data source
851                  */
852                 public DsEmf(BasicDataSource bds) {
853                         try {
854                                 Map<String, Object> props = new HashMap<>();
855                                 props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds);
856
857                                 this.bds = bds;
858                                 this.emf = fact.makeEntMgrFact(props);
859
860                         } catch (RuntimeException e) {
861                                 closeDataSource();
862                                 throw e;
863                         }
864                 }
865
866                 /**
867                  * Closes the entity manager factory and the data source.
868                  */
869                 public void close() {
870                         try {
871                                 emf.close();
872
873                         } catch (RuntimeException e) {
874                                 closeDataSource();
875                                 throw e;
876                         }
877
878                         closeDataSource();
879                 }
880
881                 /**
882                  * Closes the data source only.
883                  */
884                 private void closeDataSource() {
885                         try {
886                                 bds.close();
887
888                         } catch (SQLException e) {
889                                 throw new PersistenceFeatureException(e);
890                         }
891
892                 }
893         }
894
895         private static class SingletonRegistry {
896                 private static final TransactionSynchronizationRegistry transreg = new com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple();
897                 
898                 private SingletonRegistry() {
899                         super();
900                 }
901         }
902
903         /**
904          * Factory for various items. Methods can be overridden for junit testing.
905          */
906         protected static class Factory {
907
908                 /**
909                  * Gets the transaction manager.
910                  * 
911                  * @return the transaction manager
912                  */
913                 public TransactionManager getTransMgr() {
914                         return com.arjuna.ats.jta.TransactionManager.transactionManager();
915                 }
916
917                 /**
918                  * Gets the user transaction.
919                  * 
920                  * @return the user transaction
921                  */
922                 public UserTransaction getUserTrans() {
923                         return com.arjuna.ats.jta.UserTransaction.userTransaction();
924                 }
925
926                 /**
927                  * Gets the transaction synchronization registry.
928                  * 
929                  * @return the transaction synchronization registry
930                  */
931                 public TransactionSynchronizationRegistry getTransSyncReg() {
932                         return SingletonRegistry.transreg;
933                 }
934
935                 /**
936                  * Gets the KIE services.
937                  * 
938                  * @return the KIE services
939                  */
940                 public KieServices getKieServices() {
941                         return KieServices.Factory.get();
942                 }
943
944                 /**
945                  * Loads properties from a file.
946                  * 
947                  * @param filenm
948                  *            name of the file to load
949                  * @return properties, as loaded from the file
950                  * @throws IOException
951                  *             if an error occurs reading from the file
952                  */
953                 public Properties loadProperties(String filenm) throws IOException {
954                         return PropertyUtil.getProperties(filenm);
955                 }
956
957                 /**
958                  * Makes a Data Source.
959                  * 
960                  * @param dsProps
961                  *            data source properties
962                  * @return a new data source
963                  */
964                 public BasicDataSource makeDataSource(Properties dsProps) {
965                         try {
966                                 return BasicDataSourceFactory.createDataSource(dsProps);
967
968                         } catch (Exception e) {
969                                 throw new PersistenceFeatureException(e);
970                         }
971                 }
972
973                 /**
974                  * Makes a new JPA connector for drools sessions.
975                  * 
976                  * @param emf
977                  *            entity manager factory
978                  * @return a new JPA connector for drools sessions
979                  */
980                 public DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
981                         return new JpaDroolsSessionConnector(emf);
982                 }
983
984                 /**
985                  * Makes a new entity manager factory.
986                  * 
987                  * @param props
988                  *            properties with which the factory should be configured
989                  * @return a new entity manager factory
990                  */
991                 public EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
992                         return Persistence.createEntityManagerFactory("onapsessionsPU", props);
993                 }
994
995                 /**
996                  * Gets the policy controller associated with a given policy container.
997                  * 
998                  * @param container
999                  *            container whose controller is to be retrieved
1000                  * @return the container's controller
1001                  */
1002                 public PolicyController getPolicyController(PolicyContainer container) {
1003                         return PolicyController.factory.get(container.getGroupId(), container.getArtifactId());
1004                 }
1005         }
1006
1007         /**
1008          * Runtime exceptions generated by this class. Wraps exceptions generated by
1009          * delegated operations, particularly when they are not, themselves, Runtime
1010          * exceptions.
1011          */
1012         public static class PersistenceFeatureException extends RuntimeException {
1013                 private static final long serialVersionUID = 1L;
1014
1015                 /**
1016                  * 
1017                  * @param e
1018                  *            exception to be wrapped
1019                  */
1020                 public PersistenceFeatureException(Exception e) {
1021                         super(e);
1022                 }
1023         }
1024 }