[POLICY-21] PDP-D gets stuck during shutdown 27/5027/4
authorRalph Straubs <rs8887@att.com>
Wed, 14 Jun 2017 08:56:05 +0000 (03:56 -0500)
committerRalph Straubs <rs8887@att.com>
Wed, 14 Jun 2017 13:02:46 +0000 (08:02 -0500)
The fix for this problem is included in the following enhancements:

1) Define a new nested interface 'PolicySession.ThreadModel', which makes
   it possible for features to control the thread or threads processing
   a 'KieSession'.

   The nested class 'PolicySession.DefaultThreadModel' implements the
   default version, which uses 'KieSession.fireUntilHalt()' instead of
   polling 'KieSession.fireAllRules()'.

   A new method 'selectThreadModel(PolicySession session)' has been added
   to 'PolicySessionFeatureAPI' to enable this selection.

2) Update thread names when 'KieContainer.updateToVersion(...)' is called

Change-Id: Ic48089fe5660501e2e3d42b87501697211a9d0fe
Signed-off-by: Ralph Straubs <rs8887@att.com>
policy-core/src/main/java/org/openecomp/policy/drools/core/PolicyContainer.java
policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySession.java
policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySessionFeatureAPI.java

index fca30e0..26a8055 100644 (file)
@@ -454,7 +454,15 @@ public class PolicyContainer implements Startable
     }else {
        logger.info("updateToVersion:releaseId " + releaseId.toString());
     }
-       return(kieContainer.updateToVersion(releaseId));
+
+       // notify all 'PolicySession' instances
+       Results results = kieContainer.updateToVersion(releaseId);
+       for (PolicySession session : sessions.values())
+         {
+               session.updated();
+         }
+
+       return(results);
   }
 
   /**
index ae9dbc4..4d717f5 100644 (file)
@@ -67,8 +67,8 @@ public class PolicySession
   // associated 'KieSession' instance
   private KieSession kieSession;
 
-  // if not 'null', this is the thread running 'kieSession.fireUntilHalt()'
-  private Thread thread = null;
+  // if not 'null', this is the thread model processing the 'KieSession'
+  private ThreadModel threadModel = null;
 
   // supports 'getCurrentSession()' method
   static private ThreadLocal<PolicySession> policySession =
@@ -127,99 +127,76 @@ public class PolicySession
   }
 
   /**
-   * this starts a separate thread, which invokes 'KieSession.fireUntilHalt()'.
-   * It does nothing if the thread already exists.
+   * If no 'ThreadModel' is currently running, this method will create one,
+   * and invoke it's 'start()' method. Features implementing
+   * 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create
+   * the ThreadModel instance.
    */
   public synchronized void startThread()
   {
-       if (thread == null)
+       if (threadModel == null)
          {
-               logger.info("startThread with name " + getFullName());
-               thread = new Thread("Session " + getFullName())
+               // loop through all of the features, and give each one
+               // a chance to create the 'ThreadModel'
+               for (PolicySessionFeatureAPI feature :
+                          PolicySessionFeatureAPI.impl.getList())
                  {
-                       public void run()
+                       try
                          {
-                               // set thread local variable
-                               policySession.set(PolicySession.this);
-
-                               // We want to continue, despite any exceptions that occur
-                               // while rules are fired.
-                               boolean repeat = true;
-                               long minSleepTime = 100;
-                               long maxSleepTime = 5000;
-                               long sleepTime = maxSleepTime;
-                               while (repeat)
-                                 {
-                                       if(this.isInterrupted()){
-                                               break;
-                                       }
-                                       try
-                                         {
-                                               if (kieSession.fireAllRules() > 0)
-                                                 {
-                                                       // some rules fired -- reduce poll delay
-                                                       if (sleepTime > minSleepTime)
-                                                         {
-                                                               sleepTime /= 2;
-                                                               if (sleepTime < minSleepTime)
-                                                                 {
-                                                                       sleepTime = minSleepTime;
-                                                                 }
-                                                         }
-                                                 }
-                                               else
-                                                 {
-                                                       // no rules fired -- increase poll delay
-                                                       if (sleepTime < maxSleepTime)
-                                                         {
-                                                               sleepTime *= 2;
-                                                               if (sleepTime > maxSleepTime)
-                                                                 {
-                                                                       sleepTime = maxSleepTime;
-                                                                 }
-                                                         }
-                                                 }
-                                         }
-                                       catch (Throwable e)
-                                         {
-                                               logger.error(MessageCodes.EXCEPTION_ERROR, e, "startThread", "kieSession.fireUntilHalt");                                                       
-                                         }
-                                       try {
-                                               Thread.sleep(sleepTime);
-                                       } catch (InterruptedException e) {
-                                               break;
-                                       }
-                                 }
-                               logger.info("fireUntilHalt() returned");
+                               if ((threadModel = feature.selectThreadModel(this)) != null)
+                                 break;
                          }
-                 };
-               thread.start();
+                       catch (Exception e)
+                         {
+                               logger.error("ERROR: Feature API: "
+                                                        + feature.getClass().getName(), e);
+                         }
+                 }             
+               if (threadModel == null)
+                 {
+                       // no feature created a ThreadModel -- select the default
+                       threadModel = new DefaultThreadModel(this);
+                 }
+               logger.info("starting ThreadModel for session " + getFullName());
+               threadModel.start();
          }
   }
 
   /**
-   * if a thread is currently running, this invokes 'KieSession.halt()' to
-   * stop it.
+   * If a 'ThreadModel' is currently running, this calls the 'stop()' method,
+   * and sets the 'threadModel' reference to 'null'.
    */
   public synchronized void stopThread()
   {
-       if (thread != null)
+       if (threadModel != null)
          {
-               // this should cause the thread to exit         
-               thread.interrupt();
-               try
-                 {
-                       // wait for the thread to stop
-                       thread.join();
-                 }
-               catch (Exception e)
-                 {
-                       logger.error(MessageCodes.EXCEPTION_ERROR, e, "stopThread", "thread.join");
-                 }
-               thread = null;
+               threadModel.stop();
+               threadModel = null;
          }
   }
 
