package org.onap.policy.sim.dmaap.e2e;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+import com.att.nsa.cambria.client.CambriaConsumer;
import java.io.File;
import java.io.PrintWriter;
import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.security.GeneralSecurityException;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
-import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.network.NetworkUtil;
private static final int MAX_WAIT_SEC = 5;
private static final String TOPIC = "MY-TOPIC";
private static final String TOPIC2 = "MY-TOPIC-B";
+ private static final String TOPIC3 = "MY-TOPIC-C";
private static final int MAX_MSG = 200;
private static Main main;
*/
private static TopicParameterGroup topicConfig;
+ /**
+ * The "host:port", extracted from <i>httpPrefix</i>.
+ */
+ private static String hostPort;
+
+ /**
+ * Unique consumer name used by a single test case.
+ */
+ private int consumerName;
+
/**
* Starts the rest server.
*
.register((infra, topic, event) -> queue.add(event));
TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC2)
.register((infra, topic, event) -> queue2.add(event));
+
+ hostPort = httpPrefix.substring(httpPrefix.indexOf("http://"), httpPrefix.length() - 1);
}
/**
*/
@Before
public void setUp() {
+ ++consumerName;
+
queue.clear();
queue2.clear();
}
*/
private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
throws Exception {
+
+ /*
+ * Force consumer name to be registered with the server by attempting to fetch a message.
+ */
+ buildConsumer(0).fetch();
+
String msg1 = "{'abc':10.0}".replace('\'', '"');
String msg2 = "{'def':20.0}".replace('\'', '"');
- TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
- URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());
+ URL url = new URL(httpPrefix + "events/" + TOPIC3);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
- assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
- assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+ // fetch the messages
+ Iterator<String> iter = buildConsumer(1000).fetch().iterator();
+
+ assertTrue(testName + " have message 1", iter.hasNext());
+ assertEquals(testName + " message 1", msg1, iter.next());
+
+ assertTrue(testName + " have message 2", iter.hasNext());
+ assertEquals(testName + " message 2", msg2, iter.next());
+
+ // no more messages
+ assertFalse(testName + " extra message", iter.hasNext());
+ }
+
+ private CambriaConsumer buildConsumer(int timeoutMs) throws MalformedURLException, GeneralSecurityException {
+ ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
+
+ builder.knownAs(String.valueOf(consumerName), "my-consumer-id")
+ .usingHosts(hostPort).onTopic(TOPIC3)
+ .waitAtServer(timeoutMs).receivingAtMost(5);
+
+ builder.withSocketTimeout(timeoutMs + 2000);
+
+ return builder.build();
}
/**