Merge "Close old UEB/DMaaP consumer"
authorJorge Hernandez <jh1730@att.com>
Mon, 23 Apr 2018 15:11:38 +0000 (15:11 +0000)
committerGerrit Code Review <gerrit@onap.org>
Mon, 23 Apr 2018 15:11:38 +0000 (15:11 +0000)
39 files changed:
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingProperties.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java [new file with mode: 0644]
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingPropertiesTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SpecPropertiesTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSource.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java
policy-endpoints/src/main/java/org/onap/policy/drools/http/client/internal/JerseyClient.java
policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServer.java
policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServerFactory.java
policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyJerseyServer.java

index 102eda7..abb4da3 100644 (file)
@@ -202,6 +202,7 @@ public class DmaapManager {
 
         } catch (InterruptedException e) {
             logger.warn("message transmission stopped due to {}", e.getMessage());
+            Thread.currentThread().interrupt();
         }
 
         try {
index 21cbc4d..1e2071a 100644 (file)
@@ -21,7 +21,9 @@
 package org.onap.policy.drools.pooling;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
 import org.onap.policy.common.utils.properties.exception.PropertyException;
 import org.onap.policy.drools.controller.DroolsController;
@@ -53,6 +55,11 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
      */
     private static Factory factory;
 
+    /**
+     * ID of this host.
+     */
+    private final String host;
+
     /**
      * Entire set of feature properties, including those specific to various controllers.
      */
@@ -61,7 +68,12 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
     /**
      * Maps a controller name to its associated manager.
      */
-    private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+    private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+
+    /**
+     * Decremented each time a manager enters the Active state. Used by junit tests.
+     */
+    private final CountDownLatch activeLatch = new CountDownLatch(1);
 
     /**
      * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is
@@ -75,6 +87,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
      */
     public PoolingFeature() {
         super();
+
+        this.host = UUID.randomUUID().toString();
     }
 
     protected static Factory getFactory() {
@@ -90,6 +104,17 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
         PoolingFeature.factory = factory;
     }
 
+    public String getHost() {
+        return host;
+    }
+
+    /**
+     * @return a latch that will be decremented when a manager enters the active state
+     */
+    protected CountDownLatch getActiveLatch() {
+        return activeLatch;
+    }
+
     @Override
     public int getSequenceNumber() {
         return 0;
@@ -123,7 +148,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
                 PoolingProperties props = new PoolingProperties(name, featProps);
 
                 logger.info("pooling enabled for {}", name);
-                ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(controller, props));
+                ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props, activeLatch));
 
             } catch (PropertyException e) {
                 logger.error("pooling disabled due to exception for {}", name, e);
@@ -371,12 +396,15 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
         /**
          * Makes a pooling manager for a controller.
          * 
+         * @param host name/uuid of this host
          * @param controller
          * @param props properties to use to configure the manager
+         * @param activeLatch decremented when the manager goes Active
          * @return a new pooling manager
          */
-        public PoolingManagerImpl makeManager(PolicyController controller, PoolingProperties props) {
-            return new PoolingManagerImpl(controller, props);
+        public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
+                        CountDownLatch activeLatch) {
+            return new PoolingManagerImpl(host, controller, props, activeLatch);
         }
 
         /**
index 422efdd..de25e47 100644 (file)
@@ -25,12 +25,10 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.Properties;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import org.onap.policy.drools.controller.DroolsController;
 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.drools.event.comm.TopicListener;
@@ -47,6 +45,7 @@ import org.onap.policy.drools.pooling.state.QueryState;
 import org.onap.policy.drools.pooling.state.StartState;
 import org.onap.policy.drools.pooling.state.State;
 import org.onap.policy.drools.pooling.state.StateTimerTask;
+import org.onap.policy.drools.properties.PolicyProperties;
 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
 import org.onap.policy.drools.system.PolicyController;
 import org.slf4j.Logger;
@@ -74,11 +73,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
      */
     private static Factory factory = new Factory();
 
-    /**
-     * ID of the last host that was created.
-     */
-    private static final AtomicReference<String> lastHost = new AtomicReference<>(null);
-
     /**
      * ID of this host.
      */
@@ -99,6 +93,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
      */
     private final TopicListener listener;
 
+    /**
+     * Decremented each time the manager enters the Active state. Used by junit tests.
+     */
+    private final CountDownLatch activeLatch;
+
     /**
      * Used to encode & decode request objects received from & sent to a rule engine.
      */
@@ -148,7 +147,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
     /**
      * Queue used when no bucket assignments are available.
      */
-    private EventQueue eventq;
+    private final EventQueue eventq;
 
     /**
      * {@code True} if events offered by the controller should be intercepted,
@@ -158,28 +157,28 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
 
     /**
      * Constructs the manager, initializing all of the data structures.
-     * 
+     *
+     * @param host name/uuid of this host
      * @param controller controller with which this is associated
      * @param props feature properties specific to the controller
+     * @param activeLatch latch to be decremented each time the manager enters the Active
+     *        state
      */
-    public PoolingManagerImpl(PolicyController controller, PoolingProperties props) {
-        this.host = UUID.randomUUID().toString();
+    public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
+                    CountDownLatch activeLatch) {
+        this.host = host;
         this.controller = controller;
         this.props = props;
-
-        lastHost.set(this.host);
+        this.activeLatch = activeLatch;
 
         try {
             this.listener = (TopicListener) controller;
             this.serializer = new Serializer();
             this.topic = props.getPoolingTopic();
             this.eventq = factory.makeEventQueue(props);
-
-            SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(),
-                            props.getSource());
-            this.extractors = factory.makeClassExtractors(spec);
-
-            this.dmaapMgr = factory.makeDmaapManager(props);
+            this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
+            this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(),
+                            makeDmaapProps(controller, props.getSource()));
             this.current = new IdleState(this);
 
             logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
@@ -202,16 +201,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
         PoolingManagerImpl.factory = factory;
     }
 
-    /**
-     * Used by junit tests.
-     * 
-     * @return the ID of the last host that was created, or {@code null} if no hosts have
-     *         been created yet
-     */
-    protected static String getLastHost() {
-        return lastHost.get();
-    }
-
     /**
      * Should only be used by junit tests.
      * 
@@ -236,6 +225,50 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
         return props;
     }
 
+    /**
+     * Makes properties for configuring extractors.
+     * 
+     * @param controller the controller for which the extractors will be configured
+     * @param source properties from which to get the extractor properties
+     * @return extractor properties
+     */
+    private Properties makeExtractorProps(PolicyController controller, Properties source) {
+        return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
+    }
+
+    /**
+     * Makes properties for configuring DMaaP. Copies properties from the source that
+     * start with the Pooling property prefix followed by the controller name, stripping
+     * the prefix and controller name.
+     * 
+     * @param controller the controller for which DMaaP will be configured
+     * @param source properties from which to get the DMaaP properties
+     * @return DMaaP properties
+     */
+    private Properties makeDmaapProps(PolicyController controller, Properties source) {
+        SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source);
+
+        // could be UEB or DMAAP, so add both
+        addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
+        addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
+
+        return specProps;
+    }
+
+    /**
+     * Adds DMaaP consumer properties, consumer group & instance. The group is the host
+     * and the instance is a constant.
+     * 
+     * @param props where to add the new properties
+     * @param prefix property prefix
+     */
+    private void addDmaapConsumerProps(SpecProperties props, String prefix) {
+        String fullpfx = props.getSpecPrefix() + prefix + "." + topic;
+
+        props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host);
+        props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0");
+    }
+
     /**
      * Indicates that the controller is about to start. Starts the publisher for the
      * internal topic, and creates a thread pool for the timers.
@@ -288,8 +321,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
             scheduler = null;
 
             if (!(current instanceof IdleState)) {
-                dmaapMgr.stopConsumer(this);
                 changeState(new IdleState(this));
+                dmaapMgr.stopConsumer(this);
                 publishAdmin(new Offline(getHost()));
             }
         }
@@ -405,12 +438,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
         ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
 
         // wrap the future in a "CancellableScheduledTask"
-        return new CancellableScheduledTask() {
-            @Override
-            public void cancel() {
-                fut.cancel(false);
-            }
-        };
+        return () -> fut.cancel(false);
     }
 
     @Override
@@ -420,12 +448,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
                         TimeUnit.MILLISECONDS);
 
         // wrap the future in a "CancellableScheduledTask"
-        return new CancellableScheduledTask() {
-            @Override
-            public void cancel() {
-                fut.cancel(false);
-            }
-        };
+        return () -> fut.cancel(false);
     }
 
     @Override
@@ -609,7 +632,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
                             topic);
 
         } else {
-            logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
+            logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
             event.bumpNumHops();
             publish(target, event);
         }
@@ -751,26 +774,27 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
 
     @Override
     public CountDownLatch startDistributing(BucketAssignments asgn) {
-        if (asgn == null) {
-            return null;
-        }
-
-        logger.info("new assignments for topic {}", getTopic());
-
         synchronized (curLocker) {
+            int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
+            logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
             assignments = asgn;
         }
 
+        if (asgn == null) {
+            return null;
+        }
+
         /*
          * publish the events from the event queue, but do it in a background thread so
          * that the state machine can enter its correct state BEFORE we start processing
          * the events
          */
         CountDownLatch latch = new CountDownLatch(1);
-        
+
         new Thread(() -> {
             synchronized (curLocker) {
                 if (assignments == null) {
+                    latch.countDown();
                     return;
                 }
 
@@ -779,11 +803,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
                 while ((ev = eventq.poll()) != null) {
                     handle(ev);
                 }
-                
+
                 latch.countDown();
             }
         }).start();
-        
+
         return latch;
     }
 
@@ -804,6 +828,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
 
     @Override
     public State goActive() {
+        activeLatch.countDown();
         return new ActiveState(this);
     }
 
@@ -876,12 +901,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
         /**
          * Creates a DMaaP manager.
          * 
-         * @param props properties used to configure the manager
+         * @param topic name of the internal DMaaP topic
+         * @param props properties used to configure DMaaP
          * @return a new DMaaP manager
          * @throws PoolingFeatureException if an error occurs
          */
