/*-
 * ============LICENSE_START=======================================================
 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
19 package org.onap.policy.sim.dmaap.e2e;
import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.models.sim.dmaap.rest.CommonRestServer;
/**
 * This tests the simulator using dmaap endpoints to verify that it works from publisher
 * to subscriber.
 */
51 public class EndToEndTest extends CommonRestServer {
52 private static final int MAX_WAIT_SEC = 5;
53 private static final String ORIG_TOPIC = "MY-TOPIC";
54 private static final String ORIG_TOPIC2 = "MY-TOPIC-B";
55 private static final int MAX_MSG = 200;
57 private static int ntests = 0;
58 private static String topicJson;
60 private TopicParameterGroup topicConfig;
62 private String topic = "MY-TOPIC";
63 private String topic2 = "MY-TOPIC-B";
66 * Messages from the topic are placed here by the endpoint.
68 private BlockingQueue<String> queue;
71 * Messages from topic 2 are placed here by the endpoint.
73 private BlockingQueue<String> queue2;
76 * Starts the rest server.
78 * @throws Exception if an error occurs
81 public static void setUpBeforeClass() throws Exception {
82 TopicEndpointManager.getManager().shutdown();
84 CommonRestServer.setUpBeforeClass();
86 topicJson = new String(
87 Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
88 StandardCharsets.UTF_8);
89 topicJson = topicJson.replace("${port}", String.valueOf(getPort()));
95 * @throws CoderException if the parameters cannot be decoded
99 public void setUp() throws CoderException {
100 queue = new LinkedBlockingQueue<>();
101 queue2 = new LinkedBlockingQueue<>();
104 * change topic names for each test so any listeners that may still exist will not
109 topic = "my-topic-" + ntests;
110 topic2 = "my-topic-b" + ntests;
112 String json = topicJson.replace(ORIG_TOPIC2, topic2).replace(ORIG_TOPIC, topic);
114 topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
116 TopicEndpointManager.getManager().addTopics(topicConfig);
117 TopicEndpointManager.getManager().start();
121 public void tearDown() {
122 TopicEndpointManager.getManager().shutdown();
126 public void testWithTopicEndpointAtEachEnd() throws InterruptedException {
127 // register listeners to add events to appropriate queue
128 TopicEndpointManager.getManager().getDmaapTopicSource(topic)
129 .register((infra, topic, event) -> queue.add(event));
130 TopicEndpointManager.getManager().getDmaapTopicSource(topic2)
131 .register((infra, topic, event) -> queue2.add(event));
134 TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(topic);
135 TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(topic2);
136 for (int x = 0; x < MAX_MSG; ++x) {
137 sink.send("hello-" + x);
138 sink2.send("world-" + x);
141 // verify events where received
142 for (int x = 0; x < MAX_MSG; ++x) {
143 assertEquals("message " + x, "hello-" + x, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
144 assertEquals("message " + x, "world-" + x, queue2.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
149 public void testCambriaFormat() throws Exception {
150 test("testCambriaFormat", "application/cambria",
151 (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n")));
155 public void testJson() throws Exception {
156 test("testJson", "application/json", (wtr, messages) -> wtr.write("[" + String.join(", ", messages) + "]"));
160 public void testText() throws Exception {
161 test("testText", "text/plain", (wtr, messages) -> messages.forEach(wtr::println));
165 * Uses a raw URL connection to ensure the server can process messages of the given
168 * @param testName name of the test
169 * @param mediaType media type
170 * @param writeMessages function that writes messages to a PrintWriter
171 * @throws Exception if an error occurs
173 private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
175 String msg1 = "{'abc':10.0}".replace('\'', '"');
176 String msg2 = "{'def':20.0}".replace('\'', '"');
178 TopicEndpointManager.getManager().getDmaapTopicSource(topic)
179 .register((infra, topic, event) -> queue.add(event));
181 TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
182 URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());
184 HttpURLConnection conn = (HttpURLConnection) url.openConnection();
185 conn.setRequestMethod("POST");
186 conn.setRequestProperty("Content-type", mediaType);
187 conn.setDoOutput(true);
190 try (PrintWriter wtr = new PrintWriter(conn.getOutputStream())) {
191 writeMessages.accept(wtr, Arrays.asList(msg1, msg2));
194 assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
196 assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
197 assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));