Merge "Close old UEB/DMaaP consumer"
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / PoolingFeature.java
index 2bec457..1e2071a 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
 import org.onap.policy.common.utils.properties.exception.PropertyException;
 import org.onap.policy.drools.controller.DroolsController;
@@ -67,7 +68,12 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
     /**
      * Maps a controller name to its associated manager.
      */
-    private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+    private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+
+    /**
+     * Decremented each time a manager enters the Active state. Used by junit tests.
+     */
+    private final CountDownLatch activeLatch = new CountDownLatch(1);
 
     /**
      * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is
@@ -102,6 +108,13 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
         return host;
     }
 
+    /**
+     * @return a latch that will be decremented when a manager enters the active state
+     */
+    protected CountDownLatch getActiveLatch() {
+        return activeLatch;
+    }
+
     @Override
     public int getSequenceNumber() {
         return 0;
@@ -135,7 +148,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
                 PoolingProperties props = new PoolingProperties(name, featProps);
 
                 logger.info("pooling enabled for {}", name);
-                ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props));
+                ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props, activeLatch));
 
             } catch (PropertyException e) {
                 logger.error("pooling disabled due to exception for {}", name, e);
@@ -386,10 +399,12 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
          * @param host name/uuid of this host
          * @param controller
          * @param props properties to use to configure the manager
+         * @param activeLatch decremented when the manager goes Active
          * @return a new pooling manager
          */
-        public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props) {
-            return new PoolingManagerImpl(host, controller, props);
+        public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
+                        CountDownLatch activeLatch) {
+            return new PoolingManagerImpl(host, controller, props, activeLatch);
         }
 
         /**