Add timeout for Publisher(dmaap-client)
[dcaegen2/services/sdk.git] / rest-services / dmaap-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / dmaap / client / api / MessageRouterPublisherIT.java
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2020 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import eu.rekawek.toxiproxy.Proxy;
26 import eu.rekawek.toxiproxy.ToxiproxyClient;
27 import io.vavr.collection.List;
28 import org.junit.jupiter.api.BeforeAll;
29 import org.junit.jupiter.api.Test;
30 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
37 import org.testcontainers.containers.DockerComposeContainer;
38 import org.testcontainers.junit.jupiter.Container;
39 import org.testcontainers.junit.jupiter.Testcontainers;
40 import reactor.core.publisher.Flux;
41 import reactor.core.publisher.Mono;
42 import reactor.test.StepVerifier;
43
44 import java.io.IOException;
45 import java.time.Duration;
46 import java.util.concurrent.TimeUnit;
47
48 import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
49 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
50 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
51 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
52 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
53 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
62
63 @Testcontainers
64 class MessageRouterPublisherIT {
65     @Container
66     private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
67     private static final Duration TIMEOUT = Duration.ofSeconds(10);
68     private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
69             + "{"
70             + "\"mrstatus\":5007,"
71             + "\"helpURL\":\"http://onap.readthedocs.io\","
72             + "\"message\":\"Error while publishing data to topic.:%s."
73             + "Successfully published number of messages :0."
74             + "Expected { to start an object.\",\"status\":400"
75             + "}";
76     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
77             + "{"
78             + "\"requestError\":"
79             + "{"
80             + "\"serviceException\":"
81             + "{"
82             + "\"messageId\":\"SVC0001\","
83             + "\"text\":\"Client timeout exception occurred, Error code is %1\","
84             + "\"variables\":[\"408\"]"
85             + "}"
86             + "}"
87             + "}";
88     private static Proxy DMAAP_PROXY;
89     private static String EVENTS_PATH;
90     private static String PROXY_EVENTS_PATH;
91     private final MessageRouterPublisher publisher = DmaapClientFactory
92             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
93     private final MessageRouterSubscriber subscriber = DmaapClientFactory
94             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
95
96     @BeforeAll
97     static void setUp() throws IOException {
98         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
99         PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
100
101         DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
102                 String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
103                 String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
104     }
105
106     @Test
107     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
108         //given
109         final String topic = "TOPIC";
110         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
111                 "{\"differentMessage\":\"message2\"}");
112         final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
113         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
114         final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
115
116         //when
117         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
118
119         //then
120         StepVerifier.create(result)
121                 .expectNext(expectedResponse)
122                 .expectComplete()
123                 .verify(TIMEOUT);
124     }
125
126     @Test
127     void publisher_shouldHandleBadRequestError() {
128         //given
129         final String topic = "TOPIC2";
130         final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
131         final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
132         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
133         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
134                 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
135
136         //when
137         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
138
139         //then
140         StepVerifier.create(result)
141                 .expectNext(expectedResponse)
142                 .expectComplete()
143                 .verify(TIMEOUT);
144     }
145
146     @Test
147     void publisher_shouldSuccessfullyPublishSingleMessage() {
148         //given
149         final String topic = "TOPIC3";
150         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
151         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
152         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
153         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
154         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
155         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
156         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
157
158         //when
159         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
160         Mono<MessageRouterSubscribeResponse> response = publisher
161                 .put(publishRequest, jsonMessageBatch)
162                 .then(subscriber.get(subscribeRequest));
163
164         //then
165         StepVerifier.create(response)
166                 .expectNext(expectedResponse)
167                 .expectComplete()
168                 .verify();
169     }
170
171     @Test
172     void publisher_shouldSuccessfullyPublishMultipleMessages() {
173         final String topic = "TOPIC5";
174         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
175         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
176                 "{\"differentMessage\":\"message2\"}");
177         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
178         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
179         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
180         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
181         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
182
183         //when
184         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
185         Mono<MessageRouterSubscribeResponse> response = publisher
186                 .put(publishRequest, jsonMessageBatch)
187                 .then(subscriber.get(subscribeRequest));
188
189         //then
190         StepVerifier.create(response)
191                 .expectNext(expectedResponse)
192                 .expectComplete()
193                 .verify();
194     }
195
196     @Test
197     void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
198         //given
199         final String topic = "TOPIC6";
200         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
201
202         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
203         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
204         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
205
206         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
207         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
208         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
209
210         //when
211         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
212         Mono<MessageRouterSubscribeResponse> response = publisher
213                 .put(publishRequest, plainBatch)
214                 .then(subscriber.get(subscribeRequest));
215
216         //then
217         StepVerifier.create(response)
218                 .expectNext(expectedResponse)
219                 .expectComplete()
220                 .verify();
221     }
222
223     @Test
224     void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
225         //given
226         final String topic = "TOPIC7";
227         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
228
229         final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
230         final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
231         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
232
233         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
234         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
235         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
236
237         //when
238         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
239         Mono<MessageRouterSubscribeResponse> response = publisher
240                 .put(publishRequest, plainBatch)
241                 .then(subscriber.get(subscribeRequest));
242
243         //then
244         StepVerifier.create(response)
245                 .expectNext(expectedResponse)
246                 .expectComplete()
247                 .verify();
248     }
249
250     @Test
251     void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
252         //given
253         final String topic = "TOPIC8";
254         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
255
256         final List<String> singlePlainMessage = List.of("kebab");
257         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
258         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
259
260         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
261         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
262         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
263
264         //when
265         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
266         Mono<MessageRouterSubscribeResponse> response = publisher
267                 .put(publishRequest, plainBatch)
268                 .then(subscriber.get(subscribeRequest));
269
270         //then
271         StepVerifier.create(response)
272                 .expectNext(expectedResponse)
273                 .expectComplete()
274                 .verify();
275     }
276
277     @Test
278     void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
279         //given
280         final String topic = "TOPIC9";
281         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
282
283         final List<String> singlePlainMessage = List.of("I", "like", "pizza");
284         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
285         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
286
287         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
288         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
289         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
290
291         //when
292         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
293         Mono<MessageRouterSubscribeResponse> response = publisher
294                 .put(publishRequest, plainBatch)
295                 .then(subscriber.get(subscribeRequest));
296
297         //then
298         StepVerifier.create(response)
299                 .expectNext(expectedResponse)
300                 .expectComplete()
301                 .verify();
302     }
303
304     @Test
305     void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() throws IOException {
306         //given
307         final String toxic = "latency-toxic";
308         DMAAP_PROXY.toxics()
309                 .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
310         final String topic = "TOPIC10";
311         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
312         final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
313         final MessageRouterPublishRequest mrRequest = createPublishRequest(
314                 String.format("%s/%s", PROXY_EVENTS_PATH, topic), Duration.ofSeconds(1));
315         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
316
317         //when
318         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
319
320         //then
321         StepVerifier.create(result)
322                 .expectNext(expectedResponse)
323                 .expectComplete()
324                 .verify(TIMEOUT);
325
326         //cleanup
327         DMAAP_PROXY.toxics().get(toxic).remove();
328     }
329 }