-        public DmaapManager makeDmaapManager(PoolingProperties props) throws PoolingFeatureException {
-            return new DmaapManager(props.getPoolingTopic(), props.getSource());
+        public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException {
+            return new DmaapManager(topic, props);
         }
 
         /**
index 4e8de0d..5431942 100644 (file)
@@ -51,12 +51,12 @@ public class PoolingProperties extends SpecPropertyConfiguration {
     public static final String FEATURE_ENABLED = PREFIX + "{?.}enabled";
     public static final String OFFLINE_LIMIT = PREFIX + "{?.}offline.queue.limit";
     public static final String OFFLINE_AGE_MS = PREFIX + "{?.}offline.queue.age.milliseconds";
+    public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds";
     public static final String START_HEARTBEAT_MS = PREFIX + "{?.}start.heartbeat.milliseconds";
     public static final String REACTIVATE_MS = PREFIX + "{?.}reactivate.milliseconds";
     public static final String IDENTIFICATION_MS = PREFIX + "{?.}identification.milliseconds";
     public static final String ACTIVE_HEARTBEAT_MS = PREFIX + "{?.}active.heartbeat.milliseconds";
     public static final String INTER_HEARTBEAT_MS = PREFIX + "{?.}inter.heartbeat.milliseconds";
-    public static final String OFFLINE_PUB_WAIT_MS = PREFIX + "{?.}offline.publish.wait.milliseconds";
 
     /**
      * Type of item that the extractors will be extracting.
@@ -93,11 +93,18 @@ public class PoolingProperties extends SpecPropertyConfiguration {
     @Property(name = OFFLINE_AGE_MS, defaultValue = "60000")
     private long offlineAgeMs;
 
+    /**
+     * Time, in milliseconds, to wait for an "Offline" message to be published
+     * to DMaaP.
+     */
+    @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000")
+    private long offlinePubWaitMs;
+
     /**
      * Time, in milliseconds, to wait for this host's heart beat during the
      * start-up state.
      */
-    @Property(name = START_HEARTBEAT_MS, defaultValue = "50000")
+    @Property(name = START_HEARTBEAT_MS, defaultValue = "100000")
     private long startHeartbeatMs;
 
     /**
@@ -123,18 +130,11 @@ public class PoolingProperties extends SpecPropertyConfiguration {
 
     /**
      * Time, in milliseconds, to wait between heart beat generations during
-     * the active state.
+     * the active and start-up states.
      */
     @Property(name = INTER_HEARTBEAT_MS, defaultValue = "15000")
     private long interHeartbeatMs;
 
-    /**
-     * Time, in milliseconds, to wait for an "Offline" message to be published
-     * to DMaaP.
-     */
-    @Property(name = OFFLINE_PUB_WAIT_MS, defaultValue = "3000")
-    private long offlinePubWaitMs;
-
     /**
      * @param controllerName the name of the controller
      * @param props set of properties used to configure this
@@ -163,6 +163,10 @@ public class PoolingProperties extends SpecPropertyConfiguration {
         return offlineAgeMs;
     }
 
+    public long getOfflinePubWaitMs() {
+        return offlinePubWaitMs;
+    }
+
     public long getStartHeartbeatMs() {
         return startHeartbeatMs;
     }
@@ -182,8 +186,4 @@ public class PoolingProperties extends SpecPropertyConfiguration {
     public long getInterHeartbeatMs() {
         return interHeartbeatMs;
     }
-
-    public long getOfflinePubWaitMs() {
-        return offlinePubWaitMs;
-    }
 }
index 31b4e61..c831f70 100644 (file)
@@ -40,7 +40,8 @@ public class SpecProperties extends Properties {
 
     /**
      * 
-     * @param prefix the property name prefix that appears before any specialization
+     * @param prefix the property name prefix that appears before any specialization, may
+     *        be ""
      * @param specialization the property name specialization (e.g., session name)
      */
     public SpecProperties(String prefix, String specialization) {
@@ -50,7 +51,8 @@ public class SpecProperties extends Properties {
 
     /**
      * 
-     * @param prefix the property name prefix that appears before any specialization
+     * @param prefix the property name prefix that appears before any specialization, may
+     *        be ""
      * @param specialization the property name specialization (e.g., session name)
      * @param props the default properties
      */
@@ -68,7 +70,7 @@ public class SpecProperties extends Properties {
      * @return the text, with a trailing "."
      */
     private static String withTrailingDot(String text) {
-        return text.endsWith(".") ? text : text + ".";
+        return text.isEmpty() || text.endsWith(".") ? text : text + ".";
     }
 
     /**
@@ -105,11 +107,11 @@ public class SpecProperties extends Properties {
 
     @Override
     public final int hashCode() {
-        throw new UnsupportedOperationException("HostBucket cannot be hashed");
+        throw new UnsupportedOperationException("SpecProperties cannot be hashed");
     }
 
     @Override
     public final boolean equals(Object obj) {
-        throw new UnsupportedOperationException("cannot compare HostBuckets");
+        throw new UnsupportedOperationException("cannot compare SpecProperties");
     }
 }
index 2752ca8..ccca77b 100644 (file)
@@ -374,7 +374,7 @@ public class ClassExtractors {
 
             } catch (NoSuchMethodException expected) {
                 // no getXxx() method, maybe there's a field by this name
-                logger.debug("no method {} in {}", nm, clazz.getName());
+                logger.debug("no method {} in {}", nm, clazz.getName(), expected);
                 return null;
 
             } catch (SecurityException e) {
@@ -445,7 +445,7 @@ public class ClassExtractors {
 
             } catch (NoSuchFieldException expected) {
                 // no field by this name - try super class & interfaces
-                logger.debug("no field {} in {}", name, clazz.getName());
+                logger.debug("no field {} in {}", name, clazz.getName(), expected);
 
             } catch (SecurityException e) {
                 throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
index b0a36cd..8f0a902 100644 (file)
@@ -95,18 +95,19 @@ public class ActiveState extends ProcessingState {
         if ((succHost = assigned.higher(getHost())) == null) {
             // wrapped around - successor is the first host in the set
             succHost = assigned.first();
-            logger.info("this host's successor is {} on topic {}", succHost, getTopic());
         }
+        logger.info("this host's successor is {} on topic {}", succHost, getTopic());        
 
         if ((predHost = assigned.lower(getHost())) == null) {
             // wrapped around - predecessor is the last host in the set
             predHost = assigned.last();
-            logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
         }
+        logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
     }
 
     @Override
     public void start() {
+        super.start();
         addTimers();
         genHeartbeat();
     }
@@ -120,7 +121,7 @@ public class ActiveState extends ProcessingState {
         /*
          * heart beat generator
          */
-        long genMs = getProperties().getActiveHeartbeatMs();
+        long genMs = getProperties().getInterHeartbeatMs();
 
         scheduleWithFixedDelay(genMs, genMs, () -> {
             genHeartbeat();
@@ -130,9 +131,9 @@ public class ActiveState extends ProcessingState {
         /*
          * my heart beat checker
          */
-        long interMs = getProperties().getInterHeartbeatMs();
+        long waitMs = getProperties().getActiveHeartbeatMs();
 
-        scheduleWithFixedDelay(interMs, interMs, () -> {
+        scheduleWithFixedDelay(waitMs, waitMs, () -> {
             if (myHeartbeatSeen) {
                 myHeartbeatSeen = false;
                 return null;
@@ -141,7 +142,7 @@ public class ActiveState extends ProcessingState {
             // missed my heart beat
             logger.error("missed my heartbeat on topic {}", getTopic());
 
-            return internalTopicFailed();
+            return missedHeartbeat();
         });
 
         /*
@@ -149,7 +150,7 @@ public class ActiveState extends ProcessingState {
          */
         if (!predHost.isEmpty()) {
 
-            scheduleWithFixedDelay(interMs, interMs, () -> {
+            scheduleWithFixedDelay(waitMs, waitMs, () -> {
                 if (predHeartbeatSeen) {
                     predHeartbeatSeen = false;
                     return null;
index 6be2fb8..f717aa5 100644 (file)
@@ -44,24 +44,17 @@ public class InactiveState extends State {
 
     @Override
     public void start() {
-
         super.start();
-
-        schedule(getProperties().getReactivateMs(), () -> goStart());
+        schedule(getProperties().getReactivateMs(), this::goStart);
     }
 
     @Override
     public State process(Leader msg) {
-        if(isValid(msg)) {
+        if (isValid(msg)) {
             logger.info("received Leader message from {} on topic {}", msg.getSource(), getTopic());
-            startDistributing(msg.getAssignments());
-            
-            if(msg.getAssignments().hasAssignment(getHost())) {
-                logger.info("received Leader message on topic {}", getTopic());
-                return goActive();
-            }
+            return goActive(msg.getAssignments());
         }
-        
+
         return null;
     }
 
index 1e9bb58..e9dc032 100644 (file)
@@ -73,24 +73,6 @@ public class ProcessingState extends State {
         this.leader = leader;
     }
 
-    /**
-     * Goes active with a new set of assignments.
-     * 
-     * @param asgn new assignments
-     * @return the new state, either Active or Inactive, depending on whether or not this
-     *         host has an assignment
-     */
-    protected State goActive(BucketAssignments asgn) {
-        startDistributing(asgn);
-
-        if (asgn.hasAssignment(getHost())) {
-            return goActive();
-
-        } else {
-            return goInactive();
-        }
-    }
-
     /**
      * Generates an Identification message and goes to the query state.
      */
@@ -154,11 +136,11 @@ public class ProcessingState extends State {
         }
 
         Leader msg = makeLeader(alive);
-        publish(msg);
+        logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size());
 
-        setAssignments(msg.getAssignments());
+        publish(msg);
 
-        return goActive();
+        return goActive(msg.getAssignments());
     }
 
     /**
index 9045165..1a4da15 100644 (file)
@@ -63,7 +63,6 @@ public class QueryState extends ProcessingState {
 
     @Override
     public void start() {
-
         super.start();
 
         // start identification timer
@@ -85,39 +84,21 @@ public class QueryState extends ProcessingState {
             if (!sawSelfIdent) {
                 // didn't see our identification
                 logger.error("missed our own Ident message on topic {}", getTopic());
-                return internalTopicFailed();
+                return missedHeartbeat();
 
             } else if (isLeader()) {
                 // "this" host is the new leader
                 logger.info("this host is the new leader for topic {}", getTopic());
                 return becomeLeader(alive);
 
-            } else if (hasAssignment()) {
-                /*
-                 * this host is not the new leader, but it does have an assignment -
-                 * return to the active state while we wait for the leader
-                 */
-                logger.info("no new leader on topic {}", getTopic());
-                return goActive();
-
             } else {
-                // not the leader and no assignment yet
+                // not the leader - return to previous state
                 logger.info("no new leader on topic {}", getTopic());
-                return goInactive();
+                return goActive(getAssignments());
             }
         });
     }
 
-    /**
-     * Determines if this host has an assignment in the CURRENT assignments.
-     * 
-     * @return {@code true} if this host has an assignment, {@code false} otherwise
-     */
-    protected boolean hasAssignment() {
-        BucketAssignments asgn = getAssignments();
-        return (asgn != null && asgn.hasAssignment(getHost()));
-    }
-
     @Override
     public State goQuery() {
         return null;
index a978e24..3068cfc 100644 (file)
@@ -67,8 +67,22 @@ public class StartState extends State {
 
         super.start();
 
-        publish(getHost(), makeHeartbeat(hbTimestampMs));
+        Heartbeat hb = makeHeartbeat(hbTimestampMs);
+        publish(getHost(), hb);
 
+        /*
+         * heart beat generator
+         */
+        long genMs = getProperties().getInterHeartbeatMs();
+
+        scheduleWithFixedDelay(genMs, genMs, () -> {
+            publish(getHost(), hb);
+            return null;
+        });
+
+        /*
+         * my heart beat checker
+         */
         schedule(getProperties().getStartHeartbeatMs(), () -> {
             logger.error("missed heartbeat on topic {}", getTopic());
             return internalTopicFailed();
index 421b922..545c2ef 100644 (file)
@@ -113,12 +113,21 @@ public abstract class State {
     }
 
     /**
-     * Transitions to the "active" state.
+     * Goes active with a new set of assignments.
      * 
-     * @return the new state
+     * @param asgn new assignments
+     * @return the new state, either Active or Inactive, depending on whether or not this
+     *         host has an assignment
      */
-    public final State goActive() {
-        return mgr.goActive();
+    protected State goActive(BucketAssignments asgn) {
+        startDistributing(asgn);
+
+        if (asgn != null && asgn.hasAssignment(getHost())) {
+            return mgr.goActive();
+
+        } else {
+            return goInactive();
+        }
     }
 
     /**
@@ -322,7 +331,21 @@ public abstract class State {
     }
 
     /**
-     * Indicates that the internal topic failed.
+     * Indicates that we failed to see our own heartbeat; must be a problem with the
+     * internal topic.
+     * 
+     * @return a new {@link StartState}
+     */
+    protected final State missedHeartbeat() {
+        publish(makeOffline());
+        mgr.startDistributing(null);
+
+        return mgr.goStart();
+    }
+
+    /**
+     * Indicates that the internal topic failed; this should only be invoked from the
+     * StartState.
      * 
      * @return a new {@link InactiveState}
      */
index a91671f..a5688df 100644 (file)
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -211,8 +212,8 @@ public class DmaapManagerTest {
         // force exception when it starts
         doThrow(new IllegalStateException("expected")).when(sink).start();
 
-        expectException("startPublisher,start", xxx -> mgr.startPublisher());
-        expectException("startPublisher,publish", xxx -> mgr.publish(MSG));
+        expectException("startPublisher,start", () -> mgr.startPublisher());
+        expectException("startPublisher,publish", () -> mgr.publish(MSG));
 
         // allow it to succeed this time
         reset(sink);
@@ -264,11 +265,15 @@ public class DmaapManagerTest {
         long minms = 2000L;
 
         // tell the publisher to stop in minms + additional time
-        Thread thread = new Thread(() -> mgr.stopPublisher(minms + 3000L));
+        CountDownLatch latch = new CountDownLatch(1);
+        Thread thread = new Thread(() -> {
+            latch.countDown();
+            mgr.stopPublisher(minms + 3000L);
+        });
         thread.start();
 
-        // give the thread a chance to start
-        Thread.sleep(50L);
+        // wait for the thread to start
+        latch.await();
 
         // interrupt it - it should immediately finish its work
         thread.interrupt();
@@ -336,7 +341,7 @@ public class DmaapManagerTest {
     @Test
     public void testPublish() throws PoolingFeatureException {
         // cannot publish before starting
-        expectException("publish,pre", xxx -> mgr.publish(MSG));
+        expectException("publish,pre", () -> mgr.publish(MSG));
 
         mgr.startPublisher();
 
@@ -352,7 +357,7 @@ public class DmaapManagerTest {
 
         // stop and verify we can no longer publish
         mgr.stopPublisher(0);
-        expectException("publish,stopped", xxx -> mgr.publish(MSG));
+        expectException("publish,stopped", () -> mgr.publish(MSG));
     }
 
     @Test(expected = PoolingFeatureException.class)
@@ -377,7 +382,7 @@ public class DmaapManagerTest {
 
     private void expectException(String testnm, VFunction func) {
         try {
-            func.apply(null);
+            func.apply();
             fail(testnm + " missing exception");
 
         } catch (PoolingFeatureException expected) {
@@ -387,6 +392,6 @@ public class DmaapManagerTest {
 
     @FunctionalInterface
     public static interface VFunction {
-        public void apply(Void arg) throws PoolingFeatureException;
+        public void apply() throws PoolingFeatureException;
     }
 }
index 13d70f5..cc58838 100644 (file)
@@ -26,13 +26,13 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.IdentityHashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
@@ -43,13 +43,11 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -71,7 +69,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
- * its own feature object.
+ * its own feature object. Uses real feature objects. However, the following are not:
+ * <dl>
+ * <dt>DMaaP sources and sinks</dt>
+ * <dd>simulated using queues. There is one queue for the external topic, and one queue
+ * for each host's internal topic. Messages published to the "admin" channel are simply
+ * sent to all of the hosts' internal topic queues</dd>
+ * <dt>PolicyEngine, PolicyController, DroolsController</dt>
+ * <dd>mocked</dd>
+ * </dl>
  */
 public class FeatureTest {
 
@@ -92,26 +98,15 @@ public class FeatureTest {
      */
     private static final String CONTROLLER1 = "controller.one";
 
-    // private static final long STD_HEARTBEAT_WAIT_MS = 100;
-    // private static final long STD_REACTIVATE_WAIT_MS = 200;
-    // private static final long STD_IDENTIFICATION_MS = 60;
-    // private static final long STD_ACTIVE_HEARTBEAT_MS = 5;
-    // private static final long STD_INTER_HEARTBEAT_MS = 50;
-    // private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
-    // private static final long POLL_MS = 2;
-    // private static final long INTER_POLL_MS = 2;
-    // private static final long EVENT_WAIT_SEC = 5;
-
-    // use these to slow things down
-    private static final long STD_HEARTBEAT_WAIT_MS = 5000;
-    private static final long STD_REACTIVATE_WAIT_MS = 10000;
-    private static final long STD_IDENTIFICATION_MS = 10000;
-    private static final long STD_ACTIVE_HEARTBEAT_MS = 5000;
-    private static final long STD_INTER_HEARTBEAT_MS = 12000;
-    private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
-    private static final long POLL_MS = 2;
-    private static final long INTER_POLL_MS = 2000;
-    private static final long EVENT_WAIT_SEC = 1000;
+    private static long stdReactivateWaitMs = 200;
+    private static long stdIdentificationMs = 60;
+    private static long stdStartHeartbeatMs = 60;
+    private static long stdActiveHeartbeatMs = 50;
+    private static long stdInterHeartbeatMs = 5;
+    private static long stdOfflinePubWaitMs = 2;
+    private static long stdPollMs = 2;
+    private static long stdInterPollMs = 2;
+    private static long stdEventWaitSec = 10;
 
     // these are saved and restored on exit from this test class
     private static PoolingFeature.Factory saveFeatureFactory;
@@ -128,6 +123,8 @@ public class FeatureTest {
         saveFeatureFactory = PoolingFeature.getFactory();
         saveManagerFactory = PoolingManagerImpl.getFactory();
         saveDmaapFactory = DmaapManager.getFactory();
+        
+        // note: invoke runSlow() to slow things down
     }
 
     @AfterClass
@@ -149,51 +146,35 @@ public class FeatureTest {
         }
     }
 
-    @Ignore
     @Test
     public void test_SingleHost() throws Exception {
-        int nmessages = 70;
-
-        ctx = new Context(nmessages);
-
-        ctx.addHost();
-        ctx.startHosts();
-
-        for (int x = 0; x < nmessages; ++x) {
-            ctx.offerExternal(makeMessage(x));
-        }
-
-        ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
-
-        assertEquals(0, ctx.getDecodeErrors());
-        assertEquals(0, ctx.getRemainingEvents());
-        ctx.checkAllSawAMsg();
+        run(70, 1);
     }
 
-    @Ignore
     @Test
     public void test_TwoHosts() throws Exception {
-        int nmessages = 200;
+        run(200, 2);
+    }
+
+    @Test
+    public void test_ThreeHosts() throws Exception {
+        run(200, 3);
+    }
 
+    private void run(int nmessages, int nhosts) throws Exception {
         ctx = new Context(nmessages);
 
-        ctx.addHost();
-        ctx.addHost();
+        for (int x = 0; x < nhosts; ++x) {
+            ctx.addHost();
+        }
+
         ctx.startHosts();
 
         for (int x = 0; x < nmessages; ++x) {
             ctx.offerExternal(makeMessage(x));
         }
 
-        // wait for all hosts to have time to process a few messages
-        Thread.sleep(STD_ACTIVE_HEARTBEAT_MS + INTER_POLL_MS * 3);
-
-        // pause a topic for a bit
-//        ctx.pauseTopic();
-
-        // now we'll see if it recovers
-
-        ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+        ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
 
         assertEquals(0, ctx.getDecodeErrors());
         assertEquals(0, ctx.getRemainingEvents());
@@ -203,6 +184,21 @@ public class FeatureTest {
     private String makeMessage(int reqnum) {
         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
     }
+    
+    /**
+     * Invoke this to slow the timers down.
+     */
+    protected static void runSlow() {
+         stdReactivateWaitMs = 10000;
+         stdIdentificationMs = 10000;
+         stdStartHeartbeatMs = 15000;
+         stdActiveHeartbeatMs = 12000;
+         stdInterHeartbeatMs = 5000;
+         stdOfflinePubWaitMs = 2;
+         stdPollMs = 2;
+         stdInterPollMs = 2000;
+         stdEventWaitSec = 1000;
+    }
 
     /**
      * Context used for a single test case.
@@ -243,12 +239,6 @@ public class FeatureTest {
          */
         private final CountDownLatch eventCounter;
 
-        /**
-         * Maps host name to its topic source. This must be in sorted order so we can
-         * identify the source for the host with the higher name.
-         */
-        private TreeMap<String, TopicSourceImpl> host2topic = new TreeMap<>();
-
         /**
          * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
          * {@link #getCurrentHost()}.
@@ -280,9 +270,14 @@ public class FeatureTest {
 
         /**
          * Creates and adds a new host to the context.
+         * 
+         * @return the new Host
          */
-        public void addHost() {
-            hosts.add(new Host(this));
+        public Host addHost() {
+            Host host = new Host(this);
+            hosts.add(host);
+
+            return host;
         }
 
         /**
@@ -442,26 +437,6 @@ public class FeatureTest {
             return eventCounter.await(time, units);
         }
 
-        /**
-         * Associates a host with a topic.
-         * 
-         * @param host
-         * @param topic
-         */
-        public void addTopicSource(String host, TopicSourceImpl topic) {
-            host2topic.put(host, topic);
-        }
-
-        /**
-         * Pauses the last topic source long enough to miss a heart beat.
-         */
-        public void pauseTopic() {
-            Entry<String, TopicSourceImpl> ent = host2topic.lastEntry();
-            if (ent != null) {
-                ent.getValue().pause(STD_ACTIVE_HEARTBEAT_MS);
-            }
-        }
-
         /**
          * Gets the current host, provided this is used from within a call to
          * {@link #withHost(Host, VoidFunction)}.
@@ -527,12 +502,10 @@ public class FeatureTest {
         }
 
         /**
-         * Gets the host name. This should only be invoked within {@link #start()}.
-         * 
          * @return the host name
          */
         public String getName() {
-            return PoolingManagerImpl.getLastHost();
+            return feature.getHost();
         }
 
         /**
@@ -758,11 +731,6 @@ public class FeatureTest {
          */
         private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
 
-        /**
-         * Time, in milliseconds, to pause before polling for more messages.
-         */
-        private AtomicLong pauseTimeMs = new AtomicLong(0);
-
         /**
          * 
          * @param context
@@ -771,12 +739,8 @@ public class FeatureTest {
          */
         public TopicSourceImpl(Context context, boolean internal) {
             if (internal) {
-                Host host = context.getCurrentHost();
-
                 this.topic = INTERNAL_TOPIC;
-                this.queue = host.getInternalQueue();
-
-                context.addTopicSource(host.getName(), this);
+                this.queue = context.getCurrentHost().getInternalQueue();
 
             } else {
                 this.topic = EXTERNAL_TOPIC;
@@ -809,11 +773,12 @@ public class FeatureTest {
 
             reregister(newPair);
 
-            new Thread(() -> {
+            Thread thread = new Thread(() -> {
+
                 try {
                     do {
                         processMessages(newPair.first(), listener);
-                    } while (!newPair.first().await(INTER_POLL_MS, TimeUnit.MILLISECONDS));
+                    } while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS));
 
                     logger.info("topic source thread completed");
 
@@ -827,7 +792,10 @@ public class FeatureTest {
 
                 newPair.second().countDown();
 
-            }).start();
+            });
+
+            thread.setDaemon(true);
+            thread.start();
         }
 
         /**
@@ -879,19 +847,7 @@ public class FeatureTest {
         }
 
         /**
-         * Indicates that {@link #processMessages(CountDownLatch, TopicListener)} should
-         * pause a bit.
-         * 
-         * @param timeMs time, in milliseconds, to pause
-         */
-        public void pause(long timeMs) {
-            pauseTimeMs.set(timeMs);
-        }
-
-        /**
-         * Polls for messages from the topic and offers them to the listener. If
-         * {@link #pauseTimeMs} is non-zero, then it pauses for the specified time and
-         * then immediately returns.
+         * Polls for messages from the topic and offers them to the listener.
          * 
          * @param stopped triggered if processing should stop
          * @param listener
@@ -901,14 +857,7 @@ public class FeatureTest {
 
             for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
 
-                long ptm = pauseTimeMs.getAndSet(0);
-                if (ptm != 0) {
-                    logger.warn("pause processing");
-                    stopped.await(ptm, TimeUnit.MILLISECONDS);
-                    return;
-                }
-
-                String msg = queue.poll(POLL_MS, TimeUnit.MILLISECONDS);
+                String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
                 if (msg == null) {
                     return;
                 }
@@ -1038,18 +987,20 @@ public class FeatureTest {
 
             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
 
-            props.setProperty("pooling." + CONTROLLER1 + ".topic", INTERNAL_TOPIC);
-            props.setProperty("pooling." + CONTROLLER1 + ".enabled", "true");
-            props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.limit", "10000");
-            props.setProperty("pooling." + CONTROLLER1 + ".offline.queue.age.milliseconds", "1000000");
-            props.setProperty("pooling." + CONTROLLER1 + ".start.heartbeat.milliseconds", "" + STD_HEARTBEAT_WAIT_MS);
-            props.setProperty("pooling." + CONTROLLER1 + ".reactivate.milliseconds", "" + STD_REACTIVATE_WAIT_MS);
-            props.setProperty("pooling." + CONTROLLER1 + ".identification.milliseconds", "" + STD_IDENTIFICATION_MS);
-            props.setProperty("pooling." + CONTROLLER1 + ".active.heartbeat.milliseconds",
-                            "" + STD_ACTIVE_HEARTBEAT_MS);
-            props.setProperty("pooling." + CONTROLLER1 + ".inter.heartbeat.milliseconds", "" + STD_INTER_HEARTBEAT_MS);
-            props.setProperty("pooling." + CONTROLLER1 + ".offline.publish.wait.milliseconds",
-                            "" + STD_OFFLINE_PUB_WAIT_MS);
+            props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
+            props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
+            props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
+            props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
+            props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
+                            "" + stdOfflinePubWaitMs);
+            props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
+                            "" + stdStartHeartbeatMs);
+            props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
+            props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
+            props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
+                            "" + stdActiveHeartbeatMs);
+            props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
+                            "" + stdInterHeartbeatMs);
 
             return props;
         }
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
new file mode 100644 (file)
index 0000000..6884bec
--- /dev/null
@@ -0,0 +1,734 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.event.comm.TopicEndpoint;
+import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.drools.event.comm.TopicSink;
+import org.onap.policy.drools.event.comm.TopicSource;
+import org.onap.policy.drools.properties.PolicyProperties;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
+ * its own feature object. Uses real feature objects, as well as real DMaaP sources and
+ * sinks. However, the following are not:
+ * <dl>
+ * <dt>PolicyEngine, PolicyController, DroolsController</dt>
+ * <dd>mocked</dd>
+ * </dl>
+ * 
+ * <p>
+ * The following fields must be set before executing this:
+ * <ul>
+ * <li>UEB_SERVERS</li>
+ * <li>INTERNAL_TOPIC</li>
+ * <li>EXTERNAL_TOPIC</li>
+ * </ul>
+ */
+public class FeatureTest2 {
+
+    private static final Logger logger = LoggerFactory.getLogger(FeatureTest2.class);
+
+    /**
+     * UEB servers for both internal & external topics.
+     */
+    private static final String UEB_SERVERS = "";
+
+    /**
+     * Name of the topic used for inter-host communication.
+     */
+    private static final String INTERNAL_TOPIC = "";
+
+    /**
+     * Name of the topic from which "external" events "arrive".
+     */
+    private static final String EXTERNAL_TOPIC = "";
+
+    /**
+     * Consumer group to use when polling the external topic.
+     */
+    private static final String EXTERNAL_GROUP = FeatureTest2.class.getName();
+
+    /**
+     * Name of the controller.
+     */
+    private static final String CONTROLLER1 = "controller.one";
+
+    /**
+     * Maximum number of items to fetch from DMaaP in a single poll.
+     */
+    private static final String FETCH_LIMIT = "5";
+
+    private static final long STD_REACTIVATE_WAIT_MS = 10000;
+    private static final long STD_IDENTIFICATION_MS = 10000;
+    private static final long STD_START_HEARTBEAT_MS = 15000;
+    private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
+    private static final long STD_INTER_HEARTBEAT_MS = 5000;
+    private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
+    private static final long EVENT_WAIT_SEC = 15;
+
+    // these are saved and restored on exit from this test class
+    private static PoolingFeature.Factory saveFeatureFactory;
+    private static PoolingManagerImpl.Factory saveManagerFactory;
+
+    /**
+     * Sink for external DMaaP topic.
+     */
+    private static TopicSink externalSink;
+
+    /**
+     * Context for the current test case.
+     */
+    private Context ctx;
+
+
+    @BeforeClass
+    public static void setUpBeforeClass() {
+        saveFeatureFactory = PoolingFeature.getFactory();
+        saveManagerFactory = PoolingManagerImpl.getFactory();
+
+        Properties props = makeSinkProperties(EXTERNAL_TOPIC);
+        externalSink = TopicEndpoint.manager.addTopicSinks(props).get(0);
+        externalSink.start();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+        PoolingFeature.setFactory(saveFeatureFactory);
+        PoolingManagerImpl.setFactory(saveManagerFactory);
+
+        externalSink.stop();
+    }
+
+    @Before
+    public void setUp() {
+        ctx = null;
+    }
+
+    @After
+    public void tearDown() {
+        if (ctx != null) {
+            ctx.destroy();
+        }
+    }
+
+    @Ignore
+    @Test
+    public void test_SingleHost() throws Exception {
+        run(70, 1);
+    }
+
+    @Ignore
+    @Test
+    public void test_TwoHosts() throws Exception {
+        run(200, 2);
+    }
+
+    @Ignore
+    @Test
+    public void test_ThreeHosts() throws Exception {
+        run(200, 3);
+    }
+
+    private void run(int nmessages, int nhosts) throws Exception {
+        ctx = new Context(nmessages);
+
+        for (int x = 0; x < nhosts; ++x) {
+            ctx.addHost();
+        }
+
+        ctx.startHosts();
+        ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
+
+        for (int x = 0; x < nmessages; ++x) {
+            ctx.offerExternal(makeMessage(x));
+        }
+
+        ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
+
+        assertEquals(0, ctx.getDecodeErrors());
+        assertEquals(0, ctx.getRemainingEvents());
+        ctx.checkAllSawAMsg();
+    }
+
+    private String makeMessage(int reqnum) {
+        return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
+    }
+
+    private static Properties makeSinkProperties(String topic) {
+        Properties props = new Properties();
+
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS, topic);
+
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
+
+        return props;
+    }
+
+    private static Properties makeSourceProperties(String topic) {
+        Properties props = new Properties();
+
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
+
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
+        props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+                        + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
+
+        if (EXTERNAL_TOPIC.equals(topic)) {
+            // consumer group is a constant
+            props.setProperty(
+                            PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+                                            + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX,
+                            EXTERNAL_GROUP);
+
+            // consumer instance is generated by the BusConsumer code
+        }
+
+        // else internal topic: feature populates info for internal topic
+
+        return props;
+    }
+
+    /**
+     * Context used for a single test case.
+     */
+    private static class Context {
+
+        private final FeatureFactory featureFactory;
+        private final ManagerFactory managerFactory;
+
+        /**
+         * Hosts that have been added to this context.
+         */
+        private final Deque<Host> hosts = new LinkedList<>();
+
+        /**
+         * Maps a drools controller to its policy controller.
+         */
+        private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
+
+        /**
+         * Counts the number of decode errors.
+         */
+        private final AtomicInteger nDecodeErrors = new AtomicInteger(0);
+
+        /**
+         * Number of events we're still waiting to receive.
+         */
+        private final CountDownLatch eventCounter;
+
+        /**
+         * 
+         * @param nEvents number of events to be processed
+         */
+        public Context(int nEvents) {
+            featureFactory = new FeatureFactory(this);
+            managerFactory = new ManagerFactory(this);
+            eventCounter = new CountDownLatch(nEvents);
+
+            PoolingFeature.setFactory(featureFactory);
+            PoolingManagerImpl.setFactory(managerFactory);
+        }
+
+        /**
+         * Destroys the context, stopping any hosts that remain.
+         */
+        public void destroy() {
+            stopHosts();
+            hosts.clear();
+        }
+
+        /**
+         * Creates and adds a new host to the context.
+         * 
+         * @return the new Host
+         */
+        public Host addHost() {
+            Host host = new Host(this);
+            hosts.add(host);
+
+            return host;
+        }
+
+        /**
+         * Starts the hosts.
+         */
+        public void startHosts() {
+            hosts.forEach(host -> host.start());
+        }
+
+        /**
+         * Stops the hosts.
+         */
+        public void stopHosts() {
+            hosts.forEach(host -> host.stop());
+        }
+
+        /**
+         * Verifies that all hosts processed at least one message.
+         */
+        public void checkAllSawAMsg() {
+            int x = 0;
+            for (Host host : hosts) {
+                assertTrue("x=" + x, host.messageSeen());
+                ++x;
+            }
+        }
+
+        /**
+         * Offers an event to the external topic.
+         * 
+         * @param event
+         */
+        public void offerExternal(String event) {
+            externalSink.send(event);
+        }
+
+        /**
+         * Decodes an event.
+         * 
+         * @param event
+         * @return the decoded event, or {@code null} if it cannot be decoded
+         */
+        public Object decodeEvent(String event) {
+            return managerFactory.decodeEvent(null, null, event);
+        }
+
+        /**
+         * Associates a controller with its drools controller.
+         * 
+         * @param controller
+         * @param droolsController
+         */
+        public void addController(PolicyController controller, DroolsController droolsController) {
+            drools2policy.put(droolsController, controller);
+        }
+
+        /**
+         * @param droolsController
+         * @return the controller associated with a drools controller, or {@code null} if
+         *         it has no associated controller
+         */
+        public PolicyController getController(DroolsController droolsController) {
+            return drools2policy.get(droolsController);
+        }
+
+        /**
+         * 
+         * @return the number of decode errors so far
+         */
+        public int getDecodeErrors() {
+            return nDecodeErrors.get();
+        }
+
+        /**
+         * Increments the count of decode errors.
+         */
+        public void bumpDecodeErrors() {
+            nDecodeErrors.incrementAndGet();
+        }
+
+        /**
+         * 
+         * @return the number of events that haven't been processed
+         */
+        public long getRemainingEvents() {
+            return eventCounter.getCount();
+        }
+
+        /**
+         * Adds an event to the counter.
+         */
+        public void addEvent() {
+            eventCounter.countDown();
+        }
+
+        /**
+         * Waits, for a period of time, for all events to be processed.
+         * 
+         * @param time
+         * @param units
+         * @return {@code true} if all events have been processed, {@code false} otherwise
+         * @throws InterruptedException
+         */
+        public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
+            return eventCounter.await(time, units);
+        }
+
+        /**
+         * Waits, for a period of time, for all hosts to enter the Active state.
+         * 
+         * @param timeMs maximum time to wait, in milliseconds
+         * @throws InterruptedException
+         */
+        public void awaitAllActive(long timeMs) throws InterruptedException {
+            long tend = timeMs + System.currentTimeMillis();
+
+            for (Host host : hosts) {
+                long tremain = Math.max(0, tend - System.currentTimeMillis());
+                assertTrue(host.awaitActive(tremain));
+            }
+        }
+    }
+
+    /**
+     * Simulates a single "host".
+     */
+    private static class Host {
+
+        private final PoolingFeature feature = new PoolingFeature();
+
+        /**
+         * {@code True} if this host has processed a message, {@code false} otherwise.
+         */
+        private final AtomicBoolean sawMsg = new AtomicBoolean(false);
+
+        private final TopicSource externalSource;
+
+        // mock objects
+        private final PolicyEngine engine = mock(PolicyEngine.class);
+        private final ListenerController controller = mock(ListenerController.class);
+        private final DroolsController drools = mock(DroolsController.class);
+
+        /**
+         * 
+         * @param context
+         */
+        public Host(Context context) {
+
+            when(controller.getName()).thenReturn(CONTROLLER1);
+            when(controller.getDrools()).thenReturn(drools);
+
+            Properties props = makeSourceProperties(EXTERNAL_TOPIC);
+            externalSource = TopicEndpoint.manager.addTopicSources(props).get(0);
+
+            // stop consuming events if the controller stops
+            when(controller.stop()).thenAnswer(args -> {
+                externalSource.unregister(controller);
+                return true;
+            });
+
+            doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
+
+            context.addController(controller, drools);
+        }
+
+        /**
+         * Waits, for a period of time, for the host to enter the Active state.
+         * 
+         * @param timeMs time to wait, in milliseconds
+         * @return {@code true} if the host entered the Active state within the given
+         *         amount of time, {@code false} otherwise
+         * @throws InterruptedException
+         */
+        public boolean awaitActive(long timeMs) throws InterruptedException {
+            return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
+        }
+
+        /**
+         * Starts threads for the host so that it begins consuming from both the external
+         * "DMaaP" topic and its own internal "DMaaP" topic.
+         */
+        public void start() {
+            feature.beforeStart(engine);
+            feature.afterCreate(controller);
+
+            feature.beforeStart(controller);
+
+            // start consuming events from the external topic
+            externalSource.register(controller);
+
+            feature.afterStart(controller);
+        }
+
+        /**
+         * Stops the host's threads.
+         */
+        public void stop() {
+            feature.beforeStop(controller);
+            externalSource.unregister(controller);
+            feature.afterStop(controller);
+        }
+
+        /**
+         * Offers an event to the feature, before the policy controller handles it.
+         * 
+         * @param protocol
+         * @param topic2
+         * @param event
+         * @return {@code true} if the event was handled, {@code false} otherwise
+         */
+        public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+            return feature.beforeOffer(controller, protocol, topic2, event);
+        }
+
+        /**
+         * Offers an event to the feature, after the policy controller handles it.
+         * 
+         * @param protocol
+         * @param topic
+         * @param event
+         * @param success
+         * @return {@code true} if the event was handled, {@code false} otherwise
+         */
+        public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
+
+            return feature.afterOffer(controller, protocol, topic, event, success);
+        }
+
+        /**
+         * Offers an event to the feature, before the drools controller handles it.
+         * 
+         * @param fact
+         * @return {@code true} if the event was handled, {@code false} otherwise
+         */
+        public boolean beforeInsert(Object fact) {
+            return feature.beforeInsert(drools, fact);
+        }
+
+        /**
+         * Offers an event to the feature, after the drools controller handles it.
+         * 
+         * @param fact
+         * @param successInsert {@code true} if it was successfully inserted by the drools
+         *        controller, {@code false} otherwise
+         * @return {@code true} if the event was handled, {@code false} otherwise
+         */
+        public boolean afterInsert(Object fact, boolean successInsert) {
+            return feature.afterInsert(drools, fact, successInsert);
+        }
+
+        /**
+         * Indicates that a message was seen for this host.
+         */
+        public void sawMessage() {
+            sawMsg.set(true);
+        }
+
+        /**
+         * 
+         * @return {@code true} if a message was seen for this host, {@code false}
+         *         otherwise
+         */
+        public boolean messageSeen() {
+            return sawMsg.get();
+        }
+    }
+
+    /**
+     * Listener for the external topic. Simulates the actions taken by
+     * <i>AggregatedPolicyController.onTopicEvent</i>.
+     */
+    private static class MyExternalTopicListener implements Answer<Void> {
+
+        private final Context context;
+        private final Host host;
+
+        public MyExternalTopicListener(Context context, Host host) {
+            this.context = context;
+            this.host = host;
+        }
+
+        @Override
+        public Void answer(InvocationOnMock args) throws Throwable {
+            int i = 0;
+            CommInfrastructure commType = args.getArgument(i++);
+            String topic = args.getArgument(i++);
+            String event = args.getArgument(i++);
+
+            if (host.beforeOffer(commType, topic, event)) {
+                return null;
+            }
+
+            boolean result;
+            Object fact = context.decodeEvent(event);
+
+            if (fact == null) {
+                result = false;
+                context.bumpDecodeErrors();
+
+            } else {
+                result = true;
+
+                if (!host.beforeInsert(fact)) {
+                    // feature did not handle it so we handle it here
+                    host.afterInsert(fact, result);
+
+                    host.sawMessage();
+                    context.addEvent();
+                }
+            }
+
+            host.afterOffer(commType, topic, event, result);
+            return null;
+        }
+    }
+
+    /**
+     * Simulator for the feature-level factory.
+     */
+    private static class FeatureFactory extends PoolingFeature.Factory {
+
+        private final Context context;
+
+        /**
+         * 
+         * @param context
+         */
+        public FeatureFactory(Context context) {
+            this.context = context;
+
+            /*
+             * Note: do NOT extract anything from "context" at this point, because it
+             * hasn't been fully initialized yet
+             */
+        }
+
+        @Override
+        public Properties getProperties(String featName) {
+            Properties props = new Properties();
+
+            props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
+
+            props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
+            props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
+            props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
+            props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
+            props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
+                            "" + STD_OFFLINE_PUB_WAIT_MS);
+            props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
+                            "" + STD_START_HEARTBEAT_MS);
+            props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
+            props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
+            props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
+                            "" + STD_ACTIVE_HEARTBEAT_MS);
+            props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
+                            "" + STD_INTER_HEARTBEAT_MS);
+
+            props.putAll(makeSinkProperties(INTERNAL_TOPIC));
+            props.putAll(makeSourceProperties(INTERNAL_TOPIC));
+
+            return props;
+        }
+
+        @Override
+        public PolicyController getController(DroolsController droolsController) {
+            return context.getController(droolsController);
+        }
+    }
+
+    /**
+     * Simulator for the pooling manager factory.
+     */
+    private static class ManagerFactory extends PoolingManagerImpl.Factory {
+
+        /**
+         * Used to decode events from the external topic.
+         */
+        private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
+            @Override
+            protected ObjectMapper initialValue() {
+                return new ObjectMapper();
+            }
+        };
+
+        /**
+         * Used to decode events into a Map.
+         */
+        private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {};
+
+        /**
+         * 
+         * @param context
+         */
+        public ManagerFactory(Context context) {
+
+            /*
+             * Note: do NOT extract anything from "context" at this point, because it
+             * hasn't been fully initialized yet
+             */
+        }
+
+        @Override
+        public boolean canDecodeEvent(DroolsController drools, String topic) {
+            return true;
+        }
+
+        @Override
+        public Object decodeEvent(DroolsController drools, String topic, String event) {
+            try {
+                return mapper.get().readValue(event, typeRef);
+
+            } catch (IOException e) {
+                logger.warn("cannot decode external event", e);
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Controller that also implements the {@link TopicListener} interface.
+     */
+    private static interface ListenerController extends PolicyController, TopicListener {
+
+    }
+}
index 7782e47..f8f3755 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.policy.drools.pooling;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
@@ -122,8 +123,8 @@ public class PoolingFeatureTest {
         when(factory.getController(drools2)).thenReturn(controller2);
         when(factory.getController(droolsDisabled)).thenReturn(controllerDisabled);
 
-        when(factory.makeManager(any(), any())).thenAnswer(args -> {
-            PoolingProperties props = args.getArgument(1);
+        when(factory.makeManager(any(), any(), any(), any())).thenAnswer(args -> {
+            PoolingProperties props = args.getArgument(2);
 
             PoolingManagerImpl mgr = mock(PoolingManagerImpl.class);
 
@@ -148,6 +149,19 @@ public class PoolingFeatureTest {
         assertEquals(2, managers.size());
     }
 
+    @Test
+    public void testGetHost() {
+        String host = pool.getHost();
+        assertNotNull(host);
+
+        // create another and ensure it generates another host name
+        pool = new PoolingFeature();
+        String host2 = pool.getHost();
+        assertNotNull(host2);
+
+        assertTrue(!host.equals(host2));
+    }
+
     @Test
     public void testGetSequenceNumber() {
         assertEquals(0, pool.getSequenceNumber());
index e32fa54..e0024b7 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -73,6 +74,7 @@ public class PoolingManagerImplTest {
     protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
     protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
 
+    private static final String MY_HOST = "my.host";
     private static final String HOST2 = "other.host";
 
     private static final String MY_CONTROLLER = "my.controller";
@@ -111,6 +113,7 @@ public class PoolingManagerImplTest {
     private DroolsController drools;
     private Serializer ser;
     private Factory factory;
+    private CountDownLatch active;
 
     private PoolingManagerImpl mgr;
 
@@ -140,6 +143,7 @@ public class PoolingManagerImplTest {
 
         futures = new LinkedList<>();
         ser = new Serializer();
+        active = new CountDownLatch(1);
 
         factory = mock(Factory.class);
         eventQueue = mock(EventQueue.class);
@@ -151,7 +155,7 @@ public class PoolingManagerImplTest {
 
         when(factory.makeEventQueue(any())).thenReturn(eventQueue);
         when(factory.makeClassExtractors(any())).thenReturn(extractors);
-        when(factory.makeDmaapManager(any())).thenReturn(dmaap);
+        when(factory.makeDmaapManager(any(), any())).thenReturn(dmaap);
         when(factory.makeScheduler()).thenReturn(sched);
         when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(true);
         when(factory.decodeEvent(drools, TOPIC2, THE_EVENT)).thenReturn(DECODED_EVENT);
@@ -179,12 +183,12 @@ public class PoolingManagerImplTest {
 
         PoolingManagerImpl.setFactory(factory);
 
-        mgr = new PoolingManagerImpl(controller, poolProps);
+        mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps, active);
     }
 
     @Test
-    public void testPoolingManagerImpl() {
-        mgr = new PoolingManagerImpl(controller, poolProps);
+    public void testPoolingManagerImpl() throws Exception {
+        verify(factory).makeDmaapManager(any(), any());
 
         State st = mgr.getCurrent();
         assertTrue(st instanceof IdleState);
@@ -202,7 +206,7 @@ public class PoolingManagerImplTest {
         PolicyController ctlr = mock(PolicyController.class);
 
         PoolingFeatureRtException ex = expectException(PoolingFeatureRtException.class,
-                        xxx -> new PoolingManagerImpl(ctlr, poolProps));
+                        () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps, active));
         assertNotNull(ex.getCause());
         assertTrue(ex.getCause() instanceof ClassCastException);
     }
@@ -211,23 +215,28 @@ public class PoolingManagerImplTest {
     public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
         // throw an exception when we try to create the dmaap manager
         PoolingFeatureException ex = new PoolingFeatureException();
-        when(factory.makeDmaapManager(any())).thenThrow(ex);
+        when(factory.makeDmaapManager(any(), any())).thenThrow(ex);
 
         PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class,
-                        xxx -> new PoolingManagerImpl(controller, poolProps));
+                        () -> new PoolingManagerImpl(MY_HOST, controller, poolProps, active));
         assertEquals(ex, ex2.getCause());
     }
 
     @Test
-    public void testGetHost() {
-        String host = mgr.getHost();
-        assertNotNull(host);
+    public void testGetCurrent() throws Exception {
+        assertEquals(IdleState.class, mgr.getCurrent().getClass());
+
+        startMgr();
+
+        assertEquals(StartState.class, mgr.getCurrent().getClass());
+    }
 
-        // create another manager and ensure it generates a different host
-        mgr = new PoolingManagerImpl(controller, poolProps);
+    @Test
+    public void testGetHost() {
+        assertEquals(MY_HOST, mgr.getHost());
 
-        assertNotNull(mgr.getHost());
-        assertFalse(host.equals(mgr.getHost()));
+        mgr = new PoolingManagerImpl(HOST2, controller, poolProps, active);
+        assertEquals(HOST2, mgr.getHost());
     }
 
     @Test
@@ -268,7 +277,7 @@ public class PoolingManagerImplTest {
         PoolingFeatureException ex = new PoolingFeatureException();
         doThrow(ex).when(dmaap).startPublisher();
 
-        PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, xxx -> mgr.beforeStart());
+        PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, () -> mgr.beforeStart());
         assertEquals(ex, ex2);
 
         // should never start the scheduler
@@ -453,8 +462,9 @@ public class PoolingManagerImplTest {
         // should have set the new filter
         verify(dmaap, times(++ntimes)).setFilter(any());
 
-        // should have cancelled the timer
-        assertEquals(1, futures.size());
+        // should have cancelled the timers
+        assertEquals(2, futures.size());
+        verify(futures.poll()).cancel(false);
         verify(futures.poll()).cancel(false);
 
         /*
@@ -465,8 +475,9 @@ public class PoolingManagerImplTest {
         // should have set the new filter
         verify(dmaap, times(++ntimes)).setFilter(any());
 
-        // timer should still be active
-        assertEquals(1, futures.size());
+        // new timers should now be active
+        assertEquals(2, futures.size());
+        verify(futures.poll(), never()).cancel(false);
         verify(futures.poll(), never()).cancel(false);
     }
 
@@ -548,7 +559,7 @@ public class PoolingManagerImplTest {
         ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
         ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
 
-        verify(sched).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
+        verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
                         unitCap.capture());
 
         assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
@@ -987,7 +998,7 @@ public class PoolingManagerImplTest {
 
         // route the message to this host
         mgr.startDistributing(makeAssignments(true));
-        
+
         // generate RuntimeException when onTopicEvent() is invoked
         doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
 
@@ -1056,21 +1067,27 @@ public class PoolingManagerImplTest {
         startMgr();
 
         // route the message to this host
-        mgr.startDistributing(makeAssignments(true));
-
+        assertNotNull(mgr.startDistributing(makeAssignments(true)));
         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        verify(eventQueue, never()).add(any());
 
 
-        // null assignments should be ignored
-        mgr.startDistributing(null);
+        // null assignments should cause message to be queued
+        assertNull(mgr.startDistributing(null));
+        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        verify(eventQueue).add(any());
+
 
+        // route the message to this host
+        assertNotNull(mgr.startDistributing(makeAssignments(true)));
         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        verify(eventQueue).add(any());
 
 
         // route the message to the other host
-        mgr.startDistributing(makeAssignments(false));
-
+        assertNotNull(mgr.startDistributing(makeAssignments(false)));
         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        verify(eventQueue).add(any());
     }
 
     @Test
@@ -1086,7 +1103,8 @@ public class PoolingManagerImplTest {
         when(eventQueue.poll()).thenAnswer(args -> lst.poll());
 
         // route the messages to this host
-        assertTrue(mgr.startDistributing(makeAssignments(true)).await(2, TimeUnit.SECONDS));
+        CountDownLatch latch = mgr.startDistributing(makeAssignments(true));
+        assertTrue(latch.await(2, TimeUnit.SECONDS));
 
         // all of the events should have been processed locally
         verify(dmaap, times(START_PUB)).publish(any());
@@ -1106,7 +1124,8 @@ public class PoolingManagerImplTest {
         when(eventQueue.poll()).thenAnswer(args -> lst.poll());
 
         // route the messages to the OTHER host
-        assertTrue(mgr.startDistributing(makeAssignments(false)).await(2, TimeUnit.SECONDS));
+        CountDownLatch latch = mgr.startDistributing(makeAssignments(false));
+        assertTrue(latch.await(2, TimeUnit.SECONDS));
 
         // all of the events should have been forwarded
         verify(dmaap, times(4)).publish(any());
@@ -1142,6 +1161,7 @@ public class PoolingManagerImplTest {
         assertTrue(st instanceof ActiveState);
         assertEquals(mgr.getHost(), st.getHost());
         assertEquals(asgn, mgr.getAssignments());
+        assertEquals(0, active.getCount());
     }
 
     @Test
@@ -1149,6 +1169,7 @@ public class PoolingManagerImplTest {
         State st = mgr.goInactive();
         assertTrue(st instanceof InactiveState);
         assertEquals(mgr.getHost(), st.getHost());
+        assertEquals(1, active.getCount());
     }
 
     @Test
@@ -1330,7 +1351,7 @@ public class PoolingManagerImplTest {
      */
     private <T extends Exception> T expectException(Class<T> exClass, ExFunction<T> func) {
         try {
-            func.apply(null);
+            func.apply();
             throw new AssertionError("missing exception");
 
         } catch (Exception e) {
@@ -1349,10 +1370,9 @@ public class PoolingManagerImplTest {
         /**
          * Invokes the function.
          * 
-         * @param arg always {@code null}
          * @throws T if an error occurs
          */
-        public void apply(Void arg) throws T;
+        public void apply() throws T;
 
     }
 }
index 2d734c1..459c770 100644 (file)
@@ -48,13 +48,13 @@ public class PoolingPropertiesTest {
     public static final boolean STD_FEATURE_ENABLED = true;
     public static final int STD_OFFLINE_LIMIT = 10;
     public static final long STD_OFFLINE_AGE_MS = 1000L;
-    public static final long STD_START_HEARTBEAT_MS = 2000L;
-    public static final long STD_REACTIVATE_MS = 3000L;
-    public static final long STD_IDENTIFICATION_MS = 4000L;
-    public static final long STD_LEADER_MS = 5000L;
-    public static final long STD_ACTIVE_HEARTBEAT_MS = 6000L;
-    public static final long STD_INTER_HEARTBEAT_MS = 7000L;
-    public static final long STD_OFFLINE_PUB_WAIT_MS = 8000L;
+    public static final long STD_OFFLINE_PUB_WAIT_MS = 2000L;
+    public static final long STD_START_HEARTBEAT_MS = 3000L;
+    public static final long STD_REACTIVATE_MS = 4000L;
+    public static final long STD_IDENTIFICATION_MS = 5000L;
+    public static final long STD_LEADER_MS = 6000L;
+    public static final long STD_ACTIVE_HEARTBEAT_MS = 7000L;
+    public static final long STD_INTER_HEARTBEAT_MS = 8000L;
 
     private Properties plain;
     private PoolingProperties pooling;
@@ -98,9 +98,14 @@ public class PoolingPropertiesTest {
         doTest(OFFLINE_AGE_MS, STD_OFFLINE_AGE_MS, 60000L, xxx -> pooling.getOfflineAgeMs());
     }
 
+    @Test
+    public void testGetOfflinePubWaitMs() throws PropertyException {
+        doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs());
+    }
+
     @Test
     public void testGetStartHeartbeatMs() throws PropertyException {
-        doTest(START_HEARTBEAT_MS, STD_START_HEARTBEAT_MS, 50000L, xxx -> pooling.getStartHeartbeatMs());
+        doTest(START_HEARTBEAT_MS, STD_START_HEARTBEAT_MS, 100000L, xxx -> pooling.getStartHeartbeatMs());
     }
 
     @Test
@@ -123,11 +128,6 @@ public class PoolingPropertiesTest {
         doTest(INTER_HEARTBEAT_MS, STD_INTER_HEARTBEAT_MS, 15000L, xxx -> pooling.getInterHeartbeatMs());
     }
 
-    @Test
-    public void testGetOfflinePubWaitMs() throws PropertyException {
-        doTest(OFFLINE_PUB_WAIT_MS, STD_OFFLINE_PUB_WAIT_MS, 3000L, xxx -> pooling.getOfflinePubWaitMs());
-    }
-
     /**
      * Tests a particular property. Verifies that the correct value is returned if the
      * specialized property has a value or the property has no value. Also verifies that
@@ -174,12 +174,12 @@ public class PoolingPropertiesTest {
         props.setProperty(specialize(FEATURE_ENABLED, CONTROLLER), "" + STD_FEATURE_ENABLED);
         props.setProperty(specialize(OFFLINE_LIMIT, CONTROLLER), "" + STD_OFFLINE_LIMIT);
         props.setProperty(specialize(OFFLINE_AGE_MS, CONTROLLER), "" + STD_OFFLINE_AGE_MS);
+        props.setProperty(specialize(OFFLINE_PUB_WAIT_MS, CONTROLLER), "" + STD_OFFLINE_PUB_WAIT_MS);
         props.setProperty(specialize(START_HEARTBEAT_MS, CONTROLLER), "" + STD_START_HEARTBEAT_MS);
         props.setProperty(specialize(REACTIVATE_MS, CONTROLLER), "" + STD_REACTIVATE_MS);
         props.setProperty(specialize(IDENTIFICATION_MS, CONTROLLER), "" + STD_IDENTIFICATION_MS);
         props.setProperty(specialize(ACTIVE_HEARTBEAT_MS, CONTROLLER), "" + STD_ACTIVE_HEARTBEAT_MS);
         props.setProperty(specialize(INTER_HEARTBEAT_MS, CONTROLLER), "" + STD_INTER_HEARTBEAT_MS);
-        props.setProperty(specialize(OFFLINE_PUB_WAIT_MS, CONTROLLER), "" + STD_OFFLINE_PUB_WAIT_MS);
 
         return props;
     }
index 505dc40..d09650d 100644 (file)
@@ -122,6 +122,24 @@ public class SpecPropertiesTest {
         assertNull(props.getProperty(gen(PROP_UNKNOWN)));
     }
 
+    @Test
+    public void testSpecPropertiesStringStringProperties_EmptyPrefix() {
+        supportingProps = new Properties();
+
+        supportingProps.setProperty(PROP_NO_PREFIX, VAL_NO_PREFIX);
+        supportingProps.setProperty("a.value", VAL_GEN);
+        supportingProps.setProperty("b.value", VAL_GEN);
+        supportingProps.setProperty(MY_SPEC + ".b.value", VAL_SPEC);
+
+        // no supporting properties
+        props = new SpecProperties("", MY_SPEC, supportingProps);
+
+        assertEquals(VAL_NO_PREFIX, props.getProperty(gen(PROP_NO_PREFIX)));
+        assertEquals(VAL_GEN, props.getProperty(gen("a.value")));
+        assertEquals(VAL_SPEC, props.getProperty(MY_SPEC + ".b.value"));
+        assertNull(props.getProperty(gen(PROP_UNKNOWN)));
+    }
+
     @Test
     public void testWithTrailingDot() {
         // neither has trailing dot
@@ -132,6 +150,16 @@ public class SpecPropertiesTest {
         props = new SpecProperties(PREFIX_GEN, MY_SPEC + ".");
         assertEquals(PREFIX_GEN, props.getPrefix());
         assertEquals(PREFIX_SPEC, props.getSpecPrefix());
+
+        // first is empty
+        props = new SpecProperties("", MY_SPEC);
+        assertEquals("", props.getPrefix());
+        assertEquals(MY_SPEC + ".", props.getSpecPrefix());
+
+        // second is empty
+        props = new SpecProperties(PREFIX_GEN, "");
+        assertEquals(PREFIX_GEN, props.getPrefix());
+        assertEquals(PREFIX_GEN, props.getSpecPrefix());
     }
 
     @Test
index 7b4b060..27284dc 100644 (file)
@@ -298,18 +298,18 @@ public class ActiveStateTest extends BasicStateTester {
 
         // heart beat generator
         timer = repeatedTasks.remove();
-        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
-        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
+        assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
+        assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
 
         // my heart beat checker
         timer = repeatedTasks.remove();
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
 
         // predecessor's heart beat checker
         timer = repeatedTasks.remove();
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
     }
 
     @Test
@@ -327,13 +327,13 @@ public class ActiveStateTest extends BasicStateTester {
 
         // heart beat generator
         timer = repeatedTasks.remove();
-        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
-        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
+        assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
+        assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
 
         // my heart beat checker
         timer = repeatedTasks.remove();
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.first().longValue());
-        assertEquals(STD_INTER_HEARTBEAT_MS, timer.second().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.first().longValue());
+        assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.second().longValue());
     }
 
     @Test
@@ -389,13 +389,13 @@ public class ActiveStateTest extends BasicStateTester {
 
         // set up next state
         State next = mock(State.class);
-        when(mgr.goInactive()).thenReturn(next);
+        when(mgr.goStart()).thenReturn(next);
 
         // fire the task - should transition
         assertEquals(next, task.third().fire());
 
-        // should indicate failure
-        verify(mgr).internalTopicFailed();
+        // should stop distributing
+        verify(mgr).startDistributing(null);
 
         // should publish an offline message
         Offline msg = captureAdminMessage(Offline.class);
index 394adae..ae53ce0 100644 (file)
@@ -22,12 +22,19 @@ package org.onap.policy.drools.pooling.state;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
 import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Query;
 import org.onap.policy.drools.utils.Pair;
 
 public class InactiveStateTest extends BasicStateTester {
@@ -52,6 +59,42 @@ public class InactiveStateTest extends BasicStateTester {
         utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
     }
 
+    @Test
+    public void testProcessLeader() {
+        State next = mock(State.class);
+        when(mgr.goActive()).thenReturn(next);
+
+        String[] arr = {PREV_HOST, MY_HOST, HOST1};
+        BucketAssignments asgn = new BucketAssignments(arr);
+        Leader msg = new Leader(PREV_HOST, asgn);
+
+        assertEquals(next, state.process(msg));
+        verify(mgr).startDistributing(asgn);
+    }
+
+    @Test
+    public void testProcessLeader_Invalid() {
+        Leader msg = new Leader(PREV_HOST, null);
+
+        // should stay in the same state, and not start distributing
+        assertNull(state.process(msg));
+        verify(mgr, never()).startDistributing(any());
+        verify(mgr, never()).goActive();
+        verify(mgr, never()).goInactive();
+    }
+
+    @Test
+    public void testProcessQuery() {
+        State next = mock(State.class);
+        when(mgr.goQuery()).thenReturn(next);
+
+        assertEquals(next, state.process(new Query()));
+
+        Identification ident = captureAdminMessage(Identification.class);
+        assertEquals(MY_HOST, ident.getSource());
+        assertEquals(ASGN3, ident.getAssignments());
+    }
+
     @Test
     public void testGoInatcive() {
         assertNull(state.goInactive());
index e171841..7ac5843 100644 (file)
@@ -64,39 +64,6 @@ public class ProcessingStateTest extends BasicStateTester {
         utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
     }
 
-    @Test
-    public void testGoActive_WithAssignment() {
-        State act = mock(State.class);
-        State inact = mock(State.class);
-
-        when(mgr.goActive()).thenReturn(act);
-        when(mgr.goInactive()).thenReturn(inact);
-
-        String[] arr = {HOST2, PREV_HOST, MY_HOST};
-        BucketAssignments asgn = new BucketAssignments(arr);
-
-        assertEquals(act, state.goActive(asgn));
-
-        verify(mgr).startDistributing(asgn);
-    }
-
-    @Test
-    public void testGoActive_WithoutAssignment() {
-        State act = mock(State.class);
-        State inact = mock(State.class);
-
-        when(mgr.goActive()).thenReturn(act);
-        when(mgr.goInactive()).thenReturn(inact);
-
-        String[] arr = {HOST2, PREV_HOST};
-        BucketAssignments asgn = new BucketAssignments(arr);
-
-        assertEquals(inact, state.goActive(asgn));
-
-        verify(mgr).startDistributing(asgn);
-
-    }
-
     @Test
     public void testProcessQuery() {
         State next = mock(State.class);
@@ -154,13 +121,6 @@ public class ProcessingStateTest extends BasicStateTester {
         state = new ProcessingState(mgr, LEADER);
     }
 
-    @Test
-    public void testMakeIdentification() {
-        Identification ident = state.makeIdentification();
-        assertEquals(MY_HOST, ident.getSource());
-        assertEquals(ASGN3, ident.getAssignments());
-    }
-
     @Test
     public void testGetAssignments() {
         // assignments from constructor
index a7c3a3d..80778ed 100644 (file)
@@ -21,7 +21,6 @@
 package org.onap.policy.drools.pooling.state;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -254,10 +253,13 @@ public class QueryStateTest extends BasicStateTester {
         // should published an Offline message and go inactive
 
         State next = mock(State.class);
-        when(mgr.goInactive()).thenReturn(next);
+        when(mgr.goStart()).thenReturn(next);
 
         assertEquals(next, timer.second().fire());
 
+        // should stop distributing
+        verify(mgr).startDistributing(null);
+
         Offline msg = captureAdminMessage(Offline.class);
         assertEquals(MY_HOST, msg.getSource());
     }
@@ -342,21 +344,6 @@ public class QueryStateTest extends BasicStateTester {
         assertTrue(admin.isEmpty());
     }
 
-    @Test
-    public void testHasAssignment() {
-        // null assignment
-        mgr.startDistributing(null);
-        assertFalse(state.hasAssignment());
-
-        // not in assignments
-        state.setAssignments(new BucketAssignments(new String[] {HOST3}));
-        assertFalse(state.hasAssignment());
-
-        // it IS in the assignments
-        state.setAssignments(new BucketAssignments(new String[] {MY_HOST}));
-        assertTrue(state.hasAssignment());
-    }
-
     @Test
     public void testRecordInfo_NullSource() {
         state.setAssignments(ASGN3);
index af4e8f1..ee4c1ad 100644 (file)
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import java.util.Map;
@@ -39,6 +40,7 @@ import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Offline;
 import org.onap.policy.drools.pooling.message.Query;
 import org.onap.policy.drools.utils.Pair;
+import org.onap.policy.drools.utils.Triple;
 
 public class StartStateTest extends BasicStateTester {
 
@@ -78,15 +80,36 @@ public class StartStateTest extends BasicStateTester {
         assertEquals(MY_HOST, msg.first());
         assertEquals(state.getHbTimestampMs(), msg.second().getTimestampMs());
 
-        Pair<Long, StateTimerTask> timer = onceTasks.removeFirst();
 
-        assertEquals(STD_HEARTBEAT_WAIT_MS, timer.first().longValue());
+        /*
+         * Verify heartbeat generator
+         */
+        Triple<Long, Long, StateTimerTask> generator = repeatedTasks.removeFirst();
+
+        assertEquals(STD_INTER_HEARTBEAT_MS, generator.first().longValue());
+        assertEquals(STD_INTER_HEARTBEAT_MS, generator.second().longValue());
+
+        // invoke the task - it should generate another heartbeat
+        assertEquals(null, generator.third().fire());
+        verify(mgr, times(2)).publish(MY_HOST, msg.second());
+
+        // and again
+        assertEquals(null, generator.third().fire());
+        verify(mgr, times(3)).publish(MY_HOST, msg.second());
+
+
+        /*
+         * Verify heartbeat checker
+         */
+        Pair<Long, StateTimerTask> checker = onceTasks.removeFirst();
+
+        assertEquals(STD_HEARTBEAT_WAIT_MS, checker.first().longValue());
 
         // invoke the task - it should go to the state returned by the mgr
         State next = mock(State.class);
         when(mgr.goInactive()).thenReturn(next);
 
-        assertEquals(next, timer.second().fire());
+        assertEquals(next, checker.second().fire());
 
         verify(mgr).internalTopicFailed();
     }
index a184dfa..47624aa 100644 (file)
@@ -154,12 +154,48 @@ public class StateTest extends BasicStateTester {
     }
 
     @Test
-    public void testGoActive() {
-        State next = mock(State.class);
-        when(mgr.goActive()).thenReturn(next);
+    public void testGoActive_WithAssignment() {
+        State act = mock(State.class);
+        State inact = mock(State.class);
 
-        State next2 = state.goActive();
-        assertEquals(next, next2);
+        when(mgr.goActive()).thenReturn(act);
+        when(mgr.goInactive()).thenReturn(inact);
+
+        String[] arr = {HOST2, PREV_HOST, MY_HOST};
+        BucketAssignments asgn = new BucketAssignments(arr);
+
+        assertEquals(act, state.goActive(asgn));
+
+        verify(mgr).startDistributing(asgn);
+    }
+
+    @Test
+    public void testGoActive_WithoutAssignment() {
+        State act = mock(State.class);
+        State inact = mock(State.class);
+
+        when(mgr.goActive()).thenReturn(act);
+        when(mgr.goInactive()).thenReturn(inact);
+
+        String[] arr = {HOST2, PREV_HOST};
+        BucketAssignments asgn = new BucketAssignments(arr);
+
+        assertEquals(inact, state.goActive(asgn));
+
+        verify(mgr).startDistributing(asgn);
+    }
+
+    @Test
+    public void testGoActive_NullAssignment() {
+        State act = mock(State.class);
+        State inact = mock(State.class);
+
+        when(mgr.goActive()).thenReturn(act);
+        when(mgr.goInactive()).thenReturn(inact);
+
+        assertEquals(inact, state.goActive(null));
+
+        verify(mgr, never()).startDistributing(any());
     }
 
     @Test
@@ -374,6 +410,20 @@ public class StateTest extends BasicStateTester {
         verify(sched).cancel();
     }
 
+    @Test
+    public void testMissedHeartbeat() {
+        State next = mock(State.class);
+        when(mgr.goStart()).thenReturn(next);
+
+        State next2 = state.missedHeartbeat();
+        assertEquals(next, next2);
+
+        verify(mgr).startDistributing(null);
+
+        Offline msg = captureAdminMessage(Offline.class);
+        assertEquals(MY_HOST, msg.getSource());
+    }
+
     @Test
     public void testInternalTopicFailed() {
         State next = mock(State.class);
@@ -397,6 +447,13 @@ public class StateTest extends BasicStateTester {
         assertEquals(timestamp, msg.getTimestampMs());
     }
 
+    @Test
+    public void testMakeIdentification() {
+        Identification ident = state.makeIdentification();
+        assertEquals(MY_HOST, ident.getSource());
+        assertEquals(ASGN3, ident.getAssignments());
+    }
+
     @Test
     public void testMakeOffline() {
         Offline msg = state.makeOffline();
index 4ef8e2d..83d4e72 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,11 +28,6 @@ import org.onap.policy.drools.event.comm.TopicSource;
  */
 public interface BusTopicSource extends ApiKeyEnabled, TopicSource {
        
-       /**
-        * Default Consumer Instance Value
-        */
-       public static String DEFAULT_CONSUMER_INSTANCE = "0";
-       
        /**
         * Default Timeout fetching in milliseconds
         */
index 8e18bba..1efaa06 100644 (file)
@@ -147,9 +147,7 @@ public interface BusPublisher {
                
                @Override
                public String toString() {
-                       StringBuilder builder = new StringBuilder();
-                       builder.append("CambriaPublisherWrapper []");
-                       return builder.toString();
+                   return "CambriaPublisherWrapper []";
                }
                
        }
@@ -287,15 +285,10 @@ public interface BusPublisher {
                
                @Override
                public String toString() {
-                       StringBuilder builder = new StringBuilder();
-                       builder.append("DmaapPublisherWrapper [").
-                       append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
-                       append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
-                       append(", publisher.getHost()=").append(publisher.getHost()).
-                       append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
-                       append(", publisher.getUsername()=").append(publisher.getUsername()).
-                       append("]");
-                       return builder.toString();
+            return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
+                            + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
+                            + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
+                            + ", publisher.getUsername()=" + publisher.getUsername() + "]";
                }
        }
        
@@ -329,13 +322,16 @@ public interface BusPublisher {
                        
                        String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
                        
-                        if (environment == null || environment.isEmpty()) {
+                       if (environment == null || environment.isEmpty()) {
                                        throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
-                       } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
+                       }
+                       if (aftEnvironment == null || aftEnvironment.isEmpty()) {
                                throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
-                       } if (latitude == null || latitude.isEmpty()) {
+                       }
+                       if (latitude == null || latitude.isEmpty()) {
                                throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
-                       } if (longitude == null || longitude.isEmpty()) {
+                       }
+                       if (longitude == null || longitude.isEmpty()) {
                                throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
                        }
                        
index f145f3b..0bf3d44 100644 (file)
@@ -1,8 +1,8 @@
-/*-
+/*
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -67,8 +67,7 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
                                                  String apiKey, 
                                                  String apiSecret,
                                                  boolean useHttps,
-                                                 boolean allowSelfSignedCerts) 
-       throws IllegalArgumentException {
+                                                 boolean allowSelfSignedCerts) {
                
                super(servers, topic);
                
@@ -102,6 +101,26 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
                return allowSelfSignedCerts;
        }
 
+    protected boolean anyNullOrEmpty(String... args) {
+        for (String arg : args) {
+            if (arg == null || arg.isEmpty()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    protected boolean allNullOrEmpty(String... args) {
+        for (String arg : args) {
+            if (!(arg == null || arg.isEmpty())) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
 
        @Override
        public String toString() {
index f86c27c..a50d7b1 100644 (file)
@@ -1,8 +1,8 @@
-/*-
+/*
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -63,8 +63,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
         * @throws IllegalArgumentException in invalid parameters are passed in
         */
        public InlineBusTopicSink(List<String> servers, String topic, 
-                                         String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts)
-                       throws IllegalArgumentException {
+                                         String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts) {
                
                super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);               
                
@@ -82,7 +81,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
         * {@inheritDoc}
         */
        @Override
-       public boolean start() throws IllegalStateException {           
+       public boolean start() {                
                logger.info("{}: starting", this);
                
                synchronized(this) {
@@ -132,7 +131,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
         * {@inheritDoc}
         */
        @Override
-       public boolean send(String message) throws IllegalArgumentException, IllegalStateException {
+       public boolean send(String message) {
                
                if (message == null || message.isEmpty()) {
                        throw new IllegalArgumentException("Message to send is empty");
@@ -181,16 +180,33 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
         * {@inheritDoc}
         */
        @Override
-       public void shutdown() throws IllegalStateException {
+       public void shutdown() {
                this.stop();
        }
+
+    protected boolean anyNullOrEmpty(String... args) {
+        for (String arg : args) {
+            if (arg == null || arg.isEmpty()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    protected boolean allNullOrEmpty(String... args) {
+        for (String arg : args) {
+            if (!(arg == null || arg.isEmpty())) {
+                return false;
+            }
+        }
+
+        return true;
+    }
        
 
        @Override
        public String toString() {
-               StringBuilder builder = new StringBuilder();
-               builder.append("InlineBusTopicSink [partitionId=").append(partitionId).append(", alive=").append(alive)
-                               .append(", publisher=").append(publisher).append("]");
-               return builder.toString();
+        return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]";
        }
 }
index 718bb21..48116e3 100644 (file)
@@ -75,8 +75,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
                                                                                        String partitionKey,
                                                                                        String environment, String aftEnvironment, String partner,
                                                                                        String latitude, String longitude, Map<String,String> additionalProps,
-                                                                                       boolean useHttps, boolean allowSelfSignedCerts)
-                       throws IllegalArgumentException {
+                                                                                       boolean useHttps, boolean allowSelfSignedCerts) {
                        
                super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
                
@@ -96,8 +95,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
        public InlineDmaapTopicSink(List<String> servers, String topic, 
                                            String apiKey, String apiSecret,
                                    String userName, String password,
-                                           String partitionKey, boolean useHttps,  boolean allowSelfSignedCerts) 
-               throws IllegalArgumentException {
+                                           String partitionKey, boolean useHttps,  boolean allowSelfSignedCerts) {
                
                super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
                
@@ -108,11 +106,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
 
        @Override
        public void init() {
-               if ((this.environment == null   || this.environment.isEmpty()) &&
-                  (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
-                  (this.latitude == null               || this.latitude.isEmpty()) &&
-                  (this.longitude == null              || this.longitude.isEmpty()) &&
-                  (this.partner == null                || this.partner.isEmpty())) {
+               if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
                        this.publisher = 
                                new BusPublisher.CambriaPublisherWrapper(this.servers, 
                                                                               this.topic, 
@@ -142,11 +136,9 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
 
        @Override
        public String toString() {
-               StringBuilder builder = new StringBuilder();
-               builder.append("InlineDmaapTopicSink [userName=").append(userName).append(", password=").append(password)
-                               .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
-                               .append(super.toString()).append("]");
-               return builder.toString();
+        return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password
+                        + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()="
+                        + super.toString() + "]";
        }
 
 }
index 0c01c8b..d1218f3 100644 (file)
@@ -1,8 +1,8 @@
-/*-
+/*
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -57,8 +57,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
                                                String apiSecret,
                                                String partitionId,
                                                boolean useHttps,
-                                               boolean allowSelfSignedCerts) 
-    throws IllegalArgumentException {          
+                                               boolean allowSelfSignedCerts) {
                super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts);
        }
        
index 9b2be6a..768046d 100644 (file)
 package org.onap.policy.drools.event.comm.bus.internal;
 
 import java.net.MalformedURLException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.onap.policy.drools.event.comm.FilterableTopicSource;
 import org.onap.policy.drools.event.comm.TopicListener;
 import org.onap.policy.drools.event.comm.bus.BusTopicSource;
 import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
+import org.onap.policy.drools.utils.NetworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This topic source implementation specializes in reading messages
@@ -72,24 +71,11 @@ public abstract class SingleThreadedBusTopicSource
         */
        protected BusConsumer consumer;
        
-       /**
-        * Am I running?
-        * reflects invocation of start()/stop() 
-        * !locked & start() => alive
-        * stop() => !alive
-        */
-       protected volatile boolean alive = false;
-       
        /**
         * Independent thread reading message over my topic
         */
        protected Thread busPollerThread;
        
-       /**
-        * All my subscribers for new message notifications
-        */
-       protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
-       
 
        /**
         * 
@@ -114,8 +100,7 @@ public abstract class SingleThreadedBusTopicSource
                                                                int fetchTimeout,
                                                                int fetchLimit,
                                                                boolean useHttps,
-                                                               boolean allowSelfSignedCerts) 
-       throws IllegalArgumentException {
+                                                               boolean allowSelfSignedCerts) {
                
                super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
                
@@ -126,7 +111,7 @@ public abstract class SingleThreadedBusTopicSource
                }
                
                if (consumerInstance == null || consumerInstance.isEmpty()) {
-                       this.consumerInstance = DEFAULT_CONSUMER_INSTANCE;
+            this.consumerInstance = NetworkUtil.getHostname();
                } else {
                        this.consumerInstance = consumerInstance;
                }
@@ -151,8 +136,7 @@ public abstract class SingleThreadedBusTopicSource
        public abstract void init() throws MalformedURLException;
        
        @Override
-       public void register(TopicListener topicListener) 
-               throws IllegalArgumentException {               
+       public void register(TopicListener topicListener) {
                
                super.register(topicListener);
                
@@ -181,7 +165,7 @@ public abstract class SingleThreadedBusTopicSource
        }
        
        @Override
-       public boolean start() throws IllegalStateException {           
+       public boolean start() {
                logger.info("{}: starting", this);
                
                synchronized(this) {
@@ -298,23 +282,10 @@ public abstract class SingleThreadedBusTopicSource
 
     @Override
        public String toString() {
-               StringBuilder builder = new StringBuilder();
-               builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
-                               .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
-                               .append(", fetchLimit=").append(fetchLimit)
-                               .append(", consumer=").append(this.consumer).append(", alive=")
-                               .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
-                               .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
-                               .append("]");
-               return builder.toString();
-       }
-
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public boolean isAlive() {
-               return alive;
+        return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
+                        + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer="
+                        + this.consumer + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread
+                        + ", topicListeners=" + topicListeners.size() + ", toString()=" + super.toString() + "]";
        }
 
        /**
@@ -337,7 +308,7 @@ public abstract class SingleThreadedBusTopicSource
         * {@inheritDoc}
         */
        @Override
-       public void shutdown() throws IllegalStateException {
+       public void shutdown() {
                this.stop();
                this.topicListeners.clear();
        }
index b0c456d..6a9a2d6 100644 (file)
@@ -79,8 +79,7 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
                                                                                        int fetchTimeout, int fetchLimit,
                                                                                        String environment, String aftEnvironment, String partner,
                                                                                        String latitude, String longitude, Map<String,String> additionalProps,
-                                                                                       boolean useHttps, boolean allowSelfSignedCerts)
-                       throws IllegalArgumentException {
+                                                                                       boolean useHttps, boolean allowSelfSignedCerts) {
                        
                super(servers, topic, apiKey, apiSecret, 
                          consumerGroup, consumerInstance, 
@@ -123,8 +122,7 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
                                                      String apiKey, String apiSecret,
                                                      String userName, String password,
                                                      String consumerGroup, String consumerInstance, 
-                                                     int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
-                       throws IllegalArgumentException {
+                                                     int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
                
                
                super(servers, topic, apiKey, apiSecret, 
@@ -148,19 +146,14 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
         */
        @Override
        public void init() throws MalformedURLException {
-               if (this.userName == null || this.userName.isEmpty() || 
-                               this.password == null || this.password.isEmpty()) {
+               if (anyNullOrEmpty(this.userName, this.password)) {
                                this.consumer =
                                                new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
                                                                                           this.apiKey, this.apiSecret,
                                                                                           this.consumerGroup, this.consumerInstance,
                                                                                           this.fetchTimeout, this.fetchLimit,
                                                                                           this.useHttps, this.allowSelfSignedCerts);
-               } else if ((this.environment == null    || this.environment.isEmpty()) &&
-                                  (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
-                                  (this.latitude == null          || this.latitude.isEmpty()) &&
-                                  (this.longitude == null         || this.longitude.isEmpty()) &&
-                                  (this.partner == null           || this.partner.isEmpty())) {
+               } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
                        this.consumer =
                                        new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
                                                                                    this.apiKey, this.apiSecret,
index e394e3d..fcbee63 100644 (file)
@@ -1,8 +1,8 @@
-/*-
+/*
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -52,8 +52,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource
        public SingleThreadedUebTopicSource(List<String> servers, String topic, 
                                                    String apiKey, String apiSecret,
                                                    String consumerGroup, String consumerInstance, 
-                                                   int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
-                       throws IllegalArgumentException {
+                                                   int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
                
                super(servers, topic, apiKey, apiSecret, 
                          consumerGroup, consumerInstance, 
index b1b2980..22c6b1d 100644 (file)
@@ -1,8 +1,8 @@
-/*-
+/*
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -82,7 +82,7 @@ public abstract class TopicBase implements Topic {
         * @return a Topic Base
         * @throws IllegalArgumentException if invalid parameters are present
         */
-       public TopicBase(List<String> servers, String topic) throws IllegalArgumentException {
+       public TopicBase(List<String> servers, String topic) {
                
                if (servers == null || servers.isEmpty()) {
                        throw new IllegalArgumentException("Server(s) must be provided");
@@ -97,8 +97,7 @@ public abstract class TopicBase implements Topic {
        }
        
        @Override
-       public void register(TopicListener topicListener) 
-               throws IllegalArgumentException {               
+       public void register(TopicListener topicListener) {
                
                logger.info("{}: registering {}", this, topicListener);
                
index 9b18cae..6a254e2 100644 (file)
@@ -1,8 +1,8 @@
-/*-
+/*
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,21 +25,16 @@ import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
-
-import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSession;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.Response;
-
 import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
 import org.onap.policy.drools.http.client.HttpClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
 public class JerseyClient implements HttpClient {
@@ -99,17 +94,18 @@ public class JerseyClient implements HttpClient {
                        if (this.selfSignedCerts) {
                                sslContext.init(null, new TrustManager[]{new X509TrustManager() {
                                        @Override
-                               public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {}
+                               public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+                                           // always trusted
+                                       }
                                        @Override
-                               public void checkServerTrusted(X509Certificate[]  chain, String authType) throws CertificateException {}
+                               public void checkServerTrusted(X509Certificate[]  chain, String authType) throws CertificateException {
+                                           // always trusted
+                                       }
                                        @Override
                                public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
        
                            }}, new SecureRandom());
-                                clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext).hostnameVerifier(new HostnameVerifier() {
-                                       @Override
-                                       public boolean verify(String hostname, SSLSession session) {return true;}
-                                });
+                               clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext).hostnameVerifier((host,session) -> true);
                        } else {
                                sslContext.init(null, null, null);
                                clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext);
@@ -147,17 +143,17 @@ public class JerseyClient implements HttpClient {
        
 
        @Override
-       public boolean start() throws IllegalStateException {
+       public boolean start() {
                return alive;
        }
 
        @Override
-       public boolean stop() throws IllegalStateException {
+       public boolean stop() {
                return !alive;
        }
 
        @Override
-       public void shutdown() throws IllegalStateException {
+       public void shutdown() {
                synchronized(this) {
                        alive = false;
                }
index a40bad9..3cd702a 100644 (file)
@@ -1,8 +1,8 @@
-/*-
+/*
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,6 +25,12 @@ import org.onap.policy.drools.properties.Startable;
  * A Jetty Server to server REST Requests
  */
 public interface HttpServletServer extends Startable {
+    
+    
+    /**
+     * factory for managing and tracking DMAAP sources
+     */
+    public static HttpServletServerFactory factory = new IndexedHttpServletServerFactory();
 
        /**
         * 
@@ -73,10 +79,4 @@ public interface HttpServletServer extends Startable {
         * @throws InterruptedException if the blocking operation is interrupted
         */
        public boolean waitedStart(long maxWaitTime) throws InterruptedException;
-       
-       
-       /**
-        * factory for managing and tracking DMAAP sources
-        */
-       public static HttpServletServerFactory factory = new IndexedHttpServletServerFactory();
 }
index 8c35602..f4dc85b 100644 (file)
@@ -1,8 +1,8 @@
-/*-
+/*
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -48,8 +48,7 @@ public interface HttpServletServerFactory {
         * @throws IllegalArgumentException when invalid parameters are provided
         */
        public HttpServletServer build(String name, String host, int port, String contextPath, 
-                                              boolean swagger, boolean managed)
-                  throws IllegalArgumentException;
+                                              boolean swagger, boolean managed);
        
        /**
         * list of http servers per properties
@@ -58,7 +57,7 @@ public interface HttpServletServerFactory {
         * @return list of http servers
         * @throws IllegalArgumentException when invalid parameters are provided
         */
-       public List<HttpServletServer> build(Properties properties) throws IllegalArgumentException;
+       public List<HttpServletServer> build(Properties properties);
        
        /**
         * gets a server based on the port
@@ -92,7 +91,9 @@ public interface HttpServletServerFactory {
  */
 class IndexedHttpServletServerFactory implements HttpServletServerFactory {
        
-       /**
+       private static final String SPACES_COMMA_SPACES = "\\s*,\\s*";
+
+    /**
         * logger
         */
        protected static Logger logger = LoggerFactory.getLogger(IndexedHttpServletServerFactory.class);        
@@ -105,8 +106,7 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory {
        @Override
        public synchronized HttpServletServer build(String name, String host, int port, 
                                                            String contextPath, boolean swagger,
-                                                           boolean managed) 
-               throws IllegalArgumentException {       
+                                                           boolean managed) {
                
                if (servers.containsKey(port))
                        return servers.get(port);
@@ -119,8 +119,7 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory {
        }
        
        @Override
-       public synchronized List<HttpServletServer> build(Properties properties) 
-               throws IllegalArgumentException {       
+       public synchronized List<HttpServletServer> build(Properties properties) {
                
                ArrayList<HttpServletServer> serviceList = new ArrayList<>();
                
@@ -130,8 +129,7 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory {
                        return serviceList;
                }
                
-               List<String> serviceNameList = 
-                               new ArrayList<>(Arrays.asList(serviceNames.split("\\s*,\\s*")));
+               List<String> serviceNameList = Arrays.asList(serviceNames.split(SPACES_COMMA_SPACES));
                
                for (String serviceName : serviceNameList) {
                        String servicePortString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + 
@@ -205,15 +203,13 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory {
                        }
                        
                        if (restClasses != null && !restClasses.isEmpty()) {
-                               List<String> restClassesList = 
-                                               new ArrayList<>(Arrays.asList(restClasses.split("\\s*,\\s*")));
+                               List<String> restClassesList = Arrays.asList(restClasses.split(SPACES_COMMA_SPACES));
                                for (String restClass : restClassesList)
                                        service.addServletClass(restUriPath, restClass);
                        }
                        
                        if (restPackages != null && !restPackages.isEmpty()) {
-                               List<String> restPackageList = 
-                                               new ArrayList<>(Arrays.asList(restPackages.split("\\s*,\\s*")));
+                               List<String> restPackageList = Arrays.asList(restPackages.split(SPACES_COMMA_SPACES));
                                for (String restPackage : restPackageList)
                                        service.addServletPackage(restUriPath, restPackage);
                        }
@@ -225,7 +221,7 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory {
        }
 
        @Override
-       public synchronized HttpServletServer get(int port) throws IllegalArgumentException {
+       public synchronized HttpServletServer get(int port) {
                
                if (servers.containsKey(port)) {
                        return servers.get(port);
@@ -240,7 +236,7 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory {
        }
        
        @Override
-       public synchronized void destroy(int port) throws IllegalArgumentException, IllegalStateException {
+       public synchronized void destroy(int port) {
                
                if (!servers.containsKey(port)) {
                        return;
@@ -251,7 +247,7 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory {
        }
 
        @Override
-       public synchronized void destroy() throws IllegalArgumentException, IllegalStateException {
+       public synchronized void destroy() {
                List<HttpServletServer> httpServletServers = this.inventory();
                for (HttpServletServer server: httpServletServers) {
                        server.shutdown();
index 4f7c151..0cbd983 100644 (file)
  */
 package org.onap.policy.drools.http.server.internal;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.HashMap;
-
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.onap.policy.drools.utils.NetworkUtil;
-import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
-
+import org.slf4j.LoggerFactory;
 import io.swagger.jersey.config.JerseyJaxrsConfig;
 
 /**