032383b4d26735b2f3d674ae48798cf2a991c5b6
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * feature-session-persistence
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.persistence;
22
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.SQLException;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.persistence.EntityManagerFactory;
34 import javax.persistence.Persistence;
35 import javax.transaction.TransactionManager;
36 import javax.transaction.TransactionSynchronizationRegistry;
37 import javax.transaction.UserTransaction;
38
39 import org.apache.commons.dbcp2.BasicDataSource;
40 import org.apache.commons.dbcp2.BasicDataSourceFactory;
41 import org.kie.api.KieServices;
42 import org.kie.api.runtime.Environment;
43 import org.kie.api.runtime.EnvironmentName;
44 import org.kie.api.runtime.KieSession;
45 import org.kie.api.runtime.KieSessionConfiguration;
46 import org.onap.policy.drools.core.PolicyContainer;
47 import org.onap.policy.drools.core.PolicySession;
48 import org.onap.policy.drools.core.PolicySessionFeatureAPI;
49 import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.onap.policy.drools.system.PolicyEngine;
52 import org.onap.policy.drools.utils.PropertyUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * If this feature is supported, there is a single instance of it. It adds
58  * persistence to Drools sessions. In addition, if an active-standby feature
59  * exists, then that is used to determine the active and last-active PDP. If it
60  * does not exist, then the current host name is used as the PDP id.
61  *
62  * The bulk of the code here was once in other classes, such as
63  * 'PolicyContainer' and 'Main'. It was moved here as part of making this a
64  * separate optional feature.
65  */
66 public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI {
67
68         private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class);
69
70         /**
71          * Standard factory used to get various items.
72          */
73         private static Factory stdFactory = new Factory();
74
75         /**
76          * Factory used to get various items.
77          */
78         private Factory fact = stdFactory;
79
80         /**
81          * KieService factory.
82          */
83         private KieServices kieSvcFact;
84
85         /**
86          * Persistence properties.
87          */
88         private Properties persistProps;
89
90         /**
91          * Whether or not the SessionInfo records should be cleaned out.
92          */
93         private boolean sessInfoCleaned;
94
95         /**
96          * SessionInfo timeout, in milli-seconds, as read from
97          * {@link #persistProps}.
98          */
99         private long sessionInfoTimeoutMs;
100
101         /**
102          * Object used to serialize cleanup of sessioninfo table.
103          */
104         private Object cleanupLock = new Object();
105
106         /**
107          * Sets the factory to be used during junit testing.
108          * 
109          * @param fact
110          *            factory to be used
111          */
112         protected void setFactory(Factory fact) {
113                 this.fact = fact;
114         }
115
116         /**
117          * Lookup the adjunct for this feature that is associated with the specified
118          * PolicyContainer. If not found, create one.
119          * 
120          * @param policyContainer
121          *            the container whose adjunct we are looking up, and possibly
122          *            creating
123          * @return the associated 'ContainerAdjunct' instance, which may be new
124          */
125         private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) {
126
127                 Object rval = policyContainer.getAdjunct(this);
128
129                 if (rval == null || !(rval instanceof ContainerAdjunct)) {
130                         // adjunct does not exist, or has the wrong type (should never
131                         // happen)
132                         rval = new ContainerAdjunct(policyContainer);
133                         policyContainer.setAdjunct(this, rval);
134                 }
135
136                 return (ContainerAdjunct) rval;
137         }
138
139         /**
140          * {@inheritDoc}
141          */
142         @Override
143         public int getSequenceNumber() {
144                 return 1;
145         }
146
147         /**
148          * {@inheritDoc}
149          */
150         @Override
151         public void globalInit(String args[], String configDir) {
152
153                 kieSvcFact = fact.getKieServices();
154
155                 try {
156                         persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties");
157
158                 } catch (IOException e1) {
159                         logger.error("initializePersistence: ", e1);
160                 }
161
162                 sessionInfoTimeoutMs = getPersistenceTimeout();
163         }
164
165         /**
166          * Creates a persistent KieSession, loading it from the persistent store, or
167          * creating one, if it does not exist yet.
168          */
169         @Override
170         public KieSession activatePolicySession(PolicyContainer policyContainer, String name, String kieBaseName) {
171
172                 if (isPersistenceEnabled(policyContainer, name)) {
173                         cleanUpSessionInfo();
174
175                         return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName);
176                 }
177
178                 return null;
179         }
180
181         /**
182          * {@inheritDoc}
183          */
184         @Override
185         public PolicySession.ThreadModel selectThreadModel(PolicySession session) {
186
187                 PolicyContainer policyContainer = session.getPolicyContainer();
188                 if (isPersistenceEnabled(policyContainer, session.getName())) {
189                         return new PersistentThreadModel(session, getProperties(policyContainer));
190                 }
191                 return null;
192         }
193
194         /**
195          * {@inheritDoc}
196          */
197         @Override
198         public void disposeKieSession(PolicySession policySession) {
199
200                 ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
201                 if (contAdj != null) {
202                         contAdj.disposeKieSession(policySession.getName());
203                 }
204         }
205
206         /**
207          * {@inheritDoc}
208          */
209         @Override
210         public void destroyKieSession(PolicySession policySession) {
211
212                 ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this);
213                 if (contAdj != null) {
214                         contAdj.destroyKieSession(policySession.getName());
215                 }
216         }
217
218         /**
219          * {@inheritDoc}
220          */
221         @Override
222         public boolean afterStart(PolicyEngine engine) {
223                 return false;
224         }
225
226         /**
227          * {@inheritDoc}
228          */
229         @Override
230         public boolean beforeStart(PolicyEngine engine) {
231                 synchronized (cleanupLock) {
232                         sessInfoCleaned = false;
233                 }
234
235                 return false;
236         }
237
238         /**
239          * {@inheritDoc}
240          */
241         @Override
242         public boolean beforeActivate(PolicyEngine engine) {
243                 synchronized (cleanupLock) {
244                         sessInfoCleaned = false;
245                 }
246
247                 return false;
248         }
249
250         /**
251          * {@inheritDoc}
252          */
253         @Override
254         public boolean afterActivate(PolicyEngine engine) {
255                 return false;
256         }
257
258         /* ============================================================ */
259
260         /**
261          * Gets the persistence timeout value for sessioninfo records.
262          * 
263          * @return the timeout value, in milli-seconds, or {@code -1} if it is
264          *         unspecified or invalid
265          */
266         private long getPersistenceTimeout() {
267                 String timeoutString = null;
268
269                 try {
270                         timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT);
271
272                         if (timeoutString != null) {
273                                 // timeout parameter is specified
274                                 return Long.valueOf(timeoutString) * 1000;
275                         }
276
277                 } catch (NumberFormatException e) {
278                         logger.error("Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}",
279                                         timeoutString, e);
280                 }
281
282                 return -1;
283         }
284
285         /* ============================================================ */
286
287         /**
288          * Each instance of this class is a logical extension of a 'PolicyContainer'
289          * instance. Its reference is stored in the 'adjuncts' table within the
290          * 'PolicyContainer', and will be garbage-collected with the container.
291          */
292         protected class ContainerAdjunct {
293                 /**
294                  * 'PolicyContainer' instance that this adjunct is extending.
295                  */
296                 private PolicyContainer policyContainer;
297
298                 /**
299                  * Maps a KIE session name to its data source.
300                  */
301                 private Map<String, DsEmf> name2ds = new HashMap<>();
302
303                 /**
304                  * Constructor - initialize a new 'ContainerAdjunct'
305                  *
306                  * @param policyContainer
307                  *            the 'PolicyContainer' instance this adjunct is extending
308                  */
309                 private ContainerAdjunct(PolicyContainer policyContainer) {
310                         this.policyContainer = policyContainer;
311                 }
312
313                 /**
314                  * Create a new persistent KieSession. If there is already a
315                  * corresponding entry in the database, it is used to initialize the
316                  * KieSession. If not, a completely new session is created.
317                  * 
318                  * @param name
319                  *            the name of the KieSession (which is also the name of the
320                  *            associated PolicySession)
321                  * @param kieBaseName
322                  *            the name of the 'KieBase' instance containing this session
323                  * @return a new KieSession with persistence enabled
324                  */
325                 private KieSession newPersistentKieSession(String name, String kieBaseName) {
326
327                         configureSysProps();
328
329                         BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
330                         DsEmf dsemf = new DsEmf(ds);
331
332                         try {
333                                 EntityManagerFactory emf = dsemf.emf;
334                                 DroolsSessionConnector conn = fact.makeJpaConnector(emf);
335
336                                 long desiredSessionId = getSessionId(conn, name);
337
338                                 logger.info("\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId);
339
340                                 // session does not exist -- attempt to create one
341                                 logger.info("getPolicySession:session does not exist -- attempt to create one with name {}", name);
342
343                                 Environment env = kieSvcFact.newEnvironment();
344
345                                 configureKieEnv(env, emf);
346
347                                 KieSessionConfiguration kConf = kieSvcFact.newKieSessionConfiguration();
348
349                                 KieSession kieSession = (desiredSessionId >= 0
350                                                 ? loadKieSession(kieBaseName, desiredSessionId, env, kConf) : null);
351
352                                 if (kieSession == null) {
353                                         // loadKieSession() returned null or desiredSessionId < 0
354                                         logger.info("LOADING We cannot load session {}. Going to create a new one", desiredSessionId);
355
356                                         kieSession = newKieSession(kieBaseName, env);
357                                 }
358
359                                 replaceSession(conn, name, kieSession);
360
361                                 name2ds.put(name, dsemf);
362
363                                 return kieSession;
364
365                         } catch (RuntimeException e) {
366                                 dsemf.close();
367                                 throw e;
368                         }
369                 }
370
371                 /**
372                  * Loads an existing KieSession from the persistent store.
373                  * 
374                  * @param kieBaseName
375                  *            the name of the 'KieBase' instance containing this session
376                  * @param desiredSessionId
377                  *            id of the desired KieSession
378                  * @param env
379                  *            Kie Environment for the session
380                  * @param kConf
381                  *            Kie Configuration for the session
382                  * @return the persistent session, or {@code null} if it could not be
383                  *         loaded
384                  */
385                 private KieSession loadKieSession(String kieBaseName, long desiredSessionId, Environment env,
386                                 KieSessionConfiguration kConf) {
387                         try {
388                                 KieSession kieSession = kieSvcFact.getStoreServices().loadKieSession(desiredSessionId,
389                                                 policyContainer.getKieContainer().getKieBase(kieBaseName), kConf, env);
390
391                                 logger.info("LOADING Loaded session {}", desiredSessionId);
392
393                                 return kieSession;
394
395                         } catch (Exception e) {
396                                 logger.error("loadKieSession error: ", e);
397                                 return null;
398                         }
399                 }
400
401                 /**
402                  * Creates a new, persistent KieSession.
403                  * 
404                  * @param kieBaseName
405                  *            the name of the 'KieBase' instance containing this session
406                  * @param env
407                  *            Kie Environment for the session
408                  * @return a new, persistent session
409                  */
410                 private KieSession newKieSession(String kieBaseName, Environment env) {
411                         KieSession kieSession = kieSvcFact.getStoreServices()
412                                         .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env);
413
414                         logger.info("LOADING CREATED {}", kieSession.getIdentifier());
415
416                         return kieSession;
417                 }
418
419                 /**
420                  * Closes the data source associated with a session.
421                  * 
422                  * @param name
423                  *            name of the session being destroyed
424                  */
425                 private void destroyKieSession(String name) {
426                         closeDataSource(name);
427                 }
428
429                 /**
430                  * Closes the data source associated with a session.
431                  * 
432                  * @param name
433                  *            name of the session being disposed of
434                  */
435                 private void disposeKieSession(String name) {
436                         closeDataSource(name);
437                 }
438
439                 /**
440                  * Closes the data source associated with a session.
441                  * 
442                  * @param name
443                  *            name of the session whose data source is to be closed
444                  */
445                 private void closeDataSource(String name) {
446                         DsEmf ds = name2ds.remove(name);
447                         if (ds != null) {
448                                 ds.close();
449                         }
450                 }
451         }
452
453         /* ============================================================ */
454
455         /**
456          * Configures java system properties for JPA/JTA.
457          */
458         private void configureSysProps() {
459                 System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60");
460                 System.setProperty("com.arjuna.ats.arjuna.objectstore.objectStoreDir",
461                                 persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
462                 System.setProperty("ObjectStoreEnvironmentBean.objectStoreDir",
463                                 persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR));
464         }
465
466         /**
467          * Gets the data source properties.
468          * 
469          * @return the data source properties
470          */
471         private Properties getDataSourceProperties() {
472                 Properties props = new Properties();
473                 props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER));
474                 props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL));
475                 props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER));
476                 props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD));
477                 props.put("maxActive", "3");
478                 props.put("maxIdle", "1");
479                 props.put("maxWait", "120000");
480                 props.put("whenExhaustedAction", "2");
481                 props.put("testOnBorrow", "false");
482                 props.put("poolPreparedStatements", "true");
483
484                 return props;
485         }
486
487         /**
488          * Configures a Kie Environment
489          * 
490          * @param env
491          *            environment to be configured
492          * @param emf
493          *            entity manager factory
494          */
495         private void configureKieEnv(Environment env, EntityManagerFactory emf) {
496                 env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
497                 env.set(EnvironmentName.TRANSACTION, fact.getUserTrans());
498                 env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, fact.getTransSyncReg());
499                 env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr());
500         }
501
502         /**
503          * Removes "old" Drools 'sessioninfo' records, so they aren't used to
504          * restore data to Drools sessions. This also has the useful side-effect of
505          * removing abandoned records as well.
506          */
507         private void cleanUpSessionInfo() {
508
509                 synchronized (cleanupLock) {
510
511                         if (sessInfoCleaned) {
512                                 logger.info("Clean up of sessioninfo table: already done");
513                                 return;
514                         }
515
516                         if (sessionInfoTimeoutMs < 0) {
517                                 logger.info("Clean up of sessioninfo table: no timeout specified");
518                                 return;
519                         }
520
521                         // now do the record deletion
522                         try (BasicDataSource ds = fact.makeDataSource(getDataSourceProperties());
523                                         Connection connection = ds.getConnection();
524                                         PreparedStatement statement = connection.prepareStatement(
525                                                         "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) {
526
527                                 connection.setAutoCommit(true);
528
529                                 statement.setLong(1, sessionInfoTimeoutMs / 1000);
530
531                                 int count = statement.executeUpdate();
532                                 logger.info("Cleaning up sessioninfo table -- {} records removed", count);
533
534                         } catch (SQLException e) {
535                                 logger.error("Clean up of sessioninfo table failed", e);
536                         }
537
538                         // TODO: delete DroolsSessionEntity where sessionId not in
539                         // (sessinfo.xxx)
540
541                         sessInfoCleaned = true;
542                 }
543         }
544
545         /**
546          * Gets a session's ID from the persistent store.
547          * 
548          * @param conn
549          *            persistence connector
550          * @param sessnm
551          *            name of the session
552          * @return the session's id, or {@code -1} if the session is not found
553          */
554         private long getSessionId(DroolsSessionConnector conn, String sessnm) {
555                 DroolsSession sess = conn.get(sessnm);
556                 return sess != null ? sess.getSessionId() : -1;
557         }
558
559         /**
560          * Replaces a session within the persistent store, if it exists. Adds it
561          * otherwise.
562          * 
563          * @param conn
564          *            persistence connector
565          * @param sessnm
566          *            name of session to be updated
567          * @param kieSession
568          *            new session information
569          */
570         private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) {
571
572                 DroolsSessionEntity sess = new DroolsSessionEntity();
573
574                 sess.setSessionName(sessnm);
575                 sess.setSessionId(kieSession.getIdentifier());
576
577                 conn.replace(sess);
578         }
579
580         /**
581          * Determine whether persistence is enabled for a specific container
582          * 
583          * @param container
584          *            container to be checked
585          * @param sessionName
586          *            name of the session to be checked
587          * @return {@code true} if persistence is enabled for this container, and
588          *         {@code false} if not
589          */
590         private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) {
591                 Properties properties = getProperties(container);
592                 boolean rval = false;
593
594                 if (properties != null) {
595                         // fetch the 'type' property
596                         String type = getProperty(properties, sessionName, "type");
597                         rval = "auto".equals(type) || "native".equals(type);
598                 }
599
600                 return rval;
601         }
602
603         /**
604          * Determine the controller properties associated with the policy container.
605          * 
606          * @param container
607          *            container whose properties are to be retrieved
608          * @return the container's properties, or {@code null} if not found
609          */
610         private Properties getProperties(PolicyContainer container) {
611                 try {
612                         return fact.getPolicyController(container).getProperties();
613                 } catch (IllegalArgumentException e) {
614                         logger.error("getProperties exception: ", e);
615                         return null;
616                 }
617         }
618
619         /**
620          * Fetch the persistence property associated with a session. The name may
621          * have the form:
622          * <ul>
623          * <li>persistence.SESSION-NAME.PROPERTY</li>
624          * <li>persistence.PROPERTY</li>
625          * </ul>
626          * 
627          * @param properties
628          *            properties from which the value is to be retrieved
629          * @param sessionName
630          *            session name of interest
631          * @param property
632          *            property name of interest
633          * @return the property value, or {@code null} if not found
634          */
635         private String getProperty(Properties properties, String sessionName, String property) {
636                 String value = properties.getProperty("persistence." + sessionName + "." + property);
637                 if (value == null) {
638                         value = properties.getProperty("persistence." + property);
639                 }
640
641                 return value;
642         }
643
644         /* ============================================================ */
645
646         /**
647          * This 'ThreadModel' variant periodically calls
648          * 'KieSession.fireAllRules()', because the 'fireUntilHalt' method isn't
649          * compatible with persistence.
650          */
651         public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel {
652
653                 /**
654                  * Session associated with this persistent thread.
655                  */
656                 private final PolicySession session;
657
658                 /**
659                  * The session thread.
660                  */
661                 private final Thread thread;
662
663                 /**
664                  * Used to indicate that processing should stop.
665                  */
666                 private final CountDownLatch stopped = new CountDownLatch(1);
667
668                 /**
669                  * Minimum time, in milli-seconds, that the thread should sleep before
670                  * firing rules again.
671                  */
672                 long minSleepTime = 100;
673
674                 /**
675                  * Maximum time, in milli-seconds, that the thread should sleep before
676                  * firing rules again. This is a "half" time, so that we can multiply it
677                  * by two without overflowing the word size.
678                  */
679                 long halfMaxSleepTime = 5000L / 2L;
680
681                 /**
682                  * Constructor - initialize variables and create thread
683                  *
684                  * @param session
685                  *            the 'PolicySession' instance
686                  * @param properties
687                  *            may contain additional session properties
688                  */
689                 public PersistentThreadModel(PolicySession session, Properties properties) {
690                         this.session = session;
691                         this.thread = new Thread(this, getThreadName());
692
693                         if (properties == null) {
694                                 return;
695                         }
696
697                         // extract 'minSleepTime' and/or 'maxSleepTime'
698                         String name = session.getName();
699
700                         // fetch 'minSleepTime' value, and update if defined
701                         String sleepTimeString = getProperty(properties, name, "minSleepTime");
702                         if (sleepTimeString != null) {
703                                 try {
704                                         minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
705                                 } catch (Exception e) {
706                                         logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e);
707                                 }
708                         }
709
710                         // fetch 'maxSleepTime' value, and update if defined
711                         long maxSleepTime = 2 * halfMaxSleepTime;
712                         sleepTimeString = getProperty(properties, name, "maxSleepTime");
713                         if (sleepTimeString != null) {
714                                 try {
715                                         maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString));
716                                 } catch (Exception e) {
717                                         logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e);
718                                 }
719                         }
720
721                         // swap values if needed
722                         if (minSleepTime > maxSleepTime) {
723                                 logger.error("minSleepTime(" + minSleepTime + ") is greater than maxSleepTime(" + maxSleepTime
724                                                 + ") -- swapping");
725                                 long tmp = minSleepTime;
726                                 minSleepTime = maxSleepTime;
727                                 maxSleepTime = tmp;
728                         }
729
730                         halfMaxSleepTime = Math.max(1, maxSleepTime / 2);
731                 }
732
733                 /**
734                  * @return the String to use as the thread name
735                  */
736                 private String getThreadName() {
737                         return "Session " + session.getFullName() + " (persistent)";
738                 }
739
740                 /***************************/
741                 /* 'ThreadModel' interface */
742                 /***************************/
743
744                 /**
745                  * {@inheritDoc}
746                  */
747                 @Override
748                 public void start() {
749                         thread.start();
750                 }
751
752                 /**
753                  * {@inheritDoc}
754                  */
755                 @Override
756                 public void stop() {
757                         // tell the thread to stop
758                         stopped.countDown();
759
760                         // wait up to 10 seconds for the thread to stop
761                         try {
762                                 thread.join(10000);
763
764                         } catch (InterruptedException e) {
765                                 logger.error("stopThread exception: ", e);
766                                 Thread.currentThread().interrupt();
767                         }
768
769                         // verify that it's done
770                         if (thread.isAlive()) {
771                                 logger.error("stopThread: still running");
772                         }
773                 }
774
775                 /**
776                  * {@inheritDoc}
777                  */
778                 @Override
779                 public void updated() {
780                         // the container artifact has been updated -- adjust the thread name
781                         thread.setName(getThreadName());
782                 }
783
784                 /************************/
785                 /* 'Runnable' interface */
786                 /************************/
787
788                 /**
789                  * {@inheritDoc}
790                  */
791                 @Override
792                 public void run() {
793                         logger.info("PersistentThreadModel running");
794
795                         // set thread local variable
796                         session.setPolicySession();
797
798                         KieSession kieSession = session.getKieSession();
799                         long sleepTime = 2 * halfMaxSleepTime;
800
801                         // We want to continue, despite any exceptions that occur
802                         // while rules are fired.
803
804                         for (;;) {
805
806                                 try {
807                                         if (kieSession.fireAllRules() > 0) {
808                                                 // some rules fired -- reduce poll delay
809                                                 sleepTime = Math.max(minSleepTime, sleepTime / 2);
810                                         } else {
811                                                 // no rules fired -- increase poll delay
812                                                 sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
813                                         }
814
815                                 } catch (Exception | LinkageError e) {
816                                         logger.error("Exception during kieSession.fireAllRules", e);
817                                 }
818
819                                 try {
820                                         if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
821                                                 break;
822                                         }
823
824                                 } catch (InterruptedException e) {
825                                         logger.error("startThread exception: ", e);
826                                         Thread.currentThread().interrupt();
827                                         break;
828                                 }
829                         }
830
831                         logger.info("PersistentThreadModel completed");
832                 }
833         }
834
835         /* ============================================================ */
836
837         /**
838          * DataSource-EntityManagerFactory pair.
839          */
840         private class DsEmf {
841                 private BasicDataSource bds;
842                 private EntityManagerFactory emf;
843
844                 /**
845                  * Makes an entity manager factory for the given data source.
846                  * 
847                  * @param bds
848                  *            pooled data source
849                  */
850                 public DsEmf(BasicDataSource bds) {
851                         try {
852                                 Map<String, Object> props = new HashMap<>();
853                                 props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds);
854
855                                 this.bds = bds;
856                                 this.emf = fact.makeEntMgrFact(props);
857
858                         } catch (RuntimeException e) {
859                                 closeDataSource();
860                                 throw e;
861                         }
862                 }
863
864                 /**
865                  * Closes the entity manager factory and the data source.
866                  */
867                 public void close() {
868                         try {
869                                 emf.close();
870
871                         } catch (RuntimeException e) {
872                                 closeDataSource();
873                                 throw e;
874                         }
875
876                         closeDataSource();
877                 }
878
879                 /**
880                  * Closes the data source only.
881                  */
882                 private void closeDataSource() {
883                         try {
884                                 bds.close();
885
886                         } catch (SQLException e) {
887                                 throw new PersistenceFeatureException(e);
888                         }
889
890                 }
891         }
892
893         private static class SingletonRegistry {
894                 private static final TransactionSynchronizationRegistry transreg = new com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple();
895         }
896
897         /**
898          * Factory for various items. Methods can be overridden for junit testing.
899          */
900         protected static class Factory {
901
902                 /**
903                  * Gets the transaction manager.
904                  * 
905                  * @return the transaction manager
906                  */
907                 public TransactionManager getTransMgr() {
908                         return com.arjuna.ats.jta.TransactionManager.transactionManager();
909                 }
910
911                 /**
912                  * Gets the user transaction.
913                  * 
914                  * @return the user transaction
915                  */
916                 public UserTransaction getUserTrans() {
917                         return com.arjuna.ats.jta.UserTransaction.userTransaction();
918                 }
919
920                 /**
921                  * Gets the transaction synchronization registry.
922                  * 
923                  * @return the transaction synchronization registry
924                  */
925                 public TransactionSynchronizationRegistry getTransSyncReg() {
926                         return SingletonRegistry.transreg;
927                 }
928
929                 /**
930                  * Gets the KIE services.
931                  * 
932                  * @return the KIE services
933                  */
934                 public KieServices getKieServices() {
935                         return KieServices.Factory.get();
936                 }
937
938                 /**
939                  * Loads properties from a file.
940                  * 
941                  * @param filenm
942                  *            name of the file to load
943                  * @return properties, as loaded from the file
944                  * @throws IOException
945                  *             if an error occurs reading from the file
946                  */
947                 public Properties loadProperties(String filenm) throws IOException {
948                         return PropertyUtil.getProperties(filenm);
949                 }
950
951                 /**
952                  * Makes a Data Source.
953                  * 
954                  * @param dsProps
955                  *            data source properties
956                  * @return a new data source
957                  */
958                 public BasicDataSource makeDataSource(Properties dsProps) {
959                         try {
960                                 return BasicDataSourceFactory.createDataSource(dsProps);
961
962                         } catch (Exception e) {
963                                 throw new PersistenceFeatureException(e);
964                         }
965                 }
966
967                 /**
968                  * Makes a new JPA connector for drools sessions.
969                  * 
970                  * @param emf
971                  *            entity manager factory
972                  * @return a new JPA connector for drools sessions
973                  */
974                 public DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) {
975                         return new JpaDroolsSessionConnector(emf);
976                 }
977
978                 /**
979                  * Makes a new entity manager factory.
980                  * 
981                  * @param props
982                  *            properties with which the factory should be configured
983                  * @return a new entity manager factory
984                  */
985                 public EntityManagerFactory makeEntMgrFact(Map<String, Object> props) {
986                         return Persistence.createEntityManagerFactory("onapsessionsPU", props);
987                 }
988
989                 /**
990                  * Gets the policy controller associated with a given policy container.
991                  * 
992                  * @param container
993                  *            container whose controller is to be retrieved
994                  * @return the container's controller
995                  */
996                 public PolicyController getPolicyController(PolicyContainer container) {
997                         return PolicyController.factory.get(container.getGroupId(), container.getArtifactId());
998                 }
999         }
1000
1001         /**
1002          * Runtime exceptions generated by this class. Wraps exceptions generated by
1003          * delegated operations, particularly when they are not, themselves, Runtime
1004          * exceptions.
1005          */
1006         public static class PersistenceFeatureException extends RuntimeException {
1007                 private static final long serialVersionUID = 1L;
1008
1009                 /**
1010                  * 
1011                  * @param e
1012                  *            exception to be wrapped
1013                  */
1014                 public PersistenceFeatureException(Exception e) {
1015                         super(e);
1016                 }
1017         }
1018 }