Remove actor and recipe checks from ControlLoopCompiler.java
[policy/models.git] / models-sim / models-sim-dmaap / src / test / java / org / onap / policy / sim / dmaap / e2e / EndToEndTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.sim.dmaap.e2e;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.File;
24 import java.io.PrintWriter;
25 import java.net.HttpURLConnection;
26 import java.net.URL;
27 import java.nio.charset.StandardCharsets;
28 import java.nio.file.Files;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.TimeUnit;
34 import java.util.function.BiConsumer;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.BeforeClass;
38 import org.junit.Test;
39 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
40 import org.onap.policy.common.endpoints.event.comm.TopicSink;
41 import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
42 import org.onap.policy.common.endpoints.parameters.TopicParameters;
43 import org.onap.policy.common.utils.coder.CoderException;
44 import org.onap.policy.common.utils.coder.StandardCoder;
45 import org.onap.policy.models.sim.dmaap.rest.CommonRestServer;
46
47 /**
48  * This tests the simulator using dmaap endpoints to verify that it works from publisher
49  * to subscriber.
50  */
51 public class EndToEndTest extends CommonRestServer {
52     private static final int MAX_WAIT_SEC = 5;
53     private static final String ORIG_TOPIC = "MY-TOPIC";
54     private static final String ORIG_TOPIC2 = "MY-TOPIC-B";
55     private static final int MAX_MSG = 200;
56
57     private static int ntests = 0;
58     private static String topicJson;
59
60     private TopicParameterGroup topicConfig;
61
62     private String topic = "MY-TOPIC";
63     private String topic2 = "MY-TOPIC-B";
64
65     /**
66      * Messages from the topic are placed here by the endpoint.
67      */
68     private BlockingQueue<String> queue;
69
70     /**
71      * Messages from topic 2 are placed here by the endpoint.
72      */
73     private BlockingQueue<String> queue2;
74
75     /**
76      * Starts the rest server.
77      *
78      * @throws Exception if an error occurs
79      */
80     @BeforeClass
81     public static void setUpBeforeClass() throws Exception {
82         TopicEndpointManager.getManager().shutdown();
83
84         CommonRestServer.setUpBeforeClass();
85
86         topicJson = new String(
87                         Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
88                         StandardCharsets.UTF_8);
89         topicJson = topicJson.replace("${port}", String.valueOf(getPort()));
90     }
91
92     /**
93      * Starts the topics.
94      *
95      * @throws CoderException if the parameters cannot be decoded
96      */
97     @Before
98     @Override
99     public void setUp() throws CoderException {
100         queue = new LinkedBlockingQueue<>();
101         queue2 = new LinkedBlockingQueue<>();
102
103         /*
104          * change topic names for each test so any listeners that may still exist will not
105          * grab new messages
106          */
107
108         ++ntests;
109         topic = "my-topic-" + ntests;
110         topic2 = "my-topic-b" + ntests;
111
112         String json = topicJson.replace(ORIG_TOPIC2, topic2).replace(ORIG_TOPIC, topic);
113
114         topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
115
116         TopicEndpointManager.getManager().addTopics(topicConfig);
117         TopicEndpointManager.getManager().start();
118     }
119
120     @After
121     public void tearDown() {
122         TopicEndpointManager.getManager().shutdown();
123     }
124
125     @Test
126     public void testWithTopicEndpointAtEachEnd() throws InterruptedException {
127         // register listeners to add events to appropriate queue
128         TopicEndpointManager.getManager().getDmaapTopicSource(topic)
129                         .register((infra, topic, event) -> queue.add(event));
130         TopicEndpointManager.getManager().getDmaapTopicSource(topic2)
131                         .register((infra, topic, event) -> queue2.add(event));
132
133         // publish events
134         TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(topic);
135         TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(topic2);
136         for (int x = 0; x < MAX_MSG; ++x) {
137             sink.send("hello-" + x);
138             sink2.send("world-" + x);
139         }
140
141         // verify events where received
142         for (int x = 0; x < MAX_MSG; ++x) {
143             assertEquals("message " + x, "hello-" + x, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
144             assertEquals("message " + x, "world-" + x, queue2.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
145         }
146     }
147
148     @Test
149     public void testCambriaFormat() throws Exception {
150         test("testCambriaFormat", "application/cambria",
151             (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n")));
152     }
153
154     @Test
155     public void testJson() throws Exception {
156         test("testJson", "application/json", (wtr, messages) -> wtr.write("[" + String.join(", ", messages) + "]"));
157     }
158
159     @Test
160     public void testText() throws Exception {
161         test("testText", "text/plain", (wtr, messages) -> messages.forEach(wtr::println));
162     }
163
164     /**
165      * Uses a raw URL connection to ensure the server can process messages of the given
166      * media type.
167      *
168      * @param testName name of the test
169      * @param mediaType media type
170      * @param writeMessages function that writes messages to a PrintWriter
171      * @throws Exception if an error occurs
172      */
173     private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
174                     throws Exception {
175         String msg1 = "{'abc':10.0}".replace('\'', '"');
176         String msg2 = "{'def':20.0}".replace('\'', '"');
177
178         TopicEndpointManager.getManager().getDmaapTopicSource(topic)
179                         .register((infra, topic, event) -> queue.add(event));
180
181         TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
182         URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());
183
184         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
185         conn.setRequestMethod("POST");
186         conn.setRequestProperty("Content-type", mediaType);
187         conn.setDoOutput(true);
188         conn.connect();
189
190         try (PrintWriter wtr = new PrintWriter(conn.getOutputStream())) {
191             writeMessages.accept(wtr, Arrays.asList(msg1, msg2));
192         }
193
194         assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
195
196         assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
197         assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
198     }
199 }