8f4edab0200f4bbffad229f6ffb7c24ca7e722b1
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.Times;
31 import org.mockserver.verify.VerificationTimes;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
40 import org.testcontainers.containers.DockerComposeContainer;
41 import org.testcontainers.junit.jupiter.Container;
42 import org.testcontainers.junit.jupiter.Testcontainers;
43 import reactor.core.publisher.Flux;
44 import reactor.core.publisher.Mono;
45 import reactor.test.StepVerifier;
46
47 import java.time.Duration;
48 import java.util.concurrent.TimeUnit;
49
50 import static org.mockserver.model.HttpRequest.request;
51 import static org.mockserver.model.HttpResponse.response;
52 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
53 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObject;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
64
65 @Testcontainers
66 class MessageRouterSubscriberIT {
67     @Container
68     private static final DockerComposeContainer CONTAINER = createContainerInstance();
69     private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
70             LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
71     private static String EVENTS_PATH;
72     private static String PROXY_MOCK_EVENTS_PATH;
73
74     private static final Duration TIMEOUT = Duration.ofSeconds(10);
75     private static final String CONSUMER_GROUP = "group1";
76     private static final String CONSUMER_ID = "consumer200";
77     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
78             "{" +
79             "\"mrstatus\":3001," +
80             "\"helpURL\":\"http://onap.readthedocs.io\"," +
81             "\"message\":\"No such topic exists.-[%s]\"," +
82             "\"status\":404" +
83             "}";
84     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
85             + "{"
86             + "\"requestError\":"
87             + "{"
88             + "\"serviceException\":"
89             + "{"
90             + "\"messageId\":\"SVC0001\","
91             + "\"text\":\"Client timeout exception occurred, Error code is %1\","
92             + "\"variables\":[\"408\"]"
93             + "}"
94             + "}"
95             + "}";
96
97     private MessageRouterPublisher publisher = DmaapClientFactory
98             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
99     private MessageRouterSubscriber subscriber = DmaapClientFactory
100             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
101
102     @BeforeAll
103     static void setUp() {
104         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
105         PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
106     }
107
108     @BeforeEach
109     void set() {
110         MOCK_SERVER_CLIENT.reset();
111     }
112
113     @Test
114     void subscriber_shouldHandleNoSuchTopicException() {
115         //given
116         final String topic = "newTopic";
117         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
118                 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
119         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
120                 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
121
122         //when
123         Mono<MessageRouterSubscribeResponse> response = subscriber
124                 .get(mrSubscribeRequest);
125
126         //then
127         StepVerifier.create(response)
128                 .expectNext(expectedResponse)
129                 .expectComplete()
130                 .verify(TIMEOUT);
131     }
132
133     @Test
134     void subscriberShouldHandleSingleItemResponse() {
135         //given
136         final String topic = "TOPIC";
137         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
138         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
139         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
140
141         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
142         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
143         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
144         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
145
146         //when
147         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
148         Mono<MessageRouterSubscribeResponse> response = publisher
149                 .put(publishRequest, jsonMessageBatch)
150                 .then(subscriber.get(subscribeRequest));
151
152         //then
153         StepVerifier.create(response)
154                 .expectNext(expectedResponse)
155                 .expectComplete()
156                 .verify();
157     }
158
159     @Test
160     void subscriber_shouldHandleMultipleItemsResponse() {
161         //given
162         final String topic = "TOPIC2";
163         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
164         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
165         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
166
167         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
168                 "{\"differentMessage\":\"message2\"}");
169         final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
170         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
171         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
172
173         //when
174         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
175         Mono<MessageRouterSubscribeResponse> response = publisher
176                 .put(publishRequest, jsonMessageBatch)
177                 .then(subscriber.get(subscribeRequest));
178
179         //then
180         StepVerifier.create(response)
181                 .expectNext(expectedResponse)
182                 .expectComplete()
183                 .verify();
184     }
185
186     @Test
187     void subscriber_shouldExtractItemsFromResponse() {
188         //given
189         final String topic = "TOPIC3";
190         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
191         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
192         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
193                 CONSUMER_GROUP, CONSUMER_ID);
194
195         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
196                 "{\"differentMessage\":\"message2\"}");
197         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
198
199         //when
200         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
201         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
202                 .thenMany(subscriber.getElements(subscribeRequest));
203
204         //then
205         StepVerifier.create(result)
206                 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
207                 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
208                 .expectComplete()
209                 .verify(TIMEOUT);
210     }
211
212     @Test
213     void subscriber_shouldSubscribeToTopic() {
214         //given
215         final String topic = "TOPIC4";
216         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
217         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
218         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
219                 CONSUMER_GROUP, CONSUMER_ID);
220
221         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
222                 "{\"differentMessage\":\"message2\"}");
223         final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
224         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
225
226         //when
227         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
228         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
229                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
230
231         //then
232         StepVerifier.create(result.take(2))
233                 .expectNext(messages.get(0))
234                 .expectNext(messages.get(1))
235                 .expectComplete()
236                 .verify(TIMEOUT);
237     }
238
239     @Test
240     void subscriber_shouldHandleTimeoutException() {
241         //given
242         final String topic = "TOPIC5";
243         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
244                 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
245         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(TIMEOUT_ERROR_MESSAGE);
246         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
247         MOCK_SERVER_CLIENT
248                 .when(request().withPath(path), Times.once())
249                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
250
251         //when
252         Mono<MessageRouterSubscribeResponse> response = subscriber
253                 .get(mrSubscribeRequest);
254
255         //then
256         StepVerifier.create(response)
257                 .expectNext(expectedResponse)
258                 .expectComplete()
259                 .verify(TIMEOUT);
260
261         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
262     }
263
264     @Test
265     void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() {
266         //given
267         final String topic = "TOPIC6";
268         final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
269         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
270         final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
271         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
272
273         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
274         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
275         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
276         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
277
278         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
279         MOCK_SERVER_CLIENT
280                 .when(request().withPath(path), Times.once())
281                 .respond(response().withStatusCode(404));
282         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
283                 retryConfig(1, 1));
284
285         //when
286         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
287                 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
288         Mono<MessageRouterSubscribeResponse> response = publisher
289                 .put(publishRequest, jsonMessageBatch)
290                 .then(subscriber.get(subscribeRequest));
291
292         //then
293         StepVerifier.create(response)
294                 .expectNext(expectedResponse)
295                 .expectComplete()
296                 .verify();
297
298         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
299     }
300
301     @Test
302     void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() {
303         //given
304         final String topic = "TOPIC7";
305         final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
306         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
307         final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
308         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
309                 proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
310
311         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
312         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
313         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
314         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
315
316         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
317         MOCK_SERVER_CLIENT
318                 .when(request().withPath(path), Times.once())
319                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
320         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
321                 retryConfig(1, 1));
322
323         //when
324         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
325                 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
326         Mono<MessageRouterSubscribeResponse> response = publisher
327                 .put(publishRequest, jsonMessageBatch)
328                 .then(subscriber.get(subscribeRequest));
329
330         //then
331         StepVerifier.create(response)
332                 .expectNext(expectedResponse)
333                 .expectComplete()
334                 .verify();
335
336         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
337     }
338
339     @Test
340     void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
341         //given
342         final String topic = "TOPIC8";
343         final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
344         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
345         final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
346         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
347                 proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
348
349         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
350         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
351         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
352         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
353
354         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
355         MOCK_SERVER_CLIENT
356                 .when(request().withPath(path), Times.once())
357                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
358         MOCK_SERVER_CLIENT
359                 .when(request().withPath(path), Times.once())
360                 .respond(response().withStatusCode(404));
361         MOCK_SERVER_CLIENT
362                 .when(request().withPath(path), Times.once())
363                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
364         MOCK_SERVER_CLIENT
365                 .when(request().withPath(path), Times.once())
366                 .respond(response().withStatusCode(500));
367         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
368                 retryConfig(1, 5));
369
370         //when
371         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
372                 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
373         Mono<MessageRouterSubscribeResponse> response = publisher
374                 .put(publishRequest, jsonMessageBatch)
375                 .then(subscriber.get(subscribeRequest));
376
377         //then
378         StepVerifier.create(response)
379                 .expectNext(expectedResponse)
380                 .expectComplete()
381                 .verify();
382
383         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
384     }
385
386     @Test
387     void subscriber_shouldHandleLastRetryError500() {
388         //given
389         final String topic = "TOPIC9";
390         final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
391         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
392                 proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
393         final String responseMessage = "Response Message";
394         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
395                 "500 Internal Server Error\n%s", responseMessage);
396
397         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
398         MOCK_SERVER_CLIENT
399                 .when(request().withPath(path), Times.once())
400                 .respond(response().withStatusCode(404));
401         MOCK_SERVER_CLIENT
402                 .when(request().withPath(path), Times.once())
403                 .respond(response().withStatusCode(500).withBody(responseMessage));
404         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
405                 retryConfig(1, 1));
406
407         //when
408         Mono<MessageRouterSubscribeResponse> response = subscriber.get(subscribeRequest);
409
410         //then
411         StepVerifier.create(response)
412                 .expectNext(expectedResponse)
413                 .expectComplete()
414                 .verify();
415
416         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
417     }
418     @Test
419     void subscriber_shouldSubscribeToTopicWithConnectionPoolConfiguration() {
420         //given
421         final String topic = "TOPIC4";
422         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
423         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
424         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
425                 CONSUMER_GROUP, CONSUMER_ID);
426
427         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
428                 "{\"differentMessage\":\"message2\"}");
429         final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
430         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
431
432         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(connectionPoolConfiguration());
433
434         //when
435         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
436         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
437                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
438
439         //then
440         StepVerifier.create(result.take(2))
441                 .expectNext(messages.get(0))
442                 .expectNext(messages.get(1))
443                 .expectComplete()
444                 .verify(TIMEOUT);
445     }
446
447     private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
448         return ImmutableMessageRouterSubscriberConfig.builder()
449                 .retryConfig(ImmutableDmaapRetryConfig.builder()
450                         .retryIntervalInSeconds(retryInterval)
451                         .retryCount(retryCount)
452                         .build())
453                 .build();
454     }
455
456     private MessageRouterSubscriberConfig connectionPoolConfiguration() {
457         return ImmutableMessageRouterSubscriberConfig.builder()
458                 .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
459                         .connectionPool(10)
460                         .maxIdleTime(10)
461                         .maxLifeTime(20)
462                         .build())
463                 .build();
464     }
465 }