Add comprehensive junit tests for pooling feature 39/43439/8
authorJim Hahn <jrh3@att.com>
Tue, 17 Apr 2018 22:00:40 +0000 (18:00 -0400)
committerJorge Hernandez <jh1730@att.com>
Fri, 20 Apr 2018 17:23:11 +0000 (17:23 +0000)
Added FeatureTest.java and FeatureTest2.java.
Fixed a number of issues that were identified via the above
tests.
Moved fixes of policy-endpoints to other JIRA tickets.
Resolved potential sonar issue with commented-out code by
making STD_XXX constants into plain variables to they could be
changed via a method call.
Changed stdEventWaitMs to stdEventWaitSec.

Change-Id: Icf8fcc9b0dfe2578aa0787e0c9224a6f76a068ee
Issue-ID: POLICY-748
Signed-off-by: Jim Hahn <jrh3@att.com>
25 files changed:
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java [new file with mode: 0644]
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java

index 102eda7..abb4da3 100644 (file)
@@ -202,6 +202,7 @@ public class DmaapManager {
 
         } catch (InterruptedException e) {
             logger.warn("message transmission stopped due to {}", e.getMessage());
+            Thread.currentThread().interrupt();
         }
 
         try {
index 21cbc4d..2bec457 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.policy.drools.pooling;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import org.onap.policy.common.utils.properties.exception.PropertyException;
@@ -53,6 +54,11 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
      */
     private static Factory factory;
 
+    /**
+     * ID of this host.
+     */
+    private final String host;
+
     /**
      * Entire set of feature properties, including those specific to various controllers.
      */
@@ -75,6 +81,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
      */
     public PoolingFeature() {
         super();
+
+        this.host = UUID.randomUUID().toString();
     }
 
     protected static Factory getFactory() {
@@ -90,6 +98,10 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
         PoolingFeature.factory = factory;
     }
 
+    public String getHost() {
+        return host;
+    }
+
     @Override
     public int getSequenceNumber() {
         return 0;
@@ -123,7 +135,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
                 PoolingProperties props = new PoolingProperties(name, featProps);
 
                 logger.info("pooling enabled for {}", name);
-                ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(controller, props));
+                ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props));
 
             } catch (PropertyException e) {
                 logger.error("pooling disabled due to exception for {}", name, e);
@@ -371,12 +383,13 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
         /**
          * Makes a pooling manager for a controller.
          * 
+         * @param host name/uuid of this host
          * @param controller
          * @param props properties to use to configure the manager
          * @return a new pooling manager
          */
-        public PoolingManagerImpl makeManager(PolicyController controller, PoolingProperties props) {
-            return new PoolingManagerImpl(controller, props);
+        public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props) {
+            return new PoolingManagerImpl(host, controller, props);
         }
 
         /**
index 422efdd..d231246 100644 (file)
@@ -25,12 +25,10 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.Properties;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import org.onap.policy.drools.controller.DroolsController;
 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.drools.event.comm.TopicListener;
@@ -47,6 +45,7 @@ import org.onap.policy.drools.pooling.state.QueryState;
 import org.onap.policy.drools.pooling.state.StartState;
 import org.onap.policy.drools.pooling.state.State;
 import org.onap.policy.drools.pooling.state.StateTimerTask;
+import org.onap.policy.drools.properties.PolicyProperties;
 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
 import org.onap.policy.drools.system.PolicyController;
 import org.slf4j.Logger;
@@ -74,11 +73,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
      */
     private static Factory factory = new Factory();
 
-    /**
-     * ID of the last host that was created.
-     */
-    private static final AtomicReference<String> lastHost = new AtomicReference<>(null);
-
     /**
      * ID of this host.
      */
@@ -158,28 +152,24 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
 
     /**
      * Constructs the manager, initializing all of the data structures.
-     * 
+     *
+     * @param host name/uuid of this host
      * @param controller controller with which this is associated
      * @param props feature properties specific to the controller
      */
-    public PoolingManagerImpl(PolicyController controller, PoolingProperties props) {
-        this.host = UUID.randomUUID().toString();
+    public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props) {
+        this.host = host;
         this.controller = controller;
         this.props = props;
 
-        lastHost.set(this.host);
-
         try {
             this.listener = (TopicListener) controller;
             this.serializer = new Serializer();
             this.topic = props.getPoolingTopic();
             this.eventq = factory.makeEventQueue(props);
-
-            SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(),
-                            props.getSource());
-            this.extractors = factory.makeClassExtractors(spec);
-
-            this.dmaapMgr = factory.makeDmaapManager(props);
+            this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
+            this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(),
+                            makeDmaapProps(controller, props.getSource()));
             this.current = new IdleState(this);
 
             logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
@@ -202,16 +192,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
         PoolingManagerImpl.factory = factory;
     }
 
-    /**
-     * Used by junit tests.
-     * 
-     * @return the ID of the last host that was created, or {@code null} if no hosts have
-     *         been created yet
-     */
-    protected static String getLastHost() {
-        return lastHost.get();
-    }
-
     /**
      * Should only be used by junit tests.
      * 
@@ -236,6 +216,50 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
         return props;
     }
 
+    /**
+     * Makes properties for configuring extractors.
+     * 
+     * @param controller the controller for which the extractors will be configured
+     * @param source properties from which to get the extractor properties
+     * @return extractor properties
+     */
+    private Properties makeExtractorProps(PolicyController controller, Properties source) {
+        return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
+    }
+
+    /**
+     * Makes properties for configuring DMaaP. Copies properties from the source that
+     * start with the Pooling property prefix followed by the controller name, stripping
+     * the prefix and controller name.
+     * 
+     * @param controller the controller for which DMaaP will be configured
+     * @param source properties from which to get the DMaaP properties
+     * @return DMaaP properties
+     */
+    private Properties makeDmaapProps(PolicyController controller, Properties source) {
+        SpecProperties props = new SpecProperties("", "controller." + controller.getName(), source);
+
+        // could be UEB or DMAAP, so add both
+        addDmaapConsumerProps(props, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
+        addDmaapConsumerProps(props, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
+
+        return props;
+    }
+
+    /**
+     * Adds DMaaP consumer properties, consumer group & instance. The group is the host
+     * and the instance is a constant.
+     * 
+     * @param props where to add the new properties
+     * @param prefix property prefix
+     */
+    private void addDmaapConsumerProps(SpecProperties props, String prefix) {
+        String fullpfx = props.getSpecPrefix() + prefix + "." + topic;
+        
+        props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host);
+        props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0");
+    }
+
     /**
      * Indicates that the controller is about to start. Starts the publisher for the
      * internal topic, and creates a thread pool for the timers.
@@ -288,8 +312,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
             scheduler = null;
 
             if (!(current instanceof IdleState)) {
-                dmaapMgr.stopConsumer(this);
                 changeState(new IdleState(this));
+                dmaapMgr.stopConsumer(this);
                 publishAdmin(new Offline(getHost()));
             }
         }
@@ -751,26 +775,27 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
 
     @Override
     public CountDownLatch startDistributing(BucketAssignments asgn) {
-        if (asgn == null) {
-            return null;
-        }
-
-        logger.info("new assignments for topic {}", getTopic());
-
         synchronized (curLocker) {
+            int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
+            logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
             assignments = asgn;
         }
 
+        if (asgn == null) {
+            return null;
+        }
+
         /*
          * publish the events from the event queue, but do it in a background thread so
          * that the state machine can enter its correct state BEFORE we start processing
          * the events
          */
         CountDownLatch latch = new CountDownLatch(1);
-        
+
         new Thread(() -> {
             synchronized (curLocker) {
                 if (assignments == null) {
+                    latch.countDown();
                     return;
                 }
 
@@ -779,11 +804,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
                 while ((ev = eventq.poll()) != null) {
                     handle(ev);
                 }
-                
+
                 latch.countDown();
             }
         }).start();
-        
+
         return latch;
     }
 
@@ -876,12 +901,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
         /**
          * Creates a DMaaP manager.
          * 
-         * @param props properties used to configure the manager
+         * @param topic name of the internal DMaaP topic
+         * @param props properties used to configure DMaaP
          * @return a new DMaaP manager
          * @throws PoolingFeatureException if an error occurs
          */
