Make feature-pooling-dmaap work without filtering
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / FeatureTest.java
index 96b358d..efab636 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2020 Nordix Foundation
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -47,20 +47,19 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import lombok.Getter;
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
 import org.onap.policy.common.endpoints.event.comm.Topic;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
 import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.system.PolicyController;
 import org.onap.policy.drools.system.PolicyEngine;
 import org.slf4j.Logger;
@@ -215,10 +214,6 @@ public class FeatureTest {
          * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
          */
         private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
-        /**
-         * Queue for the external "DMaaP" topic.
-         */
-        private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
         /**
          * Counts the number of decode errors.
          */
@@ -306,13 +301,16 @@ public class FeatureTest {
         }
 
         /**
-         * Offers an event to the external topic.
+         * Offers an event to the external topic. As each host needs a copy, it is posted
+         * to each Host's queue.
          *
          * @param event event
          */
 
         public void offerExternal(String event) {
-            externalTopic.offer(event);
+            for (Host host : hosts) {
+                host.getExternalTopic().offer(event);
+            }
         }
 
         /**
@@ -336,20 +334,6 @@ public class FeatureTest {
             channel2queue.values().forEach(queue -> queue.offer(message));
         }
 
-        /**
-         * Offers amessage to an internal channel.
-         *
-         * @param channel channel
-         * @param message message
-         */
-
-        public void offerInternal(String channel, String message) {
-            BlockingQueue<String> queue = channel2queue.get(channel);
-            if (queue != null) {
-                queue.offer(message);
-            }
-        }
-
         /**
          * Associates a controller with its drools controller.
          *
@@ -373,16 +357,6 @@ public class FeatureTest {
             return drools2policy.get(droolsController);
         }
 
-        /**
-         * Constructor.
-         *
-         * @return queue for the external topic
-         */
-
-        public BlockingQueue<String> getExternalTopic() {
-            return externalTopic;
-        }
-
         /**
          * Get decode errors.
          *
@@ -464,6 +438,12 @@ public class FeatureTest {
 
         private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
 
+        /**
+         * Queue for the external "DMaaP" topic.
+         */
+        @Getter
+        private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
+
         /**
          * Source that reads from the external topic and posts to the listener.
          */
@@ -493,7 +473,7 @@ public class FeatureTest {
             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
             context.addController(controller, drools);
             // arrange to read from the external topic
-            externalSource = new TopicSourceImpl(context, false);
+            externalSource = new TopicSourceImpl(EXTERNAL_TOPIC, externalTopic);
             feature = new PoolingFeatureImpl(context);
         }
 
@@ -666,10 +646,6 @@ public class FeatureTest {
 
     private static class TopicSinkImpl extends TopicImpl implements TopicSink {
         private final Context context;
-        /**
-         * Used to decode the messages so that the channel can be extracted.
-         */
-        private final Serializer serializer = new Serializer();
 
         /**
          * Constructor.
@@ -687,15 +663,7 @@ public class FeatureTest {
                 return false;
             }
             try {
-                Message msg = serializer.decodeMsg(message);
-                String channel = msg.getChannel();
-                if (Message.ADMIN.equals(channel)) {
-                    // add to every queue
-                    context.offerInternal(message);
-                } else {
-                    // add to a specific queue
-                    context.offerInternal(channel, message);
-                }
+                context.offerInternal(message);
                 return true;
             } catch (JsonParseException e) {
                 logger.warn("could not decode message: {}", message);
@@ -709,7 +677,7 @@ public class FeatureTest {
      * Source implementation that reads from a queue associated with a topic.
      */
 
-    private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
+    private static class TopicSourceImpl extends TopicImpl implements TopicSource {
 
         private final String topic;
         /**
@@ -726,24 +694,13 @@ public class FeatureTest {
         /**
          * Constructor.
          *
-         * @param context context
-         * @param internal {@code true} if to read from the internal topic, {@code false}
-         *        to read from the external topic
+         * @param type topic type
+         * @param queue topic from which to read
          */
 
-        public TopicSourceImpl(Context context, boolean internal) {
-            if (internal) {
-                this.topic = INTERNAL_TOPIC;
-                this.queue = context.getCurrentHost().getInternalQueue();
-            } else {
-                this.topic = EXTERNAL_TOPIC;
-                this.queue = context.getExternalTopic();
-            }
-        }
-
-        @Override
-        public void setFilter(String filter) {
-            logger.info("topic filter set to: {}", filter);
+        public TopicSourceImpl(String type, BlockingQueue<String> queue) {
+            this.topic = type;
+            this.queue = queue;
         }
 
         @Override
@@ -1056,7 +1013,8 @@ public class FeatureTest {
 
         @Override
         protected List<TopicSource> getTopicSources() {
-            return Arrays.asList(new TopicSourceImpl(currentContext.get(), true));
+            return Arrays.asList(new TopicSourceImpl(INTERNAL_TOPIC,
+                            currentContext.get().getCurrentHost().getInternalQueue()));
         }
 
         @Override