Merge "Remove Target and TargetType"
authorJim Hahn <jrh3@att.com>
Fri, 28 Aug 2020 12:28:06 +0000 (12:28 +0000)
committerGerrit Code Review <gerrit@onap.org>
Fri, 28 Aug 2020 12:28:06 +0000 (12:28 +0000)
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java

index 066c38b..5a83b46 100644 (file)
 package org.onap.policy.sim.dmaap.e2e;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+import com.att.nsa.cambria.client.CambriaConsumer;
 import java.io.File;
 import java.io.PrintWriter;
 import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.security.GeneralSecurityException;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -39,7 +47,6 @@ import org.junit.Test;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
-import org.onap.policy.common.endpoints.parameters.TopicParameters;
 import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.network.NetworkUtil;
@@ -56,6 +63,7 @@ public class EndToEndTest extends CommonRestServer {
     private static final int MAX_WAIT_SEC = 5;
     private static final String TOPIC = "MY-TOPIC";
     private static final String TOPIC2 = "MY-TOPIC-B";
+    private static final String TOPIC3 = "MY-TOPIC-C";
     private static final int MAX_MSG = 200;
 
     private static Main main;
@@ -75,6 +83,16 @@ public class EndToEndTest extends CommonRestServer {
      */
     private static TopicParameterGroup topicConfig;
 
+    /**
+     * The "host:port", extracted from <i>httpPrefix</i>.
+     */
+    private static String hostPort;
+
+    /**
+     * Unique consumer name used by a single test case.
+     */
+    private int consumerName;
+
     /**
      * Starts the rest server.
      *
@@ -105,6 +123,8 @@ public class EndToEndTest extends CommonRestServer {
                         .register((infra, topic, event) -> queue.add(event));
         TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC2)
                         .register((infra, topic, event) -> queue2.add(event));
+
+        hostPort = httpPrefix.substring(httpPrefix.indexOf("http://"), httpPrefix.length() - 1);
     }
 
     /**
@@ -127,6 +147,8 @@ public class EndToEndTest extends CommonRestServer {
      */
     @Before
     public void setUp() {
+        ++consumerName;
+
         queue.clear();
         queue2.clear();
     }
@@ -177,11 +199,16 @@ public class EndToEndTest extends CommonRestServer {
      */
     private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
                     throws Exception {
+
+        /*
+         * Force consumer name to be registered with the server by attempting to fetch a message.
+         */
+        buildConsumer(0).fetch();
+
         String msg1 = "{'abc':10.0}".replace('\'', '"');
         String msg2 = "{'def':20.0}".replace('\'', '"');
 
-        TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
-        URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());
+        URL url = new URL(httpPrefix + "events/" + TOPIC3);
 
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
         conn.setRequestMethod("POST");
@@ -195,8 +222,29 @@ public class EndToEndTest extends CommonRestServer {
 
         assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
 
-        assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
-        assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+        // fetch the messages
+        Iterator<String> iter = buildConsumer(1000).fetch().iterator();
+
+        assertTrue(testName + " have message 1", iter.hasNext());
+        assertEquals(testName + " message 1", msg1, iter.next());
+
+        assertTrue(testName + " have message 2", iter.hasNext());
+        assertEquals(testName + " message 2", msg2, iter.next());
+
+        // no more messages
+        assertFalse(testName + " extra message", iter.hasNext());
+    }
+
+    private CambriaConsumer buildConsumer(int timeoutMs) throws MalformedURLException, GeneralSecurityException {
+        ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
+
+        builder.knownAs(String.valueOf(consumerName), "my-consumer-id")
+                .usingHosts(hostPort).onTopic(TOPIC3)
+                .waitAtServer(timeoutMs).receivingAtMost(5);
+
+        builder.withSocketTimeout(timeoutMs + 2000);
+
+        return builder.build();
     }
 
     /**