Fix random failures in dmaap-sim
[policy/models.git] / models-sim / models-sim-dmaap / src / test / java / org / onap / policy / sim / dmaap / e2e / EndToEndTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.sim.dmaap.e2e;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
27 import com.att.nsa.cambria.client.CambriaConsumer;
28 import java.io.File;
29 import java.io.PrintWriter;
30 import java.net.HttpURLConnection;
31 import java.net.MalformedURLException;
32 import java.net.URL;
33 import java.nio.charset.StandardCharsets;
34 import java.nio.file.Files;
35 import java.security.GeneralSecurityException;
36 import java.util.Arrays;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.concurrent.BlockingQueue;
40 import java.util.concurrent.LinkedBlockingQueue;
41 import java.util.concurrent.TimeUnit;
42 import java.util.function.BiConsumer;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
48 import org.onap.policy.common.endpoints.event.comm.TopicSink;
49 import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.common.utils.network.NetworkUtil;
53 import org.onap.policy.common.utils.services.Registry;
54 import org.onap.policy.models.sim.dmaap.DmaapSimException;
55 import org.onap.policy.models.sim.dmaap.rest.CommonRestServer;
56 import org.onap.policy.models.sim.dmaap.startstop.Main;
57
58 /**
59  * This tests the simulator using dmaap endpoints to verify that it works from publisher
60  * to subscriber.
61  */
62 public class EndToEndTest extends CommonRestServer {
63     private static final int MAX_WAIT_SEC = 5;
64     private static final String TOPIC = "MY-TOPIC";
65     private static final String TOPIC2 = "MY-TOPIC-B";
66     private static final String TOPIC3 = "MY-TOPIC-C";
67     private static final int MAX_MSG = 200;
68
69     private static Main main;
70
71     /**
72      * Messages from the topic are placed here by the endpoint.
73      */
74     private static BlockingQueue<String> queue;
75
76     /**
77      * Messages from topic 2 are placed here by the endpoint.
78      */
79     private static BlockingQueue<String> queue2;
80
81     /**
82      * Topic configuration parameters.
83      */
84     private static TopicParameterGroup topicConfig;
85
86     /**
87      * The "host:port", extracted from <i>httpPrefix</i>.
88      */
89     private static String hostPort;
90
91     /**
92      * Unique consumer name used by a single test case.
93      */
94     private int consumerName;
95
96     /**
97      * Starts the rest server.
98      *
99      * @throws Exception if an error occurs
100      */
101     @BeforeClass
102     public static void setUpBeforeClass() throws Exception {
103         TopicEndpointManager.getManager().shutdown();
104
105         CommonRestServer.reconfigure();
106
107         startMain();
108
109         queue = new LinkedBlockingQueue<>();
110         queue2 = new LinkedBlockingQueue<>();
111
112         String json = new String(
113                         Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
114                         StandardCharsets.UTF_8);
115         json = json.replace("${port}", String.valueOf(getPort()));
116
117         topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
118
119         TopicEndpointManager.getManager().addTopics(topicConfig);
120         TopicEndpointManager.getManager().start();
121
122         TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC)
123                         .register((infra, topic, event) -> queue.add(event));
124         TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC2)
125                         .register((infra, topic, event) -> queue2.add(event));
126
127         hostPort = httpPrefix.substring(httpPrefix.indexOf("http://"), httpPrefix.length() - 1);
128     }
129
130     /**
131      * Stops the topics and clears the queues.
132      */
133     @AfterClass
134     public static void tearDownAfterClass() throws DmaapSimException {
135         TopicEndpointManager.getManager().shutdown();
136
137         queue = null;
138         queue2 = null;
139
140         main.shutdown();
141     }
142
143     /**
144      * Starts the topics.
145      *
146      * @throws CoderException if the parameters cannot be decoded
147      */
148     @Before
149     public void setUp() {
150         ++consumerName;
151
152         queue.clear();
153         queue2.clear();
154     }
155
156     @Test
157     public void testWithTopicEndpointAtEachEnd() throws InterruptedException {
158         // publish events
159         TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC);
160         TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC2);
161         for (int x = 0; x < MAX_MSG; ++x) {
162             sink.send("hello-" + x);
163             sink2.send("world-" + x);
164         }
165
166         // verify events where received
167         for (int x = 0; x < MAX_MSG; ++x) {
168             assertEquals("message " + x, "hello-" + x, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
169             assertEquals("message " + x, "world-" + x, queue2.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
170         }
171     }
172
173     @Test
174     public void testCambriaFormat() throws Exception {
175         // @formatter:off
176         test("testCambriaFormat", "application/cambria",
177             (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n")));
178         // @formatter:on
179     }
180
181     @Test
182     public void testJson() throws Exception {
183         test("testJson", "application/json", (wtr, messages) -> wtr.write("[" + String.join(", ", messages) + "]"));
184     }
185
186     @Test
187     public void testText() throws Exception {
188         test("testText", "text/plain", (wtr, messages) -> messages.forEach(wtr::println));
189     }
190
191     /**
192      * Uses a raw URL connection to ensure the server can process messages of the given
193      * media type.
194      *
195      * @param testName name of the test
196      * @param mediaType media type
197      * @param writeMessages function that writes messages to a PrintWriter
198      * @throws Exception if an error occurs
199      */
200     private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
201                     throws Exception {
202
203         /*
204          * Force consumer name to be registered with the server by attempting to fetch a message.
205          */
206         buildConsumer(0).fetch();
207
208         String msg1 = "{'abc':10.0}".replace('\'', '"');
209         String msg2 = "{'def':20.0}".replace('\'', '"');
210
211         URL url = new URL(httpPrefix + "events/" + TOPIC3);
212
213         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
214         conn.setRequestMethod("POST");
215         conn.setRequestProperty("Content-type", mediaType);
216         conn.setDoOutput(true);
217         conn.connect();
218
219         try (PrintWriter wtr = new PrintWriter(conn.getOutputStream())) {
220             writeMessages.accept(wtr, Arrays.asList(msg1, msg2));
221         }
222
223         assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
224
225         // fetch the messages
226         Iterator<String> iter = buildConsumer(1000).fetch().iterator();
227
228         assertTrue(testName + " have message 1", iter.hasNext());
229         assertEquals(testName + " message 1", msg1, iter.next());
230
231         assertTrue(testName + " have message 2", iter.hasNext());
232         assertEquals(testName + " message 2", msg2, iter.next());
233
234         // no more messages
235         assertFalse(testName + " extra message", iter.hasNext());
236     }
237
238     private CambriaConsumer buildConsumer(int timeoutMs) throws MalformedURLException, GeneralSecurityException {
239         ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
240
241         builder.knownAs(String.valueOf(consumerName), "my-consumer-id")
242                 .usingHosts(hostPort).onTopic(TOPIC3)
243                 .waitAtServer(timeoutMs).receivingAtMost(5);
244
245         builder.withSocketTimeout(timeoutMs + 2000);
246
247         return builder.build();
248     }
249
250     /**
251      * Starts the "Main".
252      *
253      * @throws Exception if an error occurs
254      */
255     private static void startMain() throws Exception {
256         Registry.newRegistry();
257
258         int port = CommonRestServer.getPort();
259
260         // make sure port is available
261         if (NetworkUtil.isTcpPortOpen("localhost", port, 1, 1L)) {
262             throw new IllegalStateException("port " + port + " is still in use");
263         }
264
265         final String[] simConfigParameters = {"-c", "src/test/resources/parameters/TestConfigParams.json"};
266
267         main = new Main(simConfigParameters);
268
269         if (!NetworkUtil.isTcpPortOpen("localhost", port, 300, 200L)) {
270             throw new IllegalStateException("server is not listening on port " + port);
271         }
272     }
273 }