159fc598ae25dd7b5e5ba38bb50ae4fb3595508b
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.Times;
31 import org.mockserver.verify.VerificationTimes;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
41 import org.testcontainers.containers.DockerComposeContainer;
42 import org.testcontainers.junit.jupiter.Container;
43 import org.testcontainers.junit.jupiter.Testcontainers;
44 import reactor.core.publisher.Flux;
45 import reactor.core.publisher.Mono;
46 import reactor.test.StepVerifier;
47
48 import java.time.Duration;
49 import java.util.concurrent.TimeUnit;
50
51 import static org.mockserver.model.HttpRequest.request;
52 import static org.mockserver.model.HttpResponse.response;
53 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
65 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
66
67 @Testcontainers
68 class MessageRouterPublisherIT {
69     @Container
70     private static final DockerComposeContainer CONTAINER = createContainerInstance();
71     private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
72             LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
73     private static String EVENTS_PATH;
74     private static String PROXY_MOCK_EVENTS_PATH;
75
76     private static final Duration TIMEOUT = Duration.ofSeconds(10);
77     private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
78             + "{"
79             + "\"mrstatus\":5007,"
80             + "\"helpURL\":\"http://onap.readthedocs.io\","
81             + "\"message\":\"Error while publishing data to topic.:%s."
82             + "Successfully published number of messages :0."
83             + "Expected { to start an object.\",\"status\":400"
84             + "}";
85     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
86             + "{"
87             + "\"requestError\":"
88             + "{"
89             + "\"serviceException\":"
90             + "{"
91             + "\"messageId\":\"SVC0001\","
92             + "\"text\":\"Client timeout exception occurred, Error code is %1\","
93             + "\"variables\":[\"408\"]"
94             + "}"
95             + "}"
96             + "}";
97
98     private final MessageRouterPublisher publisher = DmaapClientFactory
99             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
100     private final MessageRouterSubscriber subscriber = DmaapClientFactory
101             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
102
103     @BeforeAll
104     static void setUp() {
105         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
106         PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
107     }
108
109     @BeforeEach
110     void set() {
111         MOCK_SERVER_CLIENT.reset();
112     }
113
114     @Test
115     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
116         //given
117         final String topic = "TOPIC";
118         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
119                 "{\"differentMessage\":\"message2\"}");
120         final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
121         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
122         final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
123
124         //when
125         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
126
127         //then
128         StepVerifier.create(result)
129                 .expectNext(expectedResponse)
130                 .expectComplete()
131                 .verify(TIMEOUT);
132     }
133
134     @Test
135     void publisher_shouldHandleBadRequestError() {
136         //given
137         final String topic = "TOPIC2";
138         final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
139         final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
140         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
141         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
142                 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
143
144         //when
145         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
146
147         //then
148         StepVerifier.create(result)
149                 .expectNext(expectedResponse)
150                 .expectComplete()
151                 .verify(TIMEOUT);
152     }
153
154     @Test
155     void publisher_shouldSuccessfullyPublishSingleMessage() {
156         //given
157         final String topic = "TOPIC3";
158         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
159         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
160         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
161         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
162         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
163         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
164         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
165
166         //when
167         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
168         Mono<MessageRouterSubscribeResponse> response = publisher
169                 .put(publishRequest, jsonMessageBatch)
170                 .then(subscriber.get(subscribeRequest));
171
172         //then
173         StepVerifier.create(response)
174                 .expectNext(expectedResponse)
175                 .expectComplete()
176                 .verify();
177     }
178
179     @Test
180     void publisher_shouldSuccessfullyPublishMultipleMessages() {
181         final String topic = "TOPIC5";
182         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
183         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
184                 "{\"differentMessage\":\"message2\"}");
185         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
186         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
187         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
188         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
189         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
190
191         //when
192         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
193         Mono<MessageRouterSubscribeResponse> response = publisher
194                 .put(publishRequest, jsonMessageBatch)
195                 .then(subscriber.get(subscribeRequest));
196
197         //then
198         StepVerifier.create(response)
199                 .expectNext(expectedResponse)
200                 .expectComplete()
201                 .verify();
202     }
203
204     @Test
205     void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
206         //given
207         final String topic = "TOPIC6";
208         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
209
210         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
211         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
212         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
213
214         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
215         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
216         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
217
218         //when
219         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
220         Mono<MessageRouterSubscribeResponse> response = publisher
221                 .put(publishRequest, plainBatch)
222                 .then(subscriber.get(subscribeRequest));
223
224         //then
225         StepVerifier.create(response)
226                 .expectNext(expectedResponse)
227                 .expectComplete()
228                 .verify();
229     }
230
231     @Test
232     void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
233         //given
234         final String topic = "TOPIC7";
235         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
236
237         final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
238         final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
239         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
240
241         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
242         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
243         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
244
245         //when
246         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
247         Mono<MessageRouterSubscribeResponse> response = publisher
248                 .put(publishRequest, plainBatch)
249                 .then(subscriber.get(subscribeRequest));
250
251         //then
252         StepVerifier.create(response)
253                 .expectNext(expectedResponse)
254                 .expectComplete()
255                 .verify();
256     }
257
258     @Test
259     void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
260         //given
261         final String topic = "TOPIC8";
262         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
263
264         final List<String> singlePlainMessage = List.of("kebab");
265         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
266         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
267
268         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
269         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
270         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
271
272         //when
273         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
274         Mono<MessageRouterSubscribeResponse> response = publisher
275                 .put(publishRequest, plainBatch)
276                 .then(subscriber.get(subscribeRequest));
277
278         //then
279         StepVerifier.create(response)
280                 .expectNext(expectedResponse)
281                 .expectComplete()
282                 .verify();
283     }
284
285     @Test
286     void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
287         //given
288         final String topic = "TOPIC9";
289         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
290
291         final List<String> singlePlainMessage = List.of("I", "like", "pizza");
292         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
293         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
294
295         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
296         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
297         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
298
299         //when
300         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
301         Mono<MessageRouterSubscribeResponse> response = publisher
302                 .put(publishRequest, plainBatch)
303                 .then(subscriber.get(subscribeRequest));
304
305         //then
306         StepVerifier.create(response)
307                 .expectNext(expectedResponse)
308                 .expectComplete()
309                 .verify();
310     }
311
312     @Test
313     void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
314         //given
315         final String topic = "TOPIC10";
316         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
317         final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
318         final MessageRouterPublishRequest mrRequest = createPublishRequest(
319                 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1));
320         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
321         final String path = String.format("/events/%s", topic);
322         MOCK_SERVER_CLIENT
323                 .when(request().withPath(path), Times.once())
324                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
325
326         //when
327         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
328
329         //then
330         StepVerifier.create(result)
331                 .expectNext(expectedResponse)
332                 .expectComplete()
333                 .verify(TIMEOUT);
334
335         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
336     }
337
338     @Test
339     void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
340         final String topic = "TOPIC11";
341         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
342
343         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
344         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
345         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
346
347         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
348         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
349
350         final String path = String.format("/events/%s", topic);
351         MOCK_SERVER_CLIENT
352                 .when(request().withPath(path), Times.once())
353                 .respond(response().withStatusCode(404));
354
355         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
356                 retryConfig(1, 1));
357
358         //when
359         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
360
361         //then
362         StepVerifier.create(result)
363                 .expectNext(expectedResponse)
364                 .expectComplete()
365                 .verify();
366
367         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
368     }
369
370     @Test
371     void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
372         final String topic = "TOPIC12";
373         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
374
375         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
376         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
377         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
378
379         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
380         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
381
382         final String path = String.format("/events/%s", topic);
383         MOCK_SERVER_CLIENT
384                 .when(request().withPath(path), Times.once())
385                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
386         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
387                 retryConfig(1, 1));
388
389         //when
390         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
391
392         //then
393         StepVerifier.create(result)
394                 .expectNext(expectedResponse)
395                 .expectComplete()
396                 .verify();
397
398         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
399     }
400
401     @Test
402     void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
403         final String topic = "TOPIC13";
404         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
405
406         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
407                 "{\"differentMessage\":\"message2\"}");
408         final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages);
409         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
410
411         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
412         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
413
414         final String path = String.format("/events/%s", topic);
415         MOCK_SERVER_CLIENT
416                 .when(request().withPath(path), Times.once())
417                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
418         MOCK_SERVER_CLIENT
419                 .when(request().withPath(path), Times.once())
420                 .respond(response().withStatusCode(404));
421         MOCK_SERVER_CLIENT
422                 .when(request().withPath(path), Times.once())
423                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
424         MOCK_SERVER_CLIENT
425                 .when(request().withPath(path), Times.once())
426                 .respond(response().withStatusCode(500));
427         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5));
428
429         //when
430         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
431
432         //then
433         StepVerifier.create(result)
434                 .expectNext(expectedResponse)
435                 .expectComplete()
436                 .verify();
437
438         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
439     }
440
441     private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
442         return ImmutableMessageRouterPublisherConfig.builder()
443                 .retryConfig(ImmutableDmaapRetryConfig.builder()
444                         .retryIntervalInSeconds(retryInterval)
445                         .retryCount(retryCount)
446                         .build())
447                 .build();
448     }
449 }