Merge "Remove actor and recipe checks from ControlLoopCompiler.java"
[policy/models.git] / models-sim / models-sim-dmaap / src / test / java / org / onap / policy / sim / dmaap / e2e / EndToEndTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.sim.dmaap.e2e;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.File;
24 import java.io.PrintWriter;
25 import java.net.HttpURLConnection;
26 import java.net.URL;
27 import java.nio.charset.StandardCharsets;
28 import java.nio.file.Files;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.TimeUnit;
34 import java.util.function.BiConsumer;
35 import org.junit.AfterClass;
36 import org.junit.Before;
37 import org.junit.BeforeClass;
38 import org.junit.Test;
39 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
40 import org.onap.policy.common.endpoints.event.comm.TopicSink;
41 import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
42 import org.onap.policy.common.endpoints.parameters.TopicParameters;
43 import org.onap.policy.common.utils.coder.CoderException;
44 import org.onap.policy.common.utils.coder.StandardCoder;
45 import org.onap.policy.models.sim.dmaap.rest.CommonRestServer;
46
47 /**
48  * This tests the simulator using dmaap endpoints to verify that it works from publisher
49  * to subscriber.
50  */
51 public class EndToEndTest extends CommonRestServer {
52     private static final int MAX_WAIT_SEC = 5;
53     private static final String TOPIC = "MY-TOPIC";
54     private static final String TOPIC2 = "MY-TOPIC-B";
55     private static final int MAX_MSG = 200;
56
57     /**
58      * Messages from the topic are placed here by the endpoint.
59      */
60     private static BlockingQueue<String> queue;
61
62     /**
63      * Messages from topic 2 are placed here by the endpoint.
64      */
65     private static BlockingQueue<String> queue2;
66
67     /**
68      * Topic configuration parameters.
69      */
70     private static TopicParameterGroup topicConfig;
71
72     /**
73      * Starts the rest server.
74      *
75      * @throws Exception if an error occurs
76      */
77     @BeforeClass
78     public static void setUpBeforeClass() throws Exception {
79         TopicEndpointManager.getManager().shutdown();
80
81         CommonRestServer.setUpBeforeClass();
82
83         queue = new LinkedBlockingQueue<>();
84         queue2 = new LinkedBlockingQueue<>();
85
86         String json = new String(
87                         Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
88                         StandardCharsets.UTF_8);
89         json = json.replace("${port}", String.valueOf(getPort()));
90
91         topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
92
93         TopicEndpointManager.getManager().addTopics(topicConfig);
94         TopicEndpointManager.getManager().start();
95
96         TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC)
97                         .register((infra, topic, event) -> queue.add(event));
98         TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC2)
99                         .register((infra, topic, event) -> queue2.add(event));
100     }
101
102     /**
103      * Stops the topics and clears the queues.
104      */
105     @AfterClass
106     public static void tearDownAfterClass() {
107         TopicEndpointManager.getManager().shutdown();
108
109         queue = null;
110         queue2 = null;
111     }
112
113     /**
114      * Starts the topics.
115      *
116      * @throws CoderException if the parameters cannot be decoded
117      */
118     @Before
119     @Override
120     public void setUp() throws CoderException {
121         queue.clear();
122         queue2.clear();
123     }
124
125     @Test
126     public void testWithTopicEndpointAtEachEnd() throws InterruptedException {
127         // publish events
128         TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC);
129         TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC2);
130         for (int x = 0; x < MAX_MSG; ++x) {
131             sink.send("hello-" + x);
132             sink2.send("world-" + x);
133         }
134
135         // verify events where received
136         for (int x = 0; x < MAX_MSG; ++x) {
137             assertEquals("message " + x, "hello-" + x, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
138             assertEquals("message " + x, "world-" + x, queue2.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
139         }
140     }
141
142     @Test
143     public void testCambriaFormat() throws Exception {
144         // @formatter:off
145         test("testCambriaFormat", "application/cambria",
146             (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n")));
147         // @formatter:on
148     }
149
150     @Test
151     public void testJson() throws Exception {
152         test("testJson", "application/json", (wtr, messages) -> wtr.write("[" + String.join(", ", messages) + "]"));
153     }
154
155     @Test
156     public void testText() throws Exception {
157         test("testText", "text/plain", (wtr, messages) -> messages.forEach(wtr::println));
158     }
159
160     /**
161      * Uses a raw URL connection to ensure the server can process messages of the given
162      * media type.
163      *
164      * @param testName name of the test
165      * @param mediaType media type
166      * @param writeMessages function that writes messages to a PrintWriter
167      * @throws Exception if an error occurs
168      */
169     private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
170                     throws Exception {
171         String msg1 = "{'abc':10.0}".replace('\'', '"');
172         String msg2 = "{'def':20.0}".replace('\'', '"');
173
174         TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
175         URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());
176
177         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
178         conn.setRequestMethod("POST");
179         conn.setRequestProperty("Content-type", mediaType);
180         conn.setDoOutput(true);
181         conn.connect();
182
183         try (PrintWriter wtr = new PrintWriter(conn.getOutputStream())) {
184             writeMessages.accept(wtr, Arrays.asList(msg1, msg2));
185         }
186
187         assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
188
189         assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
190         assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
191     }
192 }