-        public DmaapManager makeDmaapManager(PoolingProperties props) throws PoolingFeatureException {
-            return new DmaapManager(props.getPoolingTopic(), props.getSource());
+        public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException {
+            return new DmaapManager(topic, props);
         }
 
         /**
index 4e8de0d..5431942 100644 (file)
@@ -51,12 +51,12 @@ public class PoolingProperties extends SpecPropertyConfiguration {
     public static final String FEATURE_ENABLED = PREFIX + "{?.}enabled";
     public static final String OFFLINE_LIMIT = PREFIX + "{?.}offline.queue.limit";
     public static final String OFFLINE_AGE_MS = PREFIX + "{?.}offline.queue.age.milliseconds";
+    public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds";
     public static final String START_HEARTBEAT_MS = PREFIX + "{?.}start.heartbeat.milliseconds";
     public static final String REACTIVATE_MS = PREFIX + "{?.}reactivate.milliseconds";
     public static final String IDENTIFICATION_MS = PREFIX + "{?.}identification.milliseconds";
     public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "{?.}active.heartbeat.milliseconds";
     public static final String INTER_HEARTBEAT_MS = PREFIX + "{?.}inter.heartbeat.milliseconds";
-    public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds";
 
     /**
      * Type of item that the extractors will be extracting.
@@ -93,11 +93,18 @@ public class PoolingProperties extends SpecPropertyConfiguration {
     @Property(name = OFFLINE_AGE_MS, defaultValue = "60000")
     private long offlineAgeMs;
 
+    /**
+     * Time, in milliseconds, to wait for an "Offline" message to be published
+     * to DMaaP.
+     */
+    @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000")
+    private long offlinePubWaitMs;
+
     /**
      * Time, in milliseconds, to wait for this host's heart beat during the
      * start-up state.
      */
-    @Property(name = START_HEARTBEAT_MS, defaultValue = "50000")
+    @Property(name = START_HEARTBEAT_MS, defaultValue = "100000")
     private long startHeartbeatMs;
 
     /**
@@ -123,18 +130,11 @@ public class PoolingProperties extends SpecPropertyConfiguration {
 
     /**
      * Time, in milliseconds, to wait between heart beat generations during
-     * the active state.
+     * the active and start-up states.
      */
     @Property(name = INTER_HEARTBEAT_MS, defaultValue = "15000")
     private long interHeartbeatMs;
 
-    /**
-     * Time, in milliseconds, to wait for an "Offline" message to be published
-     * to DMaaP.
-     */
-    @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000")
-    private long offlinePubWaitMs;
-
     /**
      * @param controllerName the name of the controller
      * @param props set of properties used to configure this
@@ -163,6 +163,10 @@ public class PoolingProperties extends SpecPropertyConfiguration {
         return offlineAgeMs;
     }
 
+    public long getOfflinePubWaitMs() {
+        return offlinePubWaitMs;
+    }
+
     public long getStartHeartbeatMs() {
         return startHeartbeatMs;
     }
@@ -182,8 +186,4 @@ public class PoolingProperties extends SpecPropertyConfiguration {
     public long getInterHeartbeatMs() {
         return interHeartbeatMs;
     }
-
-    public long getOfflinePubWaitMs() {
-        return offlinePubWaitMs;
-    }
 }
index 31b4e61..b62ea0a 100644 (file)
@@ -40,7 +40,8 @@ public class SpecProperties extends Properties {
 
     /**
      * 
-     * @param prefix the property name prefix that appears before any specialization
+     * @param prefix the property name prefix that appears before any specialization, may
+     *        be ""
      * @param specialization the property name specialization (e.g., session name)
      */
     public SpecProperties(String prefix, String specialization) {
@@ -50,7 +51,8 @@ public class SpecProperties extends Properties {
 
     /**
      * 
-     * @param prefix the property name prefix that appears before any specialization
+     * @param prefix the property name prefix that appears before any specialization, may
+     *        be ""
      * @param specialization the property name specialization (e.g., session name)
      * @param props the default properties
      */
@@ -68,7 +70,7 @@ public class SpecProperties extends Properties {
      * @return the text, with a trailing "."
      */
     private static String withTrailingDot(String text) {
-        return text.endsWith(".") ? text : text + ".";
+        return text.isEmpty() || text.endsWith(".") ? text : text + ".";
     }
 
     /**
index 2752ca8..ccca77b 100644 (file)
@@ -374,7 +374,7 @@ public class ClassExtractors {
 
             } catch (NoSuchMethodException expected) {
                 // no getXxx() method, maybe there's a field by this name
-                logger.debug("no method {} in {}", nm, clazz.getName());
+                logger.debug("no method {} in {}", nm, clazz.getName(), expected);
                 return null;
 
             } catch (SecurityException e) {
@@ -445,7 +445,7 @@ public class ClassExtractors {
 
             } catch (NoSuchFieldException expected) {
                 // no field by this name - try super class & interfaces
-                logger.debug("no field {} in {}", name, clazz.getName());
+                logger.debug("no field {} in {}", name, clazz.getName(), expected);
 
             } catch (SecurityException e) {
                 throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
index b0a36cd..8f0a902 100644 (file)
@@ -95,18 +95,19 @@ public class ActiveState extends ProcessingState {
         if ((succHost = assigned.higher(getHost())) == null) {
             // wrapped around - successor is the first host in the set
             succHost = assigned.first();
-            logger.info("this host's successor is {} on topic {}", succHost, getTopic());
         }
+        logger.info("this host's successor is {} on topic {}", succHost, getTopic());        
 
         if ((predHost = assigned.lower(getHost())) == null) {
             // wrapped around - predecessor is the last host in the set
             predHost = assigned.last();
-            logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
         }
+        logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
     }
 
     @Override
     public void start() {
+        super.start();
         addTimers();
         genHeartbeat();
     }
@@ -120,7 +121,7 @@ public class ActiveState extends ProcessingState {
         /*
          * heart beat generator
          */
-        long genMs = getProperties().getActiveHeartbeatMs();
+        long genMs = getProperties().getInterHeartbeatMs();
 
         scheduleWithFixedDelay(genMs, genMs, () -> {
             genHeartbeat();
@@ -130,9 +131,9 @@ public class ActiveState extends ProcessingState {
         /*
          * my heart beat checker
          */
-        long interMs = getProperties().getInterHeartbeatMs();
+        long waitMs = getProperties().getActiveHeartbeatMs();
 
-        scheduleWithFixedDelay(interMs, interMs, () -> {
+        scheduleWithFixedDelay(waitMs, waitMs, () -> {
             if (myHeartbeatSeen) {
                 myHeartbeatSeen = false;
                 return null;
@@ -141,7 +142,7 @@ public class ActiveState extends ProcessingState {
             // missed my heart beat
             logger.error("missed my heartbeat on topic {}", getTopic());
 
-            return internalTopicFailed();
+            return missedHeartbeat();
         });
 
         /*
@@ -149,7 +150,7 @@ public class ActiveState extends ProcessingState {
          */
         if (!predHost.isEmpty()) {
 
-            scheduleWithFixedDelay(interMs, interMs, () -> {
+            scheduleWithFixedDelay(waitMs, waitMs, () -> {
                 if (predHeartbeatSeen) {
                     predHeartbeatSeen = false;
                     return null;
index 6be2fb8..da04425 100644 (file)
@@ -44,24 +44,17 @@ public class InactiveState extends State {
 
     @Override
     public void start() {
-
         super.start();
-
         schedule(getProperties().getReactivateMs(), () -> goStart());
     }
 
     @Override
     public State process(Leader msg) {
-        if(isValid(msg)) {
+        if (isValid(msg)) {
             logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic());
-            startDistributing(msg.getAssignments());
-            
-            if(msg.getAssignments().hasAssignment(getHost())) {
-                logger.info("received Leader message on topic {}", getTopic());
-                return goActive();
-            }
+            return goActive(msg.getAssignments());
         }
-        
+
         return null;
     }
 
index 1e9bb58..e9dc032 100644 (file)
@@ -73,24 +73,6 @@ public class ProcessingState extends State {
         this.leader = leader;
     }
 
-    /**
-     * Goes active with a new set of assignments.
-     * 
-     * @param asgn new assignments
-     * @return the new state, either Active or Inactive, depending on whether or not this
-     *         host has an assignment
-     */
-    protected State goActive(BucketAssignments asgn) {
-        startDistributing(asgn);
-
-        if (asgn.hasAssignment(getHost())) {
-            return goActive();
-
-        } else {
-            return goInactive();
-        }
-    }
-
     /**
      * Generates an Identification message and goes to the query state.
      */
@@ -154,11 +136,11 @@ public class ProcessingState extends State {
         }
 
         Leader msg = makeLeader(alive);
-        publish(msg);
+        logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size());
 
-        setAssignments(msg.getAssignments());
+        publish(msg);
 
-        return goActive();
+        return goActive(msg.getAssignments());
     }
 
     /**
index 9045165..1a4da15 100644 (file)
@@ -63,7 +63,6 @@ public class QueryState extends ProcessingState {
 
     @Override
     public void start() {
-
         super.start();
 
         // start identification timer
@@ -85,39 +84,21 @@ public class QueryState extends ProcessingState {
             if (!sawSelfIdent) {
                 // didn't see our identification
                 logger.error("missed our own Ident message on topic {}", getTopic());
-                return internalTopicFailed();
+                return missedHeartbeat();
 
             } else if (isLeader()) {
                 // "this" host is the new leader
                 logger.info("this host is the new leader for topic {}", getTopic());
                 return becomeLeader(alive);
 
-            } else if (hasAssignment()) {
-                /*
-                 * this host is not the new leader, but it does have an assignment -
-                 * return to the active state while we wait for the leader
-                 */
-                logger.info("no new leader on topic {}", getTopic());
-                return goActive();
-
             } else {
-                // not the leader and no assignment yet
+                // not the leader - return to previous state
                 logger.info("no new leader on topic {}", getTopic());
-                return goInactive();
+                return goActive(getAssignments());
             }
         });
     }
 
-    /**
-     * Determines if this host has an assignment in the CURRENT assignments.
-     * 
-     * @return {@code true} if this host has an assignment, {@code false} otherwise
-     */
-    protected boolean hasAssignment() {
-        BucketAssignments asgn = getAssignments();
-        return (asgn != null && asgn.hasAssignment(getHost()));
-    }
-
     @Override
     public State goQuery() {
         return null;
index a978e24..3068cfc 100644 (file)
@@ -67,8 +67,22 @@ public class StartState extends State {
 
         super.start();
 
-        publish(getHost(), makeHeartbeat(hbTimestampMs));
+        Heartbeat hb = makeHeartbeat(hbTimestampMs);
+        publish(getHost(), hb);
 
+        /*
+         * heart beat generator
+         */
+        long genMs = getProperties().getInterHeartbeatMs();
+
+        scheduleWithFixedDelay(genMs, genMs, () -> {
+            publish(getHost(), hb);
+            return null;
+        });
+
+        /*
+         * my heart beat checker
+         */
         schedule(getProperties().getStartHeartbeatMs(), () -> {
             logger.error("missed heartbeat on topic {}", getTopic());
             return internalTopicFailed();
index 421b922..54e9323 100644 (file)
@@ -113,12 +113,21 @@ public abstract class State {
     }
 
     /**
-     * Transitions to the "active" state.
+     * Goes active with a new set of assignments.
      * 
-     * @return the new state
+     * @param asgn new assignments
+     * @return the new state, either Active or Inactive, depending on whether or not this
+     *         host has an assignment
      */
-    public final State goActive() {
-        return mgr.goActive();
+    protected State goActive(BucketAssignments asgn) {
+        startDistributing(asgn);
+
+        if (asgn.hasAssignment(getHost())) {
+            return mgr.goActive();
+
+        } else {
+            return goInactive();
+        }
     }
 
     /**
@@ -322,7 +331,21 @@ public abstract class State {
     }
 
     /**
-     * Indicates that the internal topic failed.
+     * Indicates that we failed to see our own heartbeat; must be a problem with the
+     * internal topic.
+     * 
+     * @return a new {@link StartState}
+     */
+    protected final State missedHeartbeat() {
+        publish(makeOffline());
+        mgr.startDistributing(null);
+
+        return mgr.goStart();
+    }
+
+    /**
+     * Indicates that the internal topic failed; this should only be invoked from the
+     * StartState.
      * 
      * @return a new {@link InactiveState}
      */
index a91671f..29dc15e 100644 (file)
@@ -211,8 +211,8 @@ public class DmaapManagerTest {
         // force exception when it starts
         doThrow(new IllegalStateException("expected")).when(sink).start();
 
-        expectException("startPublisher,start", xxx -> mgr.startPublisher());
-        expectException("startPublisher,publish", xxx -> mgr.publish(MSG));
+        expectException("startPublisher,start", () -> mgr.startPublisher());
+        expectException("startPublisher,publish", () -> mgr.publish(MSG));
 
         // allow it to succeed this time
         reset(sink);
@@ -336,7 +336,7 @@ public class DmaapManagerTest {
     @Test
     public void testPublish() throws PoolingFeatureException {
         // cannot publish before starting
-        expectException("publish,pre", xxx -> mgr.publish(MSG));
+        expectException("publish,pre", () -> mgr.publish(MSG));
 
         mgr.startPublisher();
 
@@ -352,7 +352,7 @@ public class DmaapManagerTest {
 
         // stop and verify we can no longer publish
         mgr.stopPublisher(0);
-        expectException("publish,stopped", xxx -> mgr.publish(MSG));
+        expectException("publish,stopped", () -> mgr.publish(MSG));
     }
 
     @Test(expected = PoolingFeatureException.class)
@@ -377,7 +377,7 @@ public class DmaapManagerTest {
 
     private void expectException(String testnm, VFunction func) {
         try {
-            func.apply(null);
+            func.apply();
             fail(testnm + " missing exception");
 
         } catch (PoolingFeatureException expected) {
@@ -387,6 +387,6 @@ public class DmaapManagerTest {
 
     @FunctionalInterface
     public static interface VFunction {
-        public void apply(Void arg) throws PoolingFeatureException;
+        public void apply() throws PoolingFeatureException;
     }
 }
index 13d70f5..cc58838 100644 (file)
@@ -26,13 +26,13 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.IdentityHashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
@@ -43,13 +43,11 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -71,7 +69,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
- * its own feature object.
+ * its own feature object. Uses real feature objects. However, the following are not:
+ * <dl>
+ * <dt>DMaaP sources and sinks</dt>
+ * <dd>simulated using queues. There is one queue for the external topic, and one queue
+ * for each host's internal topic. Messages published to the "admin" channel are simply
+ * sent to all of the hosts' internal topic queues</dd>
+ * <dt>PolicyEngine, PolicyController, DroolsController</dt>
+ * <dd>mocked</dd>
+ * </dl>
  */
 public class FeatureTest {
 
@@ -92,26 +98,15 @@ public class FeatureTest {
      */
     private static final String CONTROLLER1 = "controller.one";
 
-    // private static final long STD_HEARTBEAT_WAIT_MS = 100;
-    // private static final long STD_REACTIVATE_WAIT_MS = 200;
-    // private static final long STD_IDENTIFICATION_MS = 60;
-    // private static final long STD_ACTIVE_HEARTBEAT_MS = 5;
-    // private static final long STD_INTER_HEARTBEAT_MS = 50;
-    // private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
-    // private static final long POLL_MS = 2;
-    // private static final long INTER_POLL_MS = 2;
-    // private static final long EVENT_WAIT_SEC = 5;
-
-    // use these to slow things down
-    private static final long STD_HEARTBEAT_WAIT_MS = 5000;
-    private static final long STD_REACTIVATE_WAIT_MS = 10000;
-    private static final long STD_IDENTIFICATION_MS = 10000;
-    private static final long STD_ACTIVE_HEARTBEAT_MS = 5000;
-    private static final long STD_INTER_HEARTBEAT_MS = 12000;
-    private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
-    private static final long POLL_MS = 2;
-    private static final long INTER_POLL_MS = 2000;
-    private static final long EVENT_WAIT_SEC = 1000;
+    private static long stdReactivateWaitMs = 200;
+    private static long stdIdentificationMs = 60;
+    private static long stdStartHeartbeatMs = 60;
+    private static long stdActiveHeartbeatMs = 50;
+    private static long stdInterHeartbeatMs = 5;
+    private static long stdOfflinePubWaitMs = 2;
+    private static long stdPollMs = 2;
+    private static long stdInterPollMs = 2;
+    private static long stdEventWaitSec = 10;
 
     // these are saved and restored on exit from this test class
     private static PoolingFeature.Factory saveFeatureFactory;
@@ -128,6 +123,8 @@ public class FeatureTest {
         saveFeatureFactory = PoolingFeature.getFactory();
         saveManagerFactory = PoolingManagerImpl.getFactory();
         saveDmaapFactory = DmaapManager.getFactory();
+        
+        // note: invoke runSlow() to slow things down
     }
 
     @AfterClass
@@ -149,51 +146,35 @@ public class FeatureTest {
         }
     }
 
-    @Ignore
     @Test
     public void test_SingleHost() throws Exception {
-        int nmessages = 70;
-
-        ctx = new Context(nmessages);
-
-        ctx.addHost();
-        ctx.startHosts();
-
-        for (int x = 0; x < nmessages; ++x) {
-            ctx.offerExternal(makeMessage(x));
-        }
-
-        ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
-
-        assertEquals(0, ctx.getDecodeErrors());
-        assertEquals(0, ctx.getRemainingEvents());
-        ctx.checkAllSawAMsg();
+        run(70, 1);
     }
 
-    @Ignore
     @Test
     public void test_TwoHosts() throws Exception {
-        int nmessages = 200;
+        run(200, 2);
+    }
+
+    @Test
+    public void test_ThreeHosts() throws Exception {
+        run(200, 3);
+    }
 
+    private void run(int nmessages, int nhosts) throws Exception {
         ctx = new Context(nmessages);
 
-        ctx.addHost();
-        ctx.addHost();
+        for (int x = 0; x < nhosts; ++x) {
+            ctx.addHost();
+        }
+
         ctx.startHosts();
 
         for (int x = 0; x < nmessages; ++x) {
             ctx.offerExternal(makeMessage(x));
         }
 
-        // wait for all hosts to have time to process a few messages
-        Thread.sleep(STD_ACTIVE_HEARTBEAT_MS + INTER_POLL_MS * 3);
-
-        // pause a topic for a bit
-//        ctx.pauseTopic();
-
-        // now we'll see if it recovers
-
-        ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+        ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
 
         assertEquals(0, ctx.getDecodeErrors());
         assertEquals(0, ctx.getRemainingEvents());
@@ -203,6 +184,21 @@ public class FeatureTest {
     private String makeMessage(int reqnum) {
         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
     }
+    
+    /**
+     * Invoke this to slow the timers down.
+     */
+    protected static void runSlow() {
+         stdReactivateWaitMs = 10000;
+         stdIdentificationMs = 10000;
+         stdStartHeartbeatMs = 15000;
+         stdActiveHeartbeatMs = 12000;
+         stdInterHeartbeatMs = 5000;
+         stdOfflinePubWaitMs = 2;
+         stdPollMs = 2;
+         stdInterPollMs = 2000;
+         stdEventWaitSec = 1000;
+    }
 
     /**
      * Context used for a single test case.
@@ -243,12 +239,6 @@ public class FeatureTest {
          */
         private final CountDownLatch eventCounter;
 
-        /**
-         * Maps host name to its topic source. This must be in sorted order so we can
-         * identify the source for the host with the higher name.
-         */
-        private TreeMap<String, TopicSourceImpl> host2topic = new TreeMap<>();
-
         /**
          * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
          * {@link #getCurrentHost()}.
@@ -280,9 +270,14 @@ public class FeatureTest {
 
         /**
          * Creates and adds a new host to the context.
+         * 
+         * @return the new Host
          */
-        public void addHost() {
-            hosts.add(new Host(this));
+        public Host addHost() {
+            Host host = new Host(this);
+            hosts.add(host);
+
+            return host;
         }
 
         /**
@@ -442,26 +437,6 @@ public class FeatureTest {
             return eventCounter.await(time, units);
         }
 
-        /**
-         * Associates a host with a topic.
-         * 
-         * @param host
-         * @param topic
-         */
-        public void addTopicSource(String host, TopicSourceImpl topic) {
-            host2topic.put(host, topic);
-        }
-
-        /**
-         * Pauses the last topic source long enough to miss a heart beat.
-         */
-        public void pauseTopic() {
-            Entry<String, TopicSourceImpl> ent = host2topic.lastEntry();
-            if (ent != null) {
-                ent.getValue().pause(STD_ACTIVE_HEARTBEAT_MS);
-            }
-        }
-
         /**
          * Gets the current host, provided this is used from within a call to
          * {@link #withHost(Host, VoidFunction)}.
@@ -527,12 +502,10 @@ public class FeatureTest {
         }
 
         /**
-         * Gets the host name. This should only be invoked within {@link #start()}.
-         * 
          * @return the host name
          */
         public String getName() {
-            return PoolingManagerImpl.getLastHost();
+            return feature.getHost();
         }
 
         /**
@@ -758,11 +731,6 @@ public class FeatureTest {
          */
         private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
 
-        /**
-         * Time, in milliseconds, to pause before polling for more messages.
-         */
-        private AtomicLong pauseTimeMs = new AtomicLong(0);
-
         /**
          * 
          * @param context
@@ -771,12 +739,8 @@ public class FeatureTest {
          */
         public TopicSourceImpl(Context context, boolean internal) {
             if (internal) {
-                Host host = context.getCurrentHost();
-
                 this.topic = INTERNAL_TOPIC;
-                this.queue = host.getInternalQueue();
-
-                context.addTopicSource(host.getName(), this);
+                this.queue = context.getCurrentHost().getInternalQueue();
 
             } else {
                 this.topic = EXTERNAL_TOPIC;
@@ -809,11 +773,12 @@ public class FeatureTest {
 
             reregister(newPair);
 
-            new Thread(() -> {
+            Thread thread = new Thread(() -> {
+
                 try {
                     do {
                         processMessages(newPair.first(), listener);
-                    } while (!newPair.first().await(INTER_POLL_MS, TimeUnit.MILLISECONDS));
+                    } while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS));
 
                     logger.info("topic source thread completed");
 
@@ -827,7 +792,10 @@ public class FeatureTest {
 
                 newPair.second().countDown();
 
-            }).start();
+            });
+
+            thread.setDaemon(true);
+            thread.start();
         }
 
         /**
@@ -879,19 +847,7 @@ public class FeatureTest {
         }
 
         /**
-         * Indicates that {@link #processMessages(CountDownLatch, TopicListener)} should
-         * pause a bit.
-         * 
-         * @param timeMs time, in milliseconds, to pause
-         */
-        public void pause(long timeMs) {
-            pauseTimeMs.set(timeMs);
-        }
-
-        /**
-         * Polls for messages from the topic and offers them to the listener. If
-         * {@link #pauseTimeMs} is non-zero, then it pauses for the specified time and
-         * then immediately returns.
+         * Polls for messages from the topic and offers them to the listener.
          * 
          * @param stopped triggered if processing should stop
          * @param listener
@@ -901,14 +857,7 @@ public class FeatureTest {
 
             for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
 
-                long ptm = pauseTimeMs.getAndSet(0);
-                if (ptm != 0) {
-                    logger.warn("pause processing");
-                    stopped.await(ptm, TimeUnit.MILLISECONDS);
-                    return;
-                }
-
-                String msg = queue.poll(POLL_MS, TimeUnit.MILLISECONDS);
+                String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
                 if (msg == null) {
                     return;
                 }
@@ -1038,18 +987,20 @@ public class FeatureTest {
 
             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
 
-            props.setProperty("pooling." + CONTROLLER1 + ".topic", INTERNAL_TOPIC);
-            props.setProperty("pooling." + CONTROLLER1 + ".enabled", "true");
-            props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.limit", "10000");
-            props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.age.milliseconds", "1000000");
-            props.setProperty("pooling." + CONTROLLER1 + ".start.heartbeat.milliseconds", "" + STD_HEARTBEAT_WAIT_MS);
-            props.setProperty("pooling." + CONTROLLER1 + ".reactivate.milliseconds", "" + STD_REACTIVATE_WAIT_MS);
-            props.setProperty("pooling." + CONTROLLER1 + ".identification.milliseconds", "" + STD_IDENTIFICATION_MS);
-            props.setProperty("pooling." + CONTROLLER1 + ".active.heartbeat.milliseconds",
-                            "" + STD_ACTIVE_HEARTBEAT_MS);
-            props.setProperty("pooling." + CONTROLLER1 + ".inter.heartbeat.milliseconds", "" + STD_INTER_HEARTBEAT_MS);
-            props.setProperty("pooling." + CONTROLLER1 + ".offline.publish.wait.milliseconds",
-                            "" + STD_OFFLINE_PUB_WAIT_MS);
+            props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
+            props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
+            props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
+            props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
+            props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
+                            "" + stdOfflinePubWaitMs);
+            props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
+                            "" + stdStartHeartbeatMs);
+            props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
+            props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
+            props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
+                            "" + stdActiveHeartbeatMs);
+            props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
+                            "" + stdInterHeartbeatMs);
 
             return props;
         }
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
new file mode 100644 (file)
index 0000000..84449e7
--- /dev/null
@@ -0,0 +1,708 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.event.comm.TopicEndpoint;
+import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.drools.event.comm.TopicSink;
+import org.onap.policy.drools.event.comm.TopicSource;
+import org.onap.policy.drools.properties.PolicyProperties;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
+ * its own feature object. Uses real feature objects, as well as real DMaaP sources and
+ * sinks. However, the following are not:
+ * <dl>
+ * <dt>PolicyEngine, PolicyController, DroolsController</dt>
+ * <dd>mocked</dd>
+ * </dl>
+ * 
+ * <p>
+ * The following fields must be set before executing this:
+ * <ul>
+ * <li>UEB_SERVERS</li>
+ * <li>INTERNAL_TOPIC</li>
+ * <li>EXTERNAL_TOPIC</li>
+ * </ul>
+ */
+public class FeatureTest2 {
+
+    private static final Logger logger = LoggerFactory.getLogger(FeatureTest2.class);
+
+    /**
+     * UEB servers for both internal & external topics.
+     */
+    private static final String UEB_SERVERS = "";
+
+    /**
+     * Name of the topic used for inter-host communication.
+     */
+    private static final String INTERNAL_TOPIC = "";
+
+    /**
+     * Name of the topic from which "external" events "arrive".
+     */
+    private static final String EXTERNAL_TOPIC = "";
+
+    /**
+     * Consumer group to use when polling the external topic.
+     */
+    private static final String EXTERNAL_GROUP = FeatureTest2.class.getName();
+
+    /**
+     * Name of the controller.
+     */
+    private static final String CONTROLLER1 = "controller.one";
+
+    /**
+     * Maximum number of items to fetch from DMaaP in a single poll.
+     */
+    private static final String FETCH_LIMIT = "5";
+
+    private static final long STD_REACTIVATE_WAIT_MS = 10000;
+    private static final long STD_IDENTIFICATION_MS = 10000;
+    private static final long STD_START_HEARTBEAT_MS = 15000;
+    private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
+    private static final long STD_INTER_HEARTBEAT_MS = 5000;
+    private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
+    private static final long EVENT_WAIT_SEC = 15;
+
+    // these are saved and restored on exit from this test class
+    private static PoolingFeature.Factory saveFeatureFactory;
+    private static PoolingManagerImpl.Factory saveManagerFactory;
+
+    /**
+     * Sink for external DMaaP topic.
+     */
+    private static TopicSink externalSink;
+
+    /**
+     * Context for the current test case.
+     */
+    private Context ctx;
+
+
+    @BeforeClass
+    public static void setUpBeforeClass() {
+        saveFeatureFactory = PoolingFeature.getFactory();
+        saveManagerFactory = PoolingManagerImpl.getFactory();
+
+        Properties props = makeSinkProperties(EXTERNAL_TOPIC);
+        externalSink = TopicEndpoint.manager.addTopicSinks(props).get(0);
+        externalSink.start();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+        PoolingFeature.setFactory(saveFeatureFactory);
+        PoolingManagerImpl.setFactory(saveManagerFactory);
+
+        externalSink.stop();
+    }
+
+    @Before
+    public void setUp() {
+        ctx = null;
+    }
+
+    @After
+    public void tearDown() {
+        if (ctx != null) {
+            ctx.destroy();
+        }
+    }
+
+    @Ignore
+    @Test
+    public void test_SingleHost() throws Exception {
+        run(70, 1);
+    }
+
+    @Ignore
+    @Test
+    public void test_TwoHosts() throws Exception {
+        run(200, 2);
+    }
+
+    @Ignore
+    @Test
+    public void test_ThreeHosts() throws Exception {
+        run(200, 3);
+    }
+
+    private void run(int nmessages, int nhosts) throws Exception {
+        ctx = new Context(nmessages);
+
+        for (int x = 0; x < nhosts; ++x) {
+            ctx.addHost();
+        }
+
+        ctx.startHosts();
+
+        ctx.awaitEvents(STD_IDENTIFICATION_MS * 2, TimeUnit.MILLISECONDS);
+
+        for (int x = 0; x < nmessages; ++x) {
+            ctx.offerExternal(makeMessage(x));
+        }
+
+        ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+
+        assertEquals(0, ctx.getDecodeErrors());
+        assertEquals(0, ctx.getRemainingEvents());
+        ctx.checkAllSawAMsg();
+    }
+
+    private String makeMessage(int reqnum) {
+        return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
+    }
+
+    private static Properties makeSinkProperties(String topic) {
+        Properties props = new Properties();
+
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS, topic);
+
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
+
+        return props;
+    }
+
+    private static Properties makeSourceProperties(String topic) {
+        Properties props = new Properties();
+
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
+
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
+
+        if (EXTERNAL_TOPIC.equals(topic)) {
+            // consumer group is a constant
+            props.setProperty(
+                            PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+                                            + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX,
+                            EXTERNAL_GROUP);
+
+            // consumer instance is generated by the BusConsumer code
+        }
+
+        // else internal topic: feature populates info for internal topic
+
+        return props;
+    }
+
+    /**
+     * Context used for a single test case.
+     */
+    private static class Context {
+
+        private final FeatureFactory featureFactory;
+        private final ManagerFactory managerFactory;
+
+        /**
+         * Hosts that have been added to this context.
+         */
+        private final Deque<Host> hosts = new LinkedList<>();
+
+        /**
+         * Maps a drools controller to its policy controller.
+         */
+        private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
+
+        /**
+         * Counts the number of decode errors.
+         */
+        private final AtomicInteger nDecodeErrors = new AtomicInteger(0);
+
+        /**
+         * Number of events we're still waiting to receive.
+         */
+        private final CountDownLatch eventCounter;
+
+        /**
+         * 
+         * @param nEvents number of events to be processed
+         */
+        public Context(int nEvents) {
+            featureFactory = new FeatureFactory(this);
+            managerFactory = new ManagerFactory(this);
+            eventCounter = new CountDownLatch(nEvents);
+
+            PoolingFeature.setFactory(featureFactory);
+            PoolingManagerImpl.setFactory(managerFactory);
+        }
+
+        /**
+         * Destroys the context, stopping any hosts that remain.
+         */
+        public void destroy() {
+            stopHosts();
+            hosts.clear();
+        }
+
+        /**
+         * Creates and adds a new host to the context.
+         * 
+         * @return the new Host
+         */
+        public Host addHost() {
+            Host host = new Host(this);
+            hosts.add(host);
+
+            return host;
+        }
+
+        /**
+         * Starts the hosts.
+         */
+        public void startHosts() {
+            hosts.forEach(host -> host.start());
+        }
+
+        /**
+         * Stops the hosts.
+         */
+        public void stopHosts() {
+            hosts.forEach(host -> host.stop());
+        }
+
+        /**
+         * Verifies that all hosts processed at least one message.
+         */
+        public void checkAllSawAMsg() {
+            int x = 0;
+            for (Host host : hosts) {
+                assertTrue("x=" + x, host.messageSeen());
+                ++x;
+            }
+        }
+
+        /**
+         * Offers an event to the external topic.
+         * 
+         * @param event
+         */
+        public void offerExternal(String event) {
+            externalSink.send(event);
+        }
+
+        /**
+         * Decodes an event.
+         * 
+         * @param event
+         * @return the decoded event, or {@code null} if it cannot be decoded
+         */
+        public Object decodeEvent(String event) {
+            return managerFactory.decodeEvent(null, null, event);
+        }
+
+        /**
+         * Associates a controller with its drools controller.
+         * 
+         * @param controller
+         * @param droolsController
+         */
+        public void addController(PolicyController controller, DroolsController droolsController) {
+            drools2policy.put(droolsController, controller);
+        }
+
+        /**
+         * @param droolsController
+         * @return the controller associated with a drools controller, or {@code null} if
+         *         it has no associated controller
+         */
+        public PolicyController getController(DroolsController droolsController) {
+            return drools2policy.get(droolsController);
+        }
+
+        /**
+         * 
+         * @return the number of decode errors so far
+         */
+        public int getDecodeErrors() {
+            return nDecodeErrors.get();
+        }
+
+        /**
+         * Increments the count of decode errors.
+         */
+        public void bumpDecodeErrors() {
+            nDecodeErrors.incrementAndGet();
+        }
+
+        /**
+         * 
+         * @return the number of events that haven't been processed
+         */
+        public long getRemainingEvents() {
+            return eventCounter.getCount();
+        }
+
+        /**
+         * Adds an event to the counter.
+         */
+        public void addEvent() {
+            eventCounter.countDown();
+        }
+
+        /**
+         * Waits, for a period of time, for all events to be processed.
+         * 
+         * @param time
+         * @param units
+         * @return {@code true} if all events have been processed, {@code false} otherwise
+         * @throws InterruptedException
+         */
+        public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
+            return eventCounter.await(time, units);
+        }
+    }
+
+    /**
+     * Simulates a single "host".
+     */
+    private static class Host {
+
+        private final PoolingFeature feature = new PoolingFeature();
+
+        /**
+         * {@code True} if this host has processed a message, {@code false} otherwise.
+         */
+        private final AtomicBoolean sawMsg = new AtomicBoolean(false);
+
+        private final TopicSource externalSource;
+
+        // mock objects
+        private final PolicyEngine engine = mock(PolicyEngine.class);
+        private final ListenerController controller = mock(ListenerController.class);
+        private final DroolsController drools = mock(DroolsController.class);
+
+        /**
+         * 
+         * @param context
+         */
+        public Host(Context context) {
+
+            when(controller.getName()).thenReturn(CONTROLLER1);
+            when(controller.getDrools()).thenReturn(drools);
+
+            Properties props = makeSourceProperties(EXTERNAL_TOPIC);
+            externalSource = TopicEndpoint.manager.addTopicSources(props).get(0);
+
+            // stop consuming events if the controller stops
+            when(controller.stop()).thenAnswer(args -> {
+                externalSource.unregister(controller);
+                return true;
+            });
+
+            doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
+
+            context.addController(controller, drools);
+        }
+
+        /**
+         * Starts threads for the host so that it begins consuming from both the external
+         * "DMaaP" topic and its own internal "DMaaP" topic.
+         */
+        public void start() {
+            feature.beforeStart(engine);
+            feature.afterCreate(controller);
+
+            feature.beforeStart(controller);
+
+            // start consuming events from the external topic
+            externalSource.register(controller);
+
+            feature.afterStart(controller);
+        }
+
+        /**
+         * Stops the host's threads.
+         */
+        public void stop() {
+            feature.beforeStop(controller);
+            externalSource.unregister(controller);
+            feature.afterStop(controller);
+        }
+
+        /**
+         * Offers an event to the feature, before the policy controller handles it.
+         * 
+         * @param protocol
+         * @param topic2
+         * @param event
+         * @return {@code true} if the event was handled, {@code false} otherwise
+         */
+        public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+            return feature.beforeOffer(controller, protocol, topic2, event);
+        }
+
+        /**
+         * Offers an event to the feature, after the policy controller handles it.
+         * 
+         * @param protocol
+         * @param topic
+         * @param event
+         * @param success
+         * @return {@code true} if the event was handled, {@code false} otherwise
+         */
+        public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
+
+            return feature.afterOffer(controller, protocol, topic, event, success);
+        }
+
+        /**
+         * Offers an event to the feature, before the drools controller handles it.
+         * 
+         * @param fact
+         * @return {@code true} if the event was handled, {@code false} otherwise
+         */
+        public boolean beforeInsert(Object fact) {
+            return feature.beforeInsert(drools, fact);
+        }
+
+        /**
+         * Offers an event to the feature, after the drools controller handles it.
+         * 
+         * @param fact
+         * @param successInsert {@code true} if it was successfully inserted by the drools
+         *        controller, {@code false} otherwise
+         * @return {@code true} if the event was handled, {@code false} otherwise
+         */
+        public boolean afterInsert(Object fact, boolean successInsert) {
+            return feature.afterInsert(drools, fact, successInsert);
+        }
+
+        /**
+         * Indicates that a message was seen for this host.
+         */
+        public void sawMessage() {
+            sawMsg.set(true);
+        }
+
+        /**
+         * 
+         * @return {@code true} if a message was seen for this host, {@code false}
+         *         otherwise
+         */
+        public boolean messageSeen() {
+            return sawMsg.get();
+        }
+    }
+
+    /**
+     * Listener for the external topic. Simulates the actions taken by
+     * <i>AggregatedPolicyController.onTopicEvent</i>.
+     */
+    private static class MyExternalTopicListener implements Answer<Void> {
+
+        private final Context context;
+        private final Host host;
+
+        public MyExternalTopicListener(Context context, Host host) {
+            this.context = context;
+            this.host = host;
+        }
+
+        @Override
+        public Void answer(InvocationOnMock args) throws Throwable {
+            int i = 0;
+            CommInfrastructure commType = args.getArgument(i++);
+            String topic = args.getArgument(i++);
+            String event = args.getArgument(i++);
+
+            if (host.beforeOffer(commType, topic, event)) {
+                return null;
+            }
+
+            boolean result;
+            Object fact = context.decodeEvent(event);
+
+            if (fact == null) {
+                result = false;
+                context.bumpDecodeErrors();
+
+            } else {
+                result = true;
+
+                if (!host.beforeInsert(fact)) {
+                    // feature did not handle it so we handle it here
+                    host.afterInsert(fact, result);
+
+                    host.sawMessage();
+                    context.addEvent();
+                }
+            }
+
+            host.afterOffer(commType, topic, event, result);
+            return null;
+        }
+    }
+
+    /**
+     * Simulator for the feature-level factory.
+     */
+    private static class FeatureFactory extends PoolingFeature.Factory {
+
+        private final Context context;
+
+        /**
+         * 
+         * @param context
+         */
+        public FeatureFactory(Context context) {
+            this.context = context;
+
+            /*
+             * Note: do NOT extract anything from "context" at this point, because it
+             * hasn't been fully initialized yet
+             */
+        }
+
+        @Override
+        public Properties getProperties(String featName) {
+            Properties props = new Properties();
+
+            props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
+
+            props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
+            props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
+            props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
+            props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
+            props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
+                            "" + STD_OFFLINE_PUB_WAIT_MS);
+            props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
+                            "" + STD_START_HEARTBEAT_MS);
+            props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
+            props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
+            props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
+                            "" + STD_ACTIVE_HEARTBEAT_MS);
+            props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
+                            "" + STD_INTER_HEARTBEAT_MS);
+
+            props.putAll(makeSinkProperties(INTERNAL_TOPIC));
+            props.putAll(makeSourceProperties(INTERNAL_TOPIC));
+
+            return props;
+        }
+
+        @Override
+        public PolicyController getController(DroolsController droolsController) {
+            return context.getController(droolsController);
+        }
+    }
+
+    /**
+     * Simulator for the pooling manager factory.
+     */
+    private static class ManagerFactory extends PoolingManagerImpl.Factory {
+
+        /**
+         * Used to decode events from the external topic.
+         */
+        private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
+            @Override
+            protected ObjectMapper initialValue() {
+                return new ObjectMapper();
+            }
+        };
+
+        /**
+         * Used to decode events into a Map.
+         */
+        private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {};
+
+        /**
+         * 
+         * @param context
+         */
+        public ManagerFactory(Context context) {
+
+            /*
+             * Note: do NOT extract anything from "context" at this point, because it
+             * hasn't been fully initialized yet
+             */
+        }
+
+        @Override
+        public boolean canDecodeEvent(DroolsController drools, String topic) {
+            return true;
+        }
+
+        @Override
+        public Object decodeEvent(DroolsController drools, String topic, String event) {
+            try {
+                return mapper.get().readValue(event, typeRef);
+
+            } catch (IOException e) {
+                logger.warn("cannot decode external event", e);
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Controller that also implements the {@link TopicListener} interface.
+     */
+    private static interface ListenerController extends PolicyController, TopicListener {
+
+    }
+}
index 7782e47..9ee2d97 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.policy.drools.pooling;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
@@ -122,8 +123,8 @@ public class PoolingFeatureTest {
         when(factory.getController(drools2)).thenReturn(controller2);
         when(factory.getController(droolsDisabled)).thenReturn(controllerDisabled);
 
-        when(factory.makeManager(any(), any())).thenAnswer(args -> {
-            PoolingProperties props = args.getArgument(1);
+        when(factory.makeManager(any(), any(), any())).thenAnswer(args -> {
+            PoolingProperties props = args.getArgument(2);
 
             PoolingManagerImpl mgr = mock(PoolingManagerImpl.class);
 
@@ -148,6 +149,19 @@ public class PoolingFeatureTest {
         assertEquals(2, managers.size());
     }
 
+    @Test
+    public void testGetHost() {
+        String host = pool.getHost();
+        assertNotNull(host);
+
+        // create another and ensure it generates another host name
+        pool = new PoolingFeature();
+        String host2 = pool.getHost();
+        assertNotNull(host2);
+
+        assertTrue(!host.equals(host2));
+    }
+
     @Test
     public void testGetSequenceNumber() {
         assertEquals(0, pool.getSequenceNumber());
index e32fa54..693cb6d 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -73,6 +74,7 @@ public class PoolingManagerImplTest {
     protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
     protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
 
+    private static final String MY_HOST = "my.host";
     private static final String HOST2 = "other.host";
 
     private static final String MY_CONTROLLER = "my.controller";
@@ -151,7 +153,7 @@ public class PoolingManagerImplTest {
 
         when(factory.makeEventQueue(any())).thenReturn(eventQueue);
         when(factory.makeClassExtractors(any())).thenReturn(extractors);
-        when(factory.makeDmaapManager(any())).thenReturn(dmaap);
+        when(factory.makeDmaapManager(any(), any())).thenReturn(dmaap);
         when(factory.makeScheduler()).thenReturn(sched);
         when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(true);
         when(factory.decodeEvent(drools, TOPIC2, THE_EVENT)).thenReturn(DECODED_EVENT);
@@ -179,12 +181,12 @@ public class PoolingManagerImplTest {
 
         PoolingManagerImpl.setFactory(factory);
 
-        mgr = new PoolingManagerImpl(controller, poolProps);
+        mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps);
     }
 
     @Test
-    public void testPoolingManagerImpl() {
-        mgr = new PoolingManagerImpl(controller, poolProps);
+    public void testPoolingManagerImpl() throws Exception {
+        verify(factory).makeDmaapManager(any(), any());
 
         State st = mgr.getCurrent();
         assertTrue(st instanceof IdleState);
@@ -202,7 +204,7 @@ public class PoolingManagerImplTest {
         PolicyController ctlr = mock(PolicyController.class);
 
         PoolingFeatureRtException ex = expectException(PoolingFeatureRtException.class,
-                        xxx -> new PoolingManagerImpl(ctlr, poolProps));
+                        () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps));
         assertNotNull(ex.getCause());
         assertTrue(ex.getCause() instanceof ClassCastException);
     }
@@ -211,23 +213,28 @@ public class PoolingManagerImplTest {
     public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
         // throw an exception when we try to create the dmaap manager
         PoolingFeatureException ex = new PoolingFeatureException();
-        when(factory.makeDmaapManager(any())).thenThrow(ex);
+        when(factory.makeDmaapManager(any(), any())).thenThrow(ex);
 
         PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class,
-                        xxx -> new PoolingManagerImpl(controller, poolProps));
+                        () -> new PoolingManagerImpl(MY_HOST, controller, poolProps));
         assertEquals(ex, ex2.getCause());
     }
 
     @Test
-    public void testGetHost() {
-        String host = mgr.getHost();
-        assertNotNull(host);
+    public void testGetCurrent() throws Exception {
+        assertEquals(IdleState.class, mgr.getCurrent().getClass());
+
+        startMgr();
+
+        assertEquals(StartState.class, mgr.getCurrent().getClass());
+    }
 
-        // create another manager and ensure it generates a different host
-        mgr = new PoolingManagerImpl(controller, poolProps);
+    @Test
+    public void testGetHost() {
+        assertEquals(MY_HOST, mgr.getHost());
 
-        assertNotNull(mgr.getHost());
-        assertFalse(host.equals(mgr.getHost()));
+        mgr = new PoolingManagerImpl(HOST2, controller, poolProps);
+        assertEquals(HOST2, mgr.getHost());
     }
 
     @Test
@@ -268,7 +275,7 @@ public class PoolingManagerImplTest {
         PoolingFeatureException ex = new PoolingFeatureException();
         doThrow(ex).when(dmaap).startPublisher();
 
-        PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, xxx -> mgr.beforeStart());
+        PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, () -> mgr.beforeStart());
         assertEquals(ex, ex2);
 
         // should never start the scheduler
@@ -453,8 +460,9 @@ public class PoolingManagerImplTest {
         // should have set the new filter
         verify(dmaap, times(++ntimes)).setFilter(any());
 
-        // should have cancelled the timer
-        assertEquals(1, futures.size());
+        // should have cancelled the timers
+        assertEquals(2, futures.size());
+        verify(futures.poll()).cancel(false);
         verify(futures.poll()).cancel(false);
 
         /*
@@ -465,8 +473,9 @@ public class PoolingManagerImplTest {
         // should have set the new filter
         verify(dmaap, times(++ntimes)).setFilter(any());
 
-        // timer should still be active
-        assertEquals(1, futures.size());
+        // new timers should now be active
+        assertEquals(2, futures.size());
+        verify(futures.poll(), never()).cancel(false);
         verify(futures.poll(), never()).cancel(false);
     }
 
@@ -548,7 +557,7 @@ public class PoolingManagerImplTest {
         ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
         ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
 
-        verify(sched).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
+        verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
                         unitCap.capture());
 
         assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
@@ -987,7 +996,7 @@ public class PoolingManagerImplTest {
 
         // route the message to this host
         mgr.startDistributing(makeAssignments(true));
-        
+
         // generate RuntimeException when onTopicEvent() is invoked
         doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
 
@@ -1056,21 +1065,27 @@ public class PoolingManagerImplTest {
         startMgr();
 
         // route the message to this host
-        mgr.startDistributing(makeAssignments(true));
-
+        assertNotNull(mgr.startDistributing(makeAssignments(true)));
         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        verify(eventQueue, never()).add(any());
 
 
-        // null assignments should be ignored
-        mgr.startDistributing(null);
+        // null assignments should cause message to be queued
+        assertNull(mgr.startDistributing(null));
+        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        verify(eventQueue).add(any());
+
 
+        // route the message to this host
+        assertNotNull(mgr.startDistributing(makeAssignments(true)));
         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        verify(eventQueue).add(any());
 
 
         // route the message to the other host
-        mgr.startDistributing(makeAssignments(false));
-
+        assertNotNull(mgr.startDistributing(makeAssignments(false)));
         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        verify(eventQueue).add(any());
     }
 
     @Test
@@ -1086,7 +1101,9 @@ public class PoolingManagerImplTest {
         when(eventQueue.poll()).thenAnswer(args -> lst.poll());
 
         // route the messages to this host
-        assertTrue(mgr.startDistributing(makeAssignments(true)).await(2, TimeUnit.SECONDS));
+        CountDownLatch latch = mgr.startDistributing(makeAssignments(true));
+        assertNotNull(latch);
+        assertTrue(latch.await(2, TimeUnit.SECONDS));
 
         // all of the events should have been processed locally
         verify(dmaap, times(START_PUB)).publish(any());
@@ -1330,7 +1347,7 @@ public class PoolingManagerImplTest {
      */
     private <T extends Exception> T expectException(Class<T> exClass, ExFunction<T> func) {
         try {
-            func.apply(null);
+            func.apply();
             throw new AssertionError("missing exception");
 
         } catch (Exception e) {
@@ -1349,10 +1366,9 @@ public class PoolingManagerImplTest {
         /**
          * Invokes the function.
          * 
-         * @param arg always {@code null}
          * @throws T if an error occurs
          */
-        public void apply(Void arg) throws T;
+        public void apply() throws T;
 
     }
 }
index 2d734c1..459c770 100644 (file)
@@ -48,13 +48,13 @@ public class PoolingPropertiesTest {
     public static final boolean STD_FEATURE_ENABLED = true;
     public static final int STD_OFFLINE_LIMIT = 10;
     public static final long STD_OFFLINE_AGE_MS = 1000L;
-    public static final long STD_START_HEARTBEAT_MS = 2000L;
-    public static final long STD_REACTIVATE_MS = 3000L;
-    public static final long STD_IDENTIFICATION_MS = 4000L;
-    public static final long STD_LEADER_MS = 5000L;
-    public static final long STD_ACTIVE_HEARTBEAT_MS = 6000L;
-    public static final long STD_INTER_HEARTBEAT_MS = 7000L;
-    public static final long STD_OFFLINE_PUB_WAIT_MS = 8000L;
+    public static final long STD_OFFLINE_PUB_WAIT_MS = 2000L;
+    public static final long STD_START_HEARTBEAT_MS = 3000L;
+    public static final long STD_REACTIVATE_MS = 4000L;
+    public static final long STD_IDENTIFICATION_MS = 5000L;
+    public static final long STD_LEADER_MS = 6000L;
+    public static final long STD_ACTIVE_HEARTBEAT_MS = 7000L;
+    public static final long STD_INTER_HEARTBEAT_MS = 8000L;
 
     private Properties plain;
     private PoolingProperties pooling;
@@ -98,9 +98,14 @@ public class PoolingPropertiesTest {
         doTest(OFFLINE_AGE_MS, STD_OFFLINE_AGE_MS, 60000L, xxx -> pooling.getOfflineAgeMs());
     }
 
+    @Test
+    public void testGetOfflinePubWaitMs() throws PropertyException {
+        doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs());
+    }
+
     @Test
     public void testGetStartHeartbeatMs() throws PropertyException {
-        doTest(START_HEARTBEAT_MS, STD_START_HEARTBEAT_MS, 50000L, xxx -> pooling.getStartHeartbeatMs());
+        doTest(START_HEARTBEAT_MS, STD_START_HEARTBEAT_MS, 100000L, xxx -> pooling.getStartHeartbeatMs());
     }
 
     @Test
@@ -123,11 +128,6 @@ public class PoolingPropertiesTest {
         doTest(INTER_HEARTBEAT_MS, STD_INTER_HEARTBEAT_MS, 15000L, xxx -> pooling.getInterHeartbeatMs());
     }
 
-    @Test
-    public void testGetOfflinePubWaitMs() throws PropertyException {
-        doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs());
-    }
-
     /**
      * Tests a particular property. Verifies that the correct value is returned if the
      * specialized property has a value or the property has no value. Also verifies that
@@ -174,12 +174,12 @@ public class PoolingPropertiesTest {
         props.setProperty(specialize(FEATURE_ENABLED, CONTROLLER), "" + STD_FEATURE_ENABLED);
         props.setProperty(specialize(OFFLINE_LIMIT, CONTROLLER), "" + STD_OFFLINE_LIMIT);
         props.setProperty(specialize(OFFLINE_AGE_MS, CONTROLLER), "" + STD_OFFLINE_AGE_MS);
+        props.setProperty(specialize(OFFLINE_PUB_WAIT_MS, CONTROLLER), "" + STD_OFFLINE_PUB_WAIT_MS);
         props.setProperty(specialize(START_HEARTBEAT_MS, CONTROLLER), "" + STD_START_HEARTBEAT_MS);
         props.setProperty(specialize(REACTIVATE_MS, CONTROLLER), "" + STD_REACTIVATE_MS);
         props.setProperty(specialize(IDENTIFICATION_MS, CONTROLLER), "" + STD_IDENTIFICATION_MS);
         props.setProperty(specialize(ACTIVE_HEARTBEAT_MS, CONTROLLER), "" + STD_ACTIVE_HEARTBEAT_MS);
         props.setProperty(specialize(INTER_HEARTBEAT_MS, CONTROLLER), "" + STD_INTER_HEARTBEAT_MS);
-        props.setProperty(specialize(OFFLINE_PUB_WAIT_MS, CONTROLLER), "" + STD_OFFLINE_PUB_WAIT_MS);
 
         return props;
     }
index 505dc40..d09650d 100644 (file)
@@ -122,6 +122,24 @@ public class SpecPropertiesTest {
         assertNull(props.getProperty(gen(PROP_UNKNOWN)));
     }
 
+    @Test
+    public void testSpecPropertiesStringStringProperties_EmptyPrefix() {
+        supportingProps = new Properties();
+
+        supportingProps.setProperty(PROP_NO_PREFIX, VAL_NO_PREFIX);
+        supportingProps.setProperty("a.value", VAL_GEN);
+        supportingProps.setProperty("b.value", VAL_GEN);
+        supportingProps.setProperty(MY_SPEC + ".b.value", VAL_SPEC);
+
+        // no supporting properties
+        props = new SpecProperties("", MY_SPEC, supportingProps);
+
+        assertEquals(VAL_NO_PREFIX, props.getProperty(gen(PROP_NO_PREFIX)));
+        assertEquals(VAL_GEN, props.getProperty(gen("a.value")));
+        assertEquals(VAL_SPEC, props.getProperty(MY_SPEC + ".b.value"));
+        assertNull(props.getProperty(gen(PROP_UNKNOWN)));
+    }
+
     @Test
     public void testWithTrailingDot() {
         // neither has trailing dot
@@ -132,6 +150,16 @@ public class SpecPropertiesTest {
         props = new SpecProperties(PREFIX_GEN, MY_SPEC + ".");
         assertEquals(PREFIX_GEN, props.getPrefix());
         assertEquals(PREFIX_SPEC, props.getSpecPrefix());
+
+        // first is empty
+        props = new SpecProperties("", MY_SPEC);
+        assertEquals("", props.getPrefix());
+        assertEquals(MY_SPEC + ".", props.getSpecPrefix());
+
+        // second is empty
+        props = new SpecProperties(PREFIX_GEN, "");
+        assertEquals(PREFIX_GEN, props.getPrefix());
+        assertEquals(PREFIX_GEN, props.getSpecPrefix());
     }
 
     @Test
index 7b4b060..27284dc 100644 (file)
@@ -298,18 +298,18 @@ public class ActiveStateTest extends BasicStateTester {
 
         // heart beat generator
         timer = repeatedTasks.remove();
-        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
-        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
+        assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
+        assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
 
         // my heart beat checker
         timer = repeatedTasks.remove();
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
 
         // predecessor's heart beat checker
         timer = repeatedTasks.remove();
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
     }
 
     @Test
@@ -327,13 +327,13 @@ public class ActiveStateTest extends BasicStateTester {
 
         // heart beat generator
         timer = repeatedTasks.remove();
-        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
-        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
+        assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
+        assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
 
         // my heart beat checker
         timer = repeatedTasks.remove();
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
     }
 
     @Test
@@ -389,13 +389,13 @@ public class ActiveStateTest extends BasicStateTester {
 
         // set up next state
         State next = mock(State.class);
-        when(mgr.goInactive()).thenReturn(next);
+        when(mgr.goStart()).thenReturn(next);
 
         // fire the task - should transition
         assertEquals(next, task.third().fire());
 
-        // should indicate failure
-        verify(mgr).internalTopicFailed();
+        // should stop distributing
+        verify(mgr).startDistributing(null);
 
         // should publish an offline message
         Offline msg = captureAdminMessage(Offline.class);
index 394adae..ae53ce0 100644 (file)
@@ -22,12 +22,19 @@ package org.onap.policy.drools.pooling.state;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
 import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Query;
 import org.onap.policy.drools.utils.Pair;
 
 public class InactiveStateTest extends BasicStateTester {
@@ -52,6 +59,42 @@ public class InactiveStateTest extends BasicStateTester {
         utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
     }
 
+    @Test
+    public void testProcessLeader() {
+        State next = mock(State.class);
+        when(mgr.goActive()).thenReturn(next);
+
+        String[] arr = {PREV_HOST, MY_HOST, HOST1};
+        BucketAssignments asgn = new BucketAssignments(arr);
+        Leader msg = new Leader(PREV_HOST, asgn);
+
+        assertEquals(next, state.process(msg));
+        verify(mgr).startDistributing(asgn);
+    }
+
+    @Test
+    public void testProcessLeader_Invalid() {
+        Leader msg = new Leader(PREV_HOST, null);
+
+        // should stay in the same state, and not start distributing
+        assertNull(state.process(msg));
+        verify(mgr, never()).startDistributing(any());
+        verify(mgr, never()).goActive();
+        verify(mgr, never()).goInactive();
+    }
+
+    @Test
+    public void testProcessQuery() {
+        State next = mock(State.class);
+        when(mgr.goQuery()).thenReturn(next);
+
+        assertEquals(next, state.process(new Query()));
+
+        Identification ident = captureAdminMessage(Identification.class);
+        assertEquals(MY_HOST, ident.getSource());
+        assertEquals(ASGN3, ident.getAssignments());
+    }
+
     @Test
     public void testGoInatcive() {
         assertNull(state.goInactive());
index e171841..7ac5843 100644 (file)
@@ -64,39 +64,6 @@ public class ProcessingStateTest extends BasicStateTester {
         utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
     }
 
-    @Test
-    public void testGoActive_WithAssignment() {
-        State act = mock(State.class);
-        State inact = mock(State.class);
-
-        when(mgr.goActive()).thenReturn(act);
-        when(mgr.goInactive()).thenReturn(inact);
-
-        String[] arr = {HOST2, PREV_HOST, MY_HOST};
-        BucketAssignments asgn = new BucketAssignments(arr);
-
-        assertEquals(act, state.goActive(asgn));
-
-        verify(mgr).startDistributing(asgn);
-    }
-
-    @Test
-    public void testGoActive_WithoutAssignment() {
-        State act = mock(State.class);
-        State inact = mock(State.class);
-
-        when(mgr.goActive()).thenReturn(act);
-        when(mgr.goInactive()).thenReturn(inact);
-
-        String[] arr = {HOST2, PREV_HOST};
-        BucketAssignments asgn = new BucketAssignments(arr);
-
-        assertEquals(inact, state.goActive(asgn));
-
-        verify(mgr).startDistributing(asgn);
-
-    }
-
     @Test
     public void testProcessQuery() {
         State next = mock(State.class);
@@ -154,13 +121,6 @@ public class ProcessingStateTest extends BasicStateTester {
         state = new ProcessingState(mgr, LEADER);
     }
 
-    @Test
-    public void testMakeIdentification() {
-        Identification ident = state.makeIdentification();
-        assertEquals(MY_HOST, ident.getSource());
-        assertEquals(ASGN3, ident.getAssignments());
-    }
-
     @Test
     public void testGetAssignments() {
         // assignments from constructor
index a7c3a3d..80778ed 100644 (file)
@@ -21,7 +21,6 @@
 package org.onap.policy.drools.pooling.state;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -254,10 +253,13 @@ public class QueryStateTest extends BasicStateTester {
         // should published an Offline message and go inactive
 
         State next = mock(State.class);
-        when(mgr.goInactive()).thenReturn(next);
+        when(mgr.goStart()).thenReturn(next);
 
         assertEquals(next, timer.second().fire());
 
+        // should stop distributing
+        verify(mgr).startDistributing(null);
+
         Offline msg = captureAdminMessage(Offline.class);
         assertEquals(MY_HOST, msg.getSource());
     }
@@ -342,21 +344,6 @@ public class QueryStateTest extends BasicStateTester {
         assertTrue(admin.isEmpty());
     }
 
-    @Test
-    public void testHasAssignment() {
-        // null assignment
-        mgr.startDistributing(null);
-        assertFalse(state.hasAssignment());
-
-        // not in assignments
-        state.setAssignments(new BucketAssignments(new String[] {HOST3}));
-        assertFalse(state.hasAssignment());
-
-        // it IS in the assignments
-        state.setAssignments(new BucketAssignments(new String[] {MY_HOST}));
-        assertTrue(state.hasAssignment());
-    }
-
     @Test
     public void testRecordInfo_NullSource() {
         state.setAssignments(ASGN3);
index af4e8f1..ee4c1ad 100644 (file)
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import java.util.Map;
@@ -39,6 +40,7 @@ import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Offline;
 import org.onap.policy.drools.pooling.message.Query;
 import org.onap.policy.drools.utils.Pair;
+import org.onap.policy.drools.utils.Triple;
 
 public class StartStateTest extends BasicStateTester {
 
@@ -78,15 +80,36 @@ public class StartStateTest extends BasicStateTester {
         assertEquals(MY_HOST, msg.first());
         assertEquals(state.getHbTimestampMs(), msg.second().getTimestampMs());
 
-        Pair<Long, StateTimerTask> timer = onceTasks.removeFirst();
 
-        assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first().longValue());
+        /*
+         * Verify heartbeat generator
+         */
+        Triple<Long, Long, StateTimerTask> generator = repeatedTasks.removeFirst();
+
+        assertEquals(STD_INTER_HEARTBEAT_MS, generator.first().longValue());
+        assertEquals(STD_INTER_HEARTBEAT_MS, generator.second().longValue());
+
+        // invoke the task - it should generate another heartbeat
+        assertEquals(null, generator.third().fire());
+        verify(mgr, times(2)).publish(MY_HOST, msg.second());
+
+        // and again
+        assertEquals(null, generator.third().fire());
+        verify(mgr, times(3)).publish(MY_HOST, msg.second());
+
+
+        /*
+         * Verify heartbeat checker
+         */
+        Pair<Long, StateTimerTask> checker = onceTasks.removeFirst();
+
+        assertEquals(STD_HEARTBEAT_WAIT_MS, checker.first().longValue());
 
         // invoke the task - it should go to the state returned by the mgr
         State next = mock(State.class);
         when(mgr.goInactive()).thenReturn(next);
 
-        assertEquals(next, timer.second().fire());
+        assertEquals(next, checker.second().fire());
 
         verify(mgr).internalTopicFailed();
     }
index a184dfa..08b55c6 100644 (file)
@@ -154,12 +154,35 @@ public class StateTest extends BasicStateTester {
     }
 
     @Test
-    public void testGoActive() {
-        State next = mock(State.class);
-        when(mgr.goActive()).thenReturn(next);
+    public void testGoActive_WithAssignment() {
+        State act = mock(State.class);
+        State inact = mock(State.class);
 
-        State next2 = state.goActive();
-        assertEquals(next, next2);
+        when(mgr.goActive()).thenReturn(act);
+        when(mgr.goInactive()).thenReturn(inact);
+
+        String[] arr = {HOST2, PREV_HOST, MY_HOST};
+        BucketAssignments asgn = new BucketAssignments(arr);
+
+        assertEquals(act, state.goActive(asgn));
+
+        verify(mgr).startDistributing(asgn);
+    }
+
+    @Test
+    public void testGoActive_WithoutAssignment() {
+        State act = mock(State.class);
+        State inact = mock(State.class);
+
+        when(mgr.goActive()).thenReturn(act);
+        when(mgr.goInactive()).thenReturn(inact);
+
+        String[] arr = {HOST2, PREV_HOST};
+        BucketAssignments asgn = new BucketAssignments(arr);
+
+        assertEquals(inact, state.goActive(asgn));
+
+        verify(mgr).startDistributing(asgn);
     }
 
     @Test
@@ -374,6 +397,20 @@ public class StateTest extends BasicStateTester {
         verify(sched).cancel();
     }
 
+    @Test
+    public void testMissedHeartbeat() {
+        State next = mock(State.class);
+        when(mgr.goStart()).thenReturn(next);
+
+        State next2 = state.missedHeartbeat();
+        assertEquals(next, next2);
+
+        verify(mgr).startDistributing(null);
+
+        Offline msg = captureAdminMessage(Offline.class);
+        assertEquals(MY_HOST, msg.getSource());
+    }
+
     @Test
     public void testInternalTopicFailed() {
         State next = mock(State.class);
@@ -397,6 +434,13 @@ public class StateTest extends BasicStateTester {
         assertEquals(timestamp, msg.getTimestampMs());
     }
 
+    @Test
+    public void testMakeIdentification() {
+        Identification ident = state.makeIdentification();
+        assertEquals(MY_HOST, ident.getSource());
+        assertEquals(ASGN3, ident.getAssignments());
+    }
+
     @Test
     public void testMakeOffline() {
         Offline msg = state.makeOffline();