2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.sim.dmaap.e2e;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
27 import com.att.nsa.cambria.client.CambriaConsumer;
29 import java.io.PrintWriter;
30 import java.net.HttpURLConnection;
31 import java.net.MalformedURLException;
33 import java.nio.charset.StandardCharsets;
34 import java.nio.file.Files;
35 import java.security.GeneralSecurityException;
36 import java.util.Arrays;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.concurrent.BlockingQueue;
40 import java.util.concurrent.LinkedBlockingQueue;
41 import java.util.concurrent.TimeUnit;
42 import java.util.function.BiConsumer;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
48 import org.onap.policy.common.endpoints.event.comm.TopicSink;
49 import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.common.utils.network.NetworkUtil;
53 import org.onap.policy.common.utils.services.Registry;
54 import org.onap.policy.models.sim.dmaap.DmaapSimException;
55 import org.onap.policy.models.sim.dmaap.rest.CommonRestServer;
56 import org.onap.policy.models.sim.dmaap.startstop.Main;
59 * This tests the simulator using dmaap endpoints to verify that it works from publisher to subscriber.
// End-to-end test class. It extends CommonRestServer, whose getPort() and reconfigure()
// members are called below; httpPrefix is presumably inherited from it as well (TODO confirm).
62 public class EndToEndTest extends CommonRestServer {
// Timeout, in seconds, passed to BlockingQueue.poll() while waiting for each expected message.
63 private static final int MAX_WAIT_SEC = 5;
// Topics written and read through the DMaaP topic sinks and sources of TopicEndpointManager.
64 private static final String TOPIC = "MY-TOPIC";
65 private static final String TOPIC2 = "MY-TOPIC-B";
// Topic that is posted to over a raw HTTP connection and read back with a Cambria consumer.
66 private static final String TOPIC3 = "MY-TOPIC-C";
// Number of messages sent to each of TOPIC and TOPIC2 by testWithTopicEndpointAtEachEnd().
67 private static final int MAX_MSG = 200;
// Simulator instance created by startMain().
69 private static Main main;
72 * Messages from the topic are placed here by the endpoint.
74 private static BlockingQueue<String> queue;
77 * Messages from topic 2 are placed here by the endpoint.
79 private static BlockingQueue<String> queue2;
82 * Topic configuration parameters.
// Decoded from TopicParameters.json in setUpBeforeClass().
84 private static TopicParameterGroup topicConfig;
87 * The "host:port", extracted from <i>httpPrefix</i>.
// Assigned in setUpBeforeClass() and handed to the Cambria consumer builder via usingHosts().
89 private static String hostPort;
92 * Unique consumer name used by a single test case.
// Held as an int; buildConsumer() converts it with String.valueOf() before use.
94 private int consumerName;
97 * Starts the rest server.
99 * @throws Exception if an error occurs
// Class-level fixture: resets the topic endpoints, creates the receive queues, loads the
// topic configuration, starts the endpoints and registers the queue-filling listeners.
102 public static void setUpBeforeClass() throws Exception {
// Shut down any endpoints that are still registered before configuring new ones.
103 TopicEndpointManager.getManager().shutdown();
// NOTE(review): presumably selects a fresh port/httpPrefix for this test class - confirm in CommonRestServer.
105 CommonRestServer.reconfigure();
// Fresh, unbounded queues that the topic listeners registered below append to.
109 queue = new LinkedBlockingQueue<>();
110 queue2 = new LinkedBlockingQueue<>();
// Read the topic parameter template as UTF-8 text.
112 String json = new String(
113 Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
114 StandardCharsets.UTF_8);
// Substitute the ${port} placeholder with the port reported by getPort().
115 json = json.replace("${port}", String.valueOf(getPort()));
// Decode the JSON text into the topic parameter group.
117 topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
// Create the configured topics and start them.
119 TopicEndpointManager.getManager().addTopics(topicConfig);
120 TopicEndpointManager.getManager().start();
// Each event delivered by the TOPIC source is appended to queue, and each TOPIC2 event to queue2.
122 TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC)
123 .register((infra, topic, event) -> queue.add(event));
124 TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC2)
125 .register((infra, topic, event) -> queue2.add(event));
// Takes httpPrefix from the start of its http:// scheme up to, but excluding, its last character.
// NOTE(review): the start index is where the scheme begins, so the scheme itself stays in
// hostPort although the field is documented as host:port - confirm this is what usingHosts() expects.
// NOTE(review): assumes httpPrefix contains the scheme and ends with a single trailing slash - TODO confirm.
127 hostPort = httpPrefix.substring(httpPrefix.indexOf("http://"), httpPrefix.length() - 1);
131 * Stops the topics and clears the queues.
// Class-level teardown: shuts down the topic endpoints managed by TopicEndpointManager.
// NOTE(review): the signature declares DmaapSimException, which presumably comes from a
// statement other than the shutdown() call shown here - confirm against the full method body.
134 public static void tearDownAfterClass() throws DmaapSimException {
135 TopicEndpointManager.getManager().shutdown();
146 * @throws CoderException if the parameters cannot be decoded
// Per-test fixture method (presumably run by JUnit before each test case - confirm the annotation).
// NOTE(review): the @throws CoderException tag in the preceding documentation does not match
// this signature, which declares no checked exceptions - confirm whether the tag is stale.
149 public void setUp() {
// Publishes MAX_MSG messages to each of TOPIC and TOPIC2 through topic sinks, then checks that
// the listeners registered in setUpBeforeClass() delivered every message to the matching queue
// in the order it was sent.
157 public void testWithTopicEndpointAtEachEnd() throws InterruptedException {
// Sinks used to publish to the two topics.
159 TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC);
160 TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC2);
// Interleave the sends so that both topics are being written during the same period.
161 for (int x = 0; x < MAX_MSG; ++x) {
162 sink.send("hello-" + x);
163 sink2.send("world-" + x);
166 // verify events were received
// poll() returns null when nothing arrives within MAX_WAIT_SEC seconds, which fails the assertion.
167 for (int x = 0; x < MAX_MSG; ++x) {
168 assertEquals("message " + x, "hello-" + x, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
169 assertEquals("message " + x, "world-" + x, queue2.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
// Posts the test messages with media type application/cambria and verifies them via test().
// Each message is written as 0.<length>.<message> followed by a newline.
// NOTE(review): presumably the cambria layout is <keyLength>.<messageLength>.<key><message>
// with an empty key - confirm against the cambria format definition.
174 public void testCambriaFormat() throws Exception {
176 test("testCambriaFormat", "application/cambria",
177 (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n")));
// Posts the test messages with media type application/json and verifies them via test().
// The messages are joined with a comma and a space and wrapped in square brackets, so the
// request body is a single JSON array.
182 public void testJson() throws Exception {
183 test("testJson", "application/json", (wtr, messages) -> wtr.write("[" + String.join(", ", messages) + "]"));
// Posts the test messages with media type text/plain and verifies them via test().
// Each message is written on its own line with PrintWriter.println().
187 public void testText() throws Exception {
188 test("testText", "text/plain", (wtr, messages) -> messages.forEach(wtr::println));
192 * Uses a raw URL connection to ensure the server can process messages of the given media type.
195 * @param testName name of the test
196 * @param mediaType media type
197 * @param writeMessages function that writes messages to a PrintWriter
198 * @throws Exception if an error occurs
// Shared driver for the media-type tests: posts two JSON messages to TOPIC3 over a raw HTTP
// connection using the supplied writer function, then fetches them back with a Cambria
// consumer and checks that exactly those two messages arrive, in order.
200 private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
204 * Force consumer name to be registered with the server by attempting to fetch a message.
// A wait time of 0 is requested for this initial fetch; its result is discarded.
206 buildConsumer(0).fetch();
// Single quotes are swapped for double quotes so the literals stay readable in the source.
208 String msg1 = "{'abc':10.0}".replace('\'', '"');
209 String msg2 = "{'def':20.0}".replace('\'', '"');
// Target URL is httpPrefix followed by events/ and the topic name.
211 URL url = new URL(httpPrefix + "events/" + TOPIC3);
// Plain POST request with the media type under test and a request body.
213 HttpURLConnection conn = (HttpURLConnection) url.openConnection();
214 conn.setRequestMethod("POST");
215 conn.setRequestProperty("Content-type", mediaType);
216 conn.setDoOutput(true);
// The writer is opened in a try-with-resources statement, so it is closed when that statement ends.
// NOTE(review): PrintWriter(OutputStream) encodes with the platform default charset; the payloads
// here are ASCII, so this is presumably harmless - confirm if non-ASCII messages are ever added.
219 try (PrintWriter wtr = new PrintWriter(conn.getOutputStream())) {
220 writeMessages.accept(wtr, Arrays.asList(msg1, msg2));
// The server must answer the POST with HTTP 200.
223 assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
225 // fetch the messages
// This fetch is built with a 1000 ms wait time (see buildConsumer()).
226 Iterator<String> iter = buildConsumer(1000).fetch().iterator();
228 assertTrue(testName + " have message 1", iter.hasNext());
229 assertEquals(testName + " message 1", msg1, iter.next());
231 assertTrue(testName + " have message 2", iter.hasNext());
232 assertEquals(testName + " message 2", msg2, iter.next());
// Nothing beyond the two posted messages may be returned.
235 assertFalse(testName + " extra message", iter.hasNext());
// Builds a Cambria consumer for TOPIC3 on hostPort.
// timeoutMs is passed to waitAtServer() and, increased by 2000, to withSocketTimeout().
// Returns the consumer produced by the builder; the declared exceptions are those of the builder calls.
238 private CambriaConsumer buildConsumer(int timeoutMs) throws MalformedURLException, GeneralSecurityException {
239 ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
// NOTE(review): the knownAs() arguments are presumably (consumer group, consumer id) - confirm
// against the Cambria client API. The first one is the per-test consumerName field.
241 builder.knownAs(String.valueOf(consumerName), "my-consumer-id")
242 .usingHosts(hostPort).onTopic(TOPIC3)
243 .waitAtServer(timeoutMs).receivingAtMost(5);
// Socket timeout is 2000 ms longer than the server-side wait, presumably so the client does
// not give up before the server has answered - TODO confirm.
245 builder.withSocketTimeout(timeoutMs + 2000);
247 return builder.build();
253 * @throws Exception if an error occurs
// Starts the simulator: installs a new Registry, checks that the server port is not already
// open, creates Main with the test configuration file, then waits for the port to open.
// Throws IllegalStateException if the port is already in use or the server never starts listening.
255 private static void startMain() throws Exception {
256 Registry.newRegistry();
258 int port = CommonRestServer.getPort();
260 // make sure port is available
// NOTE(review): the last two arguments are presumably a retry count and a delay in
// milliseconds - confirm against NetworkUtil.isTcpPortOpen().
261 if (NetworkUtil.isTcpPortOpen("localhost", port, 1, 1L)) {
262 throw new IllegalStateException("port " + port + " is still in use");
// Command-line style arguments: -c followed by the path of the simulator configuration file.
265 final String[] simConfigParameters = {"-c", "src/test/resources/parameters/TestConfigParams.json"};
267 main = new Main(simConfigParameters);
// Same check as above, with 300 and 200L as the last two arguments; fails if the port is still closed.
269 if (!NetworkUtil.isTcpPortOpen("localhost", port, 300, 200L)) {
270 throw new IllegalStateException("server is not listening on port " + port);