Support retry in DCAE-SDK DMaaP-Client
[dcaegen2/services/sdk.git] / rest-services / dmaap-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / dmaap / client / api / MessageRouterPublisherIT.java
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.TimeToLive;
31 import org.mockserver.matchers.Times;
32 import org.mockserver.verify.VerificationTimes;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
42 import org.testcontainers.containers.DockerComposeContainer;
43 import org.testcontainers.junit.jupiter.Container;
44 import org.testcontainers.junit.jupiter.Testcontainers;
45 import reactor.core.publisher.Flux;
46 import reactor.core.publisher.Mono;
47 import reactor.test.StepVerifier;
48
49 import java.time.Duration;
50 import java.util.concurrent.TimeUnit;
51
52 import static org.mockserver.model.HttpRequest.request;
53 import static org.mockserver.model.HttpResponse.response;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
65 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
66 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
67
68 @Testcontainers
69 class MessageRouterPublisherIT {
70     @Container
71     private static final DockerComposeContainer CONTAINER = createContainerInstance();
72     private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
73             LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
74     private static String EVENTS_PATH;
75     private static String PROXY_MOCK_EVENTS_PATH;
76
77     private static final Duration TIMEOUT = Duration.ofSeconds(10);
78     private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
79             + "{"
80             + "\"mrstatus\":5007,"
81             + "\"helpURL\":\"http://onap.readthedocs.io\","
82             + "\"message\":\"Error while publishing data to topic.:%s."
83             + "Successfully published number of messages :0."
84             + "Expected { to start an object.\",\"status\":400"
85             + "}";
86     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
87             + "{"
88             + "\"requestError\":"
89             + "{"
90             + "\"serviceException\":"
91             + "{"
92             + "\"messageId\":\"SVC0001\","
93             + "\"text\":\"Client timeout exception occurred, Error code is %1\","
94             + "\"variables\":[\"408\"]"
95             + "}"
96             + "}"
97             + "}";
98
99     private final MessageRouterPublisher publisher = DmaapClientFactory
100             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
101     private final MessageRouterSubscriber subscriber = DmaapClientFactory
102             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
103
104     @BeforeAll
105     static void setUp() {
106         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
107         PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
108     }
109
110     @BeforeEach
111     void set() {
112         MOCK_SERVER_CLIENT.reset();
113     }
114
115     @Test
116     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
117         //given
118         final String topic = "TOPIC";
119         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
120                 "{\"differentMessage\":\"message2\"}");
121         final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
122         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
123         final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
124
125         //when
126         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
127
128         //then
129         StepVerifier.create(result)
130                 .expectNext(expectedResponse)
131                 .expectComplete()
132                 .verify(TIMEOUT);
133     }
134
135     @Test
136     void publisher_shouldHandleBadRequestError() {
137         //given
138         final String topic = "TOPIC2";
139         final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
140         final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
141         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
142         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
143                 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
144
145         //when
146         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
147
148         //then
149         StepVerifier.create(result)
150                 .expectNext(expectedResponse)
151                 .expectComplete()
152                 .verify(TIMEOUT);
153     }
154
155     @Test
156     void publisher_shouldSuccessfullyPublishSingleMessage() {
157         //given
158         final String topic = "TOPIC3";
159         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
160         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
161         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
162         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
163         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
164         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
165         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
166
167         //when
168         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
169         Mono<MessageRouterSubscribeResponse> response = publisher
170                 .put(publishRequest, jsonMessageBatch)
171                 .then(subscriber.get(subscribeRequest));
172
173         //then
174         StepVerifier.create(response)
175                 .expectNext(expectedResponse)
176                 .expectComplete()
177                 .verify();
178     }
179
180     @Test
181     void publisher_shouldSuccessfullyPublishMultipleMessages() {
182         final String topic = "TOPIC5";
183         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
184         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
185                 "{\"differentMessage\":\"message2\"}");
186         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
187         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
188         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
189         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
190         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
191
192         //when
193         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
194         Mono<MessageRouterSubscribeResponse> response = publisher
195                 .put(publishRequest, jsonMessageBatch)
196                 .then(subscriber.get(subscribeRequest));
197
198         //then
199         StepVerifier.create(response)
200                 .expectNext(expectedResponse)
201                 .expectComplete()
202                 .verify();
203     }
204
205     @Test
206     void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
207         //given
208         final String topic = "TOPIC6";
209         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
210
211         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
212         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
213         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
214
215         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
216         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
217         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
218
219         //when
220         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
221         Mono<MessageRouterSubscribeResponse> response = publisher
222                 .put(publishRequest, plainBatch)
223                 .then(subscriber.get(subscribeRequest));
224
225         //then
226         StepVerifier.create(response)
227                 .expectNext(expectedResponse)
228                 .expectComplete()
229                 .verify();
230     }
231
232     @Test
233     void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
234         //given
235         final String topic = "TOPIC7";
236         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
237
238         final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
239         final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
240         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
241
242         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
243         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
244         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
245
246         //when
247         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
248         Mono<MessageRouterSubscribeResponse> response = publisher
249                 .put(publishRequest, plainBatch)
250                 .then(subscriber.get(subscribeRequest));
251
252         //then
253         StepVerifier.create(response)
254                 .expectNext(expectedResponse)
255                 .expectComplete()
256                 .verify();
257     }
258
259     @Test
260     void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
261         //given
262         final String topic = "TOPIC8";
263         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
264
265         final List<String> singlePlainMessage = List.of("kebab");
266         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
267         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
268
269         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
270         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
271         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
272
273         //when
274         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
275         Mono<MessageRouterSubscribeResponse> response = publisher
276                 .put(publishRequest, plainBatch)
277                 .then(subscriber.get(subscribeRequest));
278
279         //then
280         StepVerifier.create(response)
281                 .expectNext(expectedResponse)
282                 .expectComplete()
283                 .verify();
284     }
285
286     @Test
287     void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
288         //given
289         final String topic = "TOPIC9";
290         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
291
292         final List<String> singlePlainMessage = List.of("I", "like", "pizza");
293         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
294         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
295
296         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
297         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
298         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
299
300         //when
301         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
302         Mono<MessageRouterSubscribeResponse> response = publisher
303                 .put(publishRequest, plainBatch)
304                 .then(subscriber.get(subscribeRequest));
305
306         //then
307         StepVerifier.create(response)
308                 .expectNext(expectedResponse)
309                 .expectComplete()
310                 .verify();
311     }
312
313     @Test
314     void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
315         //given
316         final String topic = "TOPIC10";
317         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
318         final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
319         final MessageRouterPublishRequest mrRequest = createPublishRequest(
320                 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1));
321         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
322         final String path = String.format("/events/%s", topic);
323         MOCK_SERVER_CLIENT
324                 .when(request().withPath(path), Times.once())
325                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
326
327         //when
328         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
329
330         //then
331         StepVerifier.create(result)
332                 .expectNext(expectedResponse)
333                 .expectComplete()
334                 .verify(TIMEOUT);
335
336         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
337     }
338
339     @Test
340     void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
341         final String topic = "TOPIC11";
342         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
343
344         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
345         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
346         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
347
348         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
349         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
350
351         final String path = String.format("/events/%s", topic);
352         MOCK_SERVER_CLIENT
353                 .when(request().withPath(path), Times.once())
354                 .respond(response().withStatusCode(404));
355         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
356
357         //when
358         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
359
360         //then
361         StepVerifier.create(result)
362                 .expectNext(expectedResponse)
363                 .expectComplete()
364                 .verify();
365
366         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
367     }
368
369     @Test
370     void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
371         final String topic = "TOPIC12";
372         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
373
374         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
375         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
376         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
377
378         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
379         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
380
381         final String path = String.format("/events/%s", topic);
382         MOCK_SERVER_CLIENT
383                 .when(request().withPath(path), Times.once())
384                 .respond(response().withDelay(TimeUnit.SECONDS, 10));
385         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
386
387         //when
388         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
389
390         //then
391         StepVerifier.create(result)
392                 .expectNext(expectedResponse)
393                 .expectComplete()
394                 .verify();
395
396         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
397     }
398
399     private MessageRouterPublisherConfig retryConfig() {
400         return ImmutableMessageRouterPublisherConfig.builder()
401                 .retryConfig(ImmutableDmaapRetryConfig.builder()
402                         .retryIntervalInSeconds(1)
403                         .retryCount(1)
404                         .build())
405                 .build();
406     }
407 }