Support retry in DCAE-SDK DMaaP-Client
[dcaegen2/services/sdk.git] / rest-services / dmaap-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / dmaap / client / api / MessageRouterSubscriberIT.java
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.Times;
31 import org.mockserver.verify.VerificationTimes;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
39 import org.testcontainers.containers.DockerComposeContainer;
40 import org.testcontainers.junit.jupiter.Container;
41 import org.testcontainers.junit.jupiter.Testcontainers;
42 import reactor.core.publisher.Flux;
43 import reactor.core.publisher.Mono;
44 import reactor.test.StepVerifier;
45
46 import java.time.Duration;
47 import java.util.concurrent.TimeUnit;
48
49 import static org.mockserver.model.HttpRequest.request;
50 import static org.mockserver.model.HttpResponse.response;
51 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
52 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
53 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObject;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
63
64 @Testcontainers
65 class MessageRouterSubscriberIT {
66     @Container
67     private static final DockerComposeContainer CONTAINER = createContainerInstance();
68     private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
69             LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
70     private static String EVENTS_PATH;
71     private static String PROXY_MOCK_EVENTS_PATH;
72
73     private static final Duration TIMEOUT = Duration.ofSeconds(10);
74     private static final String CONSUMER_GROUP = "group1";
75     private static final String CONSUMER_ID = "consumer200";
76     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
77             "{" +
78             "\"mrstatus\":3001," +
79             "\"helpURL\":\"http://onap.readthedocs.io\"," +
80             "\"message\":\"No such topic exists.-[%s]\"," +
81             "\"status\":404" +
82             "}";
83     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
84             + "{"
85             + "\"requestError\":"
86             + "{"
87             + "\"serviceException\":"
88             + "{"
89             + "\"messageId\":\"SVC0001\","
90             + "\"text\":\"Client timeout exception occurred, Error code is %1\","
91             + "\"variables\":[\"408\"]"
92             + "}"
93             + "}"
94             + "}";
95
96     private MessageRouterPublisher publisher = DmaapClientFactory
97             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
98     private MessageRouterSubscriber subscriber = DmaapClientFactory
99             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
100
101     @BeforeAll
102     static void setUp() {
103         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
104         PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
105     }
106
107     @BeforeEach
108     void set() {
109         MOCK_SERVER_CLIENT.reset();
110     }
111
112     @Test
113     void subscriber_shouldHandleNoSuchTopicException() {
114         //given
115         final String topic = "newTopic";
116         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
117                 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
118         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
119                 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
120
121         //when
122         Mono<MessageRouterSubscribeResponse> response = subscriber
123                 .get(mrSubscribeRequest);
124
125         //then
126         StepVerifier.create(response)
127                 .expectNext(expectedResponse)
128                 .expectComplete()
129                 .verify(TIMEOUT);
130     }
131
132     @Test
133     void subscriberShouldHandleSingleItemResponse() {
134         //given
135         final String topic = "TOPIC";
136         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
137         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
138         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
139
140         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
141         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
142         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
143         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
144
145         //when
146         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
147         Mono<MessageRouterSubscribeResponse> response = publisher
148                 .put(publishRequest, jsonMessageBatch)
149                 .then(subscriber.get(subscribeRequest));
150
151         //then
152         StepVerifier.create(response)
153                 .expectNext(expectedResponse)
154                 .expectComplete()
155                 .verify();
156     }
157
158     @Test
159     void subscriber_shouldHandleMultipleItemsResponse() {
160         //given
161         final String topic = "TOPIC2";
162         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
163         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
164         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
165
166         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
167                 "{\"differentMessage\":\"message2\"}");
168         final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
169         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
170         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
171
172         //when
173         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
174         Mono<MessageRouterSubscribeResponse> response = publisher
175                 .put(publishRequest, jsonMessageBatch)
176                 .then(subscriber.get(subscribeRequest));
177
178         //then
179         StepVerifier.create(response)
180                 .expectNext(expectedResponse)
181                 .expectComplete()
182                 .verify();
183     }
184
185     @Test
186     void subscriber_shouldExtractItemsFromResponse() {
187         //given
188         final String topic = "TOPIC3";
189         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
190         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
191         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
192                 CONSUMER_GROUP, CONSUMER_ID);
193
194         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
195                 "{\"differentMessage\":\"message2\"}");
196         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
197
198         //when
199         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
200         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
201                 .thenMany(subscriber.getElements(subscribeRequest));
202
203         //then
204         StepVerifier.create(result)
205                 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
206                 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
207                 .expectComplete()
208                 .verify(TIMEOUT);
209     }
210
211     @Test
212     void subscriber_shouldSubscribeToTopic() {
213         //given
214         final String topic = "TOPIC4";
215         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
216         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
217         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
218                 CONSUMER_GROUP, CONSUMER_ID);
219
220         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
221                 "{\"differentMessage\":\"message2\"}");
222         final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
223         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
224
225         //when
226         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
227         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
228                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
229
230         //then
231         StepVerifier.create(result.take(2))
232                 .expectNext(messages.get(0))
233                 .expectNext(messages.get(1))
234                 .expectComplete()
235                 .verify(TIMEOUT);
236     }
237
238     @Test
239     void subscriber_shouldHandleTimeoutException() {
240         //given
241         final String topic = "TOPIC5";
242         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
243                 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
244         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(TIMEOUT_ERROR_MESSAGE);
245         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
246         MOCK_SERVER_CLIENT
247                 .when(request().withPath(path), Times.once())
248                 .respond(response().withDelay(TimeUnit.SECONDS, 5));
249
250         //when
251         Mono<MessageRouterSubscribeResponse> response = subscriber
252                 .get(mrSubscribeRequest);
253
254         //then
255         StepVerifier.create(response)
256                 .expectNext(expectedResponse)
257                 .expectComplete()
258                 .verify(TIMEOUT);
259
260         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
261     }
262
263     @Test
264     void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() {
265         //given
266         final String topic = "TOPIC6";
267         final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
268         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
269         final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
270         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
271
272         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
273         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
274         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
275         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
276
277         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
278         MOCK_SERVER_CLIENT
279                 .when(request().withPath(path), Times.once())
280                 .respond(response().withStatusCode(404));
281         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
282
283         //when
284         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
285                 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
286         Mono<MessageRouterSubscribeResponse> response = publisher
287                 .put(publishRequest, jsonMessageBatch)
288                 .then(subscriber.get(subscribeRequest));
289
290         //then
291         StepVerifier.create(response)
292                 .expectNext(expectedResponse)
293                 .expectComplete()
294                 .verify();
295
296         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
297     }
298
299     @Test
300     void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() {
301         //given
302         final String topic = "TOPIC7";
303         final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
304         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
305         final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
306         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
307                 proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
308
309         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
310         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
311         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
312         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
313
314         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
315         MOCK_SERVER_CLIENT
316                 .when(request().withPath(path), Times.once())
317                 .respond(response().withDelay(TimeUnit.SECONDS, 10));
318         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
319
320         //when
321         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
322                 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
323         Mono<MessageRouterSubscribeResponse> response = publisher
324                 .put(publishRequest, jsonMessageBatch)
325                 .then(subscriber.get(subscribeRequest));
326
327         //then
328         StepVerifier.create(response)
329                 .expectNext(expectedResponse)
330                 .expectComplete()
331                 .verify();
332
333         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
334     }
335
336     private MessageRouterSubscriberConfig retryConfig() {
337         return ImmutableMessageRouterSubscriberConfig.builder()
338                 .retryConfig(ImmutableDmaapRetryConfig.builder()
339                         .retryIntervalInSeconds(1)
340                         .retryCount(1)
341                         .build())
342                 .build();
343     }
344 }