+  /**
+   * Notification that 'updateToVersion' was called on the container
+   */
+  void updated()
+  {
+       if (threadModel != null)
+         {
+               // notify the 'ThreadModel', which may change one or more Thread names
+               threadModel.updated();
+         }
+  }
+
+  /**
+   * Set this 'PolicySession' instance as the one associated with the
+   * currently-running thread.
+   */
+  public void setPolicySession()
+  {
+       // this sets a 'ThreadLocal' variable
+       policySession.set(this);
+  }
+
   /**
    * @return the 'PolicySession' instance associated with the current thread
    *   (Note that this only works if the current thread is the one running
@@ -452,4 +429,148 @@ public class PolicySession
                                         + ": AgendaEventListener.objectUpdated(" + event + ")");
          }
   }
+
+  /* ============================================================ */
+
+  /**
+   * This interface helps support the ability for features to choose the
+   * thread or threads that processes the 'KieSession'.
+   */
+  public interface ThreadModel
+  {
+       /**
+        * Start the thread or threads that do the 'KieSession' processing
+        */
+       public void start();
+
+       /**
+        * Stop the thread or threads that do the 'KieSession' processing
+        */
+       public void stop();
+
+       /**
+        * This method is called to notify the running session that
+        * 'KieContainer.updateToVersion(...)' has been called (meaning the
+        * full name of this session has changed).
+        */
+       default public void updated() {}
+  }
+
+  /* ============================================================ */
+
+  /**
+   * This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'.
+   */
+  public static class DefaultThreadModel implements Runnable,ThreadModel
+  {
+       // session associated with this persistent thread
+       PolicySession session;
+
+       // the session thread
+       Thread thread;
+
+       // controls whether the thread loops or terminates
+       volatile boolean repeat = true;
+
+       /**
+        * Constructor - initialize 'session' and create thread
+        *
+        * @param session the 'PolicySession' instance
+        */
+       public DefaultThreadModel(PolicySession session)
+       {
+         this.session = session;
+         thread = new Thread(this,getThreadName());
+       }
+
+       /**
+        * @return the String to use as the thread name
+        */
+       private String getThreadName()
+       {
+         return("Session " + session.getFullName());
+       }
+
+       /***************************/
+       /* 'ThreadModel' interface */
+       /***************************/
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+         public void start()
+       {
+         repeat = true;
+         thread.start();
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+         public void stop()
+       {
+         repeat = false;
+
+         // this should cause the thread to exit               
+         session.getKieSession().halt();
+         try
+               {
+                 // wait up to 10 seconds for the thread to stop
+                 thread.join(10000);
+
+                 // one more interrupt, just in case the 'kieSession.halt()'
+                 // didn't work for some reason
+                 thread.interrupt();
+               }
+         catch (Exception e)
+               {
+                 logger.error(MessageCodes.EXCEPTION_ERROR, e, "stopThread", "thread.join");
+               }
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+         public void updated()
+       {
+         // the container artifact has been updated -- adjust the thread name
+         thread.setName(getThreadName());
+       }
+
+       /************************/
+       /* 'Runnable' interface */
+       /************************/
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+         public void run()
+       {
+         // set thread local variable
+         session.setPolicySession();
+
+         // We want to continue looping, despite any exceptions that occur
+         // while rules are fired.
+         KieSession kieSession = session.getKieSession();
+         while (repeat)
+               {
+                 try
+                       {
+                         kieSession.fireUntilHalt();
+
+                         // if we fall through, it means 'KieSession.halt()' was called
+                         repeat = false;
+                       }
+                 catch (Throwable e)
+                       {
+                         logger.error(MessageCodes.EXCEPTION_ERROR, e, "startThread", "kieSession.fireUntilHalt");                                             
+                       }
+               }
+         logger.info("fireUntilHalt() returned");
+       }
+  }
 }
index da828db..8792372 100644 (file)
@@ -79,6 +79,16 @@ public interface PolicySessionFeatureAPI extends OrderedService
    */
   default public void newPolicySession(PolicySession policySession) {}
 
+  /**
+   * This method is called to select the 'ThreadModel' instance associated
+   * with a 'PolicySession' instance.
+   */
+  default public PolicySession.ThreadModel selectThreadModel
+       (PolicySession session)
+  {
+       return(null);
+  }
+
   /**
    * This method is called after 'KieSession.dispose()' is called
    *