/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.sim.dmaap.e2e;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+import com.att.nsa.cambria.client.CambriaConsumer;
import java.io.File;
import java.io.PrintWriter;
import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.security.GeneralSecurityException;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
-import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
-import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.network.NetworkUtil;
+import org.onap.policy.common.utils.services.Registry;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
import org.onap.policy.models.sim.dmaap.rest.CommonRestServer;
+import org.onap.policy.models.sim.dmaap.startstop.Main;
/**
* This tests the simulator using dmaap endpoints to verify that it works from publisher
*/
public class EndToEndTest extends CommonRestServer {
private static final int MAX_WAIT_SEC = 5;
- private static final String ORIG_TOPIC = "MY-TOPIC";
- private static final String ORIG_TOPIC2 = "MY-TOPIC-B";
+ private static final String TOPIC = "MY-TOPIC";
+ private static final String TOPIC2 = "MY-TOPIC-B";
+ private static final String TOPIC3 = "MY-TOPIC-C";
private static final int MAX_MSG = 200;
- private static int ntests = 0;
- private static String topicJson;
-
- private TopicParameterGroup topicConfig;
-
- private String topic = "MY-TOPIC";
- private String topic2 = "MY-TOPIC-B";
+ private static Main main;
/**
* Messages from the topic are placed here by the endpoint.
*/
- private BlockingQueue<String> queue;
+ private static BlockingQueue<String> queue;
/**
* Messages from topic 2 are placed here by the endpoint.
*/
- private BlockingQueue<String> queue2;
+ private static BlockingQueue<String> queue2;
+
+ /**
+ * Topic configuration parameters.
+ */
+ private static TopicParameterGroup topicConfig;
+
+ /**
+ * The "host:port", extracted from <i>httpPrefix</i>.
+ */
+ private static String hostPort;
+
+ /**
+ * Unique consumer name used by a single test case.
+ */
+ private int consumerName;
/**
* Starts the rest server.
public static void setUpBeforeClass() throws Exception {
TopicEndpointManager.getManager().shutdown();
- CommonRestServer.setUpBeforeClass();
+ CommonRestServer.reconfigure();
- topicJson = new String(
- Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
- StandardCharsets.UTF_8);
- topicJson = topicJson.replace("${port}", String.valueOf(getPort()));
- }
+ startMain();
- /**
- * Starts the topics.
- *
- * @throws CoderException if the parameters cannot be decoded
- */
- @Before
- @Override
- public void setUp() throws CoderException {
queue = new LinkedBlockingQueue<>();
queue2 = new LinkedBlockingQueue<>();
- /*
- * change topic names for each test so any listeners that may still exist will not
- * grab new messages
- */
-
- ++ntests;
- topic = "my-topic-" + ntests;
- topic2 = "my-topic-b" + ntests;
-
- String json = topicJson.replace(ORIG_TOPIC2, topic2).replace(ORIG_TOPIC, topic);
+ String json = new String(
+ Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
+ StandardCharsets.UTF_8);
+ json = json.replace("${port}", String.valueOf(getPort()));
topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
TopicEndpointManager.getManager().addTopics(topicConfig);
TopicEndpointManager.getManager().start();
+
+ TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC)
+ .register((infra, topic, event) -> queue.add(event));
+ TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC2)
+ .register((infra, topic, event) -> queue2.add(event));
+
+ hostPort = httpPrefix.substring(httpPrefix.indexOf("http://"), httpPrefix.length() - 1);
}
- @After
- public void tearDown() {
+ /**
+ * Stops the topics and clears the queues.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws DmaapSimException {
TopicEndpointManager.getManager().shutdown();
+
+ queue = null;
+ queue2 = null;
+
+ main.shutdown();
+ }
+
+ /**
+ * Starts the topics.
+ *
+ * @throws CoderException if the parameters cannot be decoded
+ */
+ @Before
+ public void setUp() {
+ ++consumerName;
+
+ queue.clear();
+ queue2.clear();
}
@Test
public void testWithTopicEndpointAtEachEnd() throws InterruptedException {
- // register listeners to add events to appropriate queue
- TopicEndpointManager.getManager().getDmaapTopicSource(topic)
- .register((infra, topic, event) -> queue.add(event));
- TopicEndpointManager.getManager().getDmaapTopicSource(topic2)
- .register((infra, topic, event) -> queue2.add(event));
-
// publish events
- TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(topic);
- TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(topic2);
+ TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC);
+ TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC2);
for (int x = 0; x < MAX_MSG; ++x) {
sink.send("hello-" + x);
sink2.send("world-" + x);
@Test
public void testCambriaFormat() throws Exception {
+ // @formatter:off
test("testCambriaFormat", "application/cambria",
(wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n")));
+ // @formatter:on
}
@Test
*/
private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
throws Exception {
+
+ /*
+ * Force consumer name to be registered with the server by attempting to fetch a message.
+ */
+ buildConsumer(0).fetch();
+
String msg1 = "{'abc':10.0}".replace('\'', '"');
String msg2 = "{'def':20.0}".replace('\'', '"');
- TopicEndpointManager.getManager().getDmaapTopicSource(topic)
- .register((infra, topic, event) -> queue.add(event));
-
- TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
- URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());
+ URL url = new URL(httpPrefix + "events/" + TOPIC3);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
- assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
- assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+ // fetch the messages
+ Iterator<String> iter = buildConsumer(1000).fetch().iterator();
+
+ assertTrue(testName + " have message 1", iter.hasNext());
+ assertEquals(testName + " message 1", msg1, iter.next());
+
+ assertTrue(testName + " have message 2", iter.hasNext());
+ assertEquals(testName + " message 2", msg2, iter.next());
+
+ // no more messages
+ assertFalse(testName + " extra message", iter.hasNext());
+ }
+
+ private CambriaConsumer buildConsumer(int timeoutMs) throws MalformedURLException, GeneralSecurityException {
+ ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
+
+ builder.knownAs(String.valueOf(consumerName), "my-consumer-id")
+ .usingHosts(hostPort).onTopic(TOPIC3)
+ .waitAtServer(timeoutMs).receivingAtMost(5);
+
+ builder.withSocketTimeout(timeoutMs + 2000);
+
+ return builder.build();
+ }
+
+ /**
+ * Starts the "Main".
+ *
+ * @throws Exception if an error occurs
+ */
+ private static void startMain() throws Exception {
+ Registry.newRegistry();
+
+ int port = CommonRestServer.getPort();
+
+ // make sure port is available
+ if (NetworkUtil.isTcpPortOpen("localhost", port, 1, 1L)) {
+ throw new IllegalStateException("port " + port + " is still in use");
+ }
+
+ final String[] simConfigParameters = {"-c", "src/test/resources/parameters/TestConfigParams.json"};
+
+ main = new Main(simConfigParameters);
+
+ if (!NetworkUtil.isTcpPortOpen("localhost", port, 300, 200L)) {
+ throw new IllegalStateException("server is not listening on port " + port);
+ }
}
}