Remove annotations from dmaap common test class
[policy/models.git] / models-sim / models-sim-dmaap / src / test / java / org / onap / policy / sim / dmaap / e2e / EndToEndTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.sim.dmaap.e2e;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.File;
24 import java.io.PrintWriter;
25 import java.net.HttpURLConnection;
26 import java.net.URL;
27 import java.nio.charset.StandardCharsets;
28 import java.nio.file.Files;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.TimeUnit;
34 import java.util.function.BiConsumer;
35 import org.junit.AfterClass;
36 import org.junit.Before;
37 import org.junit.BeforeClass;
38 import org.junit.Test;
39 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
40 import org.onap.policy.common.endpoints.event.comm.TopicSink;
41 import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
42 import org.onap.policy.common.endpoints.parameters.TopicParameters;
43 import org.onap.policy.common.utils.coder.CoderException;
44 import org.onap.policy.common.utils.coder.StandardCoder;
45 import org.onap.policy.common.utils.network.NetworkUtil;
46 import org.onap.policy.common.utils.services.Registry;
47 import org.onap.policy.models.sim.dmaap.DmaapSimException;
48 import org.onap.policy.models.sim.dmaap.rest.CommonRestServer;
49 import org.onap.policy.models.sim.dmaap.startstop.Main;
50
51 /**
52  * This tests the simulator using dmaap endpoints to verify that it works from publisher
53  * to subscriber.
54  */
55 public class EndToEndTest extends CommonRestServer {
56     private static final int MAX_WAIT_SEC = 5;
57     private static final String TOPIC = "MY-TOPIC";
58     private static final String TOPIC2 = "MY-TOPIC-B";
59     private static final int MAX_MSG = 200;
60
61     private static Main main;
62
63     /**
64      * Messages from the topic are placed here by the endpoint.
65      */
66     private static BlockingQueue<String> queue;
67
68     /**
69      * Messages from topic 2 are placed here by the endpoint.
70      */
71     private static BlockingQueue<String> queue2;
72
73     /**
74      * Topic configuration parameters.
75      */
76     private static TopicParameterGroup topicConfig;
77
78     /**
79      * Starts the rest server.
80      *
81      * @throws Exception if an error occurs
82      */
83     @BeforeClass
84     public static void setUpBeforeClass() throws Exception {
85         TopicEndpointManager.getManager().shutdown();
86
87         CommonRestServer.reconfigure();
88
89         startMain();
90
91         queue = new LinkedBlockingQueue<>();
92         queue2 = new LinkedBlockingQueue<>();
93
94         String json = new String(
95                         Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
96                         StandardCharsets.UTF_8);
97         json = json.replace("${port}", String.valueOf(getPort()));
98
99         topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
100
101         TopicEndpointManager.getManager().addTopics(topicConfig);
102         TopicEndpointManager.getManager().start();
103
104         TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC)
105                         .register((infra, topic, event) -> queue.add(event));
106         TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC2)
107                         .register((infra, topic, event) -> queue2.add(event));
108     }
109
110     /**
111      * Stops the topics and clears the queues.
112      */
113     @AfterClass
114     public static void tearDownAfterClass() throws DmaapSimException {
115         TopicEndpointManager.getManager().shutdown();
116
117         queue = null;
118         queue2 = null;
119
120         main.shutdown();
121     }
122
123     /**
124      * Starts the topics.
125      *
126      * @throws CoderException if the parameters cannot be decoded
127      */
128     @Before
129     public void setUp() {
130         queue.clear();
131         queue2.clear();
132     }
133
134     @Test
135     public void testWithTopicEndpointAtEachEnd() throws InterruptedException {
136         // publish events
137         TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC);
138         TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC2);
139         for (int x = 0; x < MAX_MSG; ++x) {
140             sink.send("hello-" + x);
141             sink2.send("world-" + x);
142         }
143
144         // verify events where received
145         for (int x = 0; x < MAX_MSG; ++x) {
146             assertEquals("message " + x, "hello-" + x, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
147             assertEquals("message " + x, "world-" + x, queue2.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
148         }
149     }
150
151     @Test
152     public void testCambriaFormat() throws Exception {
153         // @formatter:off
154         test("testCambriaFormat", "application/cambria",
155             (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n")));
156         // @formatter:on
157     }
158
159     @Test
160     public void testJson() throws Exception {
161         test("testJson", "application/json", (wtr, messages) -> wtr.write("[" + String.join(", ", messages) + "]"));
162     }
163
164     @Test
165     public void testText() throws Exception {
166         test("testText", "text/plain", (wtr, messages) -> messages.forEach(wtr::println));
167     }
168
169     /**
170      * Uses a raw URL connection to ensure the server can process messages of the given
171      * media type.
172      *
173      * @param testName name of the test
174      * @param mediaType media type
175      * @param writeMessages function that writes messages to a PrintWriter
176      * @throws Exception if an error occurs
177      */
178     private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
179                     throws Exception {
180         String msg1 = "{'abc':10.0}".replace('\'', '"');
181         String msg2 = "{'def':20.0}".replace('\'', '"');
182
183         TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
184         URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());
185
186         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
187         conn.setRequestMethod("POST");
188         conn.setRequestProperty("Content-type", mediaType);
189         conn.setDoOutput(true);
190         conn.connect();
191
192         try (PrintWriter wtr = new PrintWriter(conn.getOutputStream())) {
193             writeMessages.accept(wtr, Arrays.asList(msg1, msg2));
194         }
195
196         assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
197
198         assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
199         assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
200     }
201
202     /**
203      * Starts the "Main".
204      *
205      * @throws Exception if an error occurs
206      */
207     private static void startMain() throws Exception {
208         Registry.newRegistry();
209
210         int port = CommonRestServer.getPort();
211
212         // make sure port is available
213         if (NetworkUtil.isTcpPortOpen("localhost", port, 1, 1L)) {
214             throw new IllegalStateException("port " + port + " is still in use");
215         }
216
217         final String[] simConfigParameters = {"-c", "src/test/resources/parameters/TestConfigParams.json"};
218
219         main = new Main(simConfigParameters);
220
221         if (!NetworkUtil.isTcpPortOpen("localhost", port, 300, 200L)) {
222             throw new IllegalStateException("server is not listening on port " + port);
223         }
224     }
225 }