15c3bd8e24b8a8171d3b7172d2b693ea24ce4c59
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.Times;
31 import org.mockserver.verify.VerificationTimes;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
39 import org.testcontainers.containers.DockerComposeContainer;
40 import org.testcontainers.junit.jupiter.Container;
41 import org.testcontainers.junit.jupiter.Testcontainers;
42 import reactor.core.publisher.Flux;
43 import reactor.core.publisher.Mono;
44 import reactor.test.StepVerifier;
45
46 import java.time.Duration;
47 import java.util.concurrent.TimeUnit;
48
49 import static org.mockserver.model.HttpRequest.request;
50 import static org.mockserver.model.HttpResponse.response;
51 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
52 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
53 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObject;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
63
64 @Testcontainers
65 class MessageRouterSubscriberIT {
66     @Container
67     private static final DockerComposeContainer CONTAINER = createContainerInstance();
68     private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
69             LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
70     private static String EVENTS_PATH;
71     private static String PROXY_MOCK_EVENTS_PATH;
72
73     private static final Duration TIMEOUT = Duration.ofSeconds(10);
74     private static final String CONSUMER_GROUP = "group1";
75     private static final String CONSUMER_ID = "consumer200";
76     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
77             "{" +
78             "\"mrstatus\":3001," +
79             "\"helpURL\":\"http://onap.readthedocs.io\"," +
80             "\"message\":\"No such topic exists.-[%s]\"," +
81             "\"status\":404" +
82             "}";
83     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
84             + "{"
85             + "\"requestError\":"
86             + "{"
87             + "\"serviceException\":"
88             + "{"
89             + "\"messageId\":\"SVC0001\","
90             + "\"text\":\"Client timeout exception occurred, Error code is %1\","
91             + "\"variables\":[\"408\"]"
92             + "}"
93             + "}"
94             + "}";
95
96     private MessageRouterPublisher publisher = DmaapClientFactory
97             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
98     private MessageRouterSubscriber subscriber = DmaapClientFactory
99             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
100
101     @BeforeAll
102     static void setUp() {
103         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
104         PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
105     }
106
107     @BeforeEach
108     void set() {
109         MOCK_SERVER_CLIENT.reset();
110     }
111
112     @Test
113     void subscriber_shouldHandleNoSuchTopicException() {
114         //given
115         final String topic = "newTopic";
116         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
117                 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
118         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
119                 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
120
121         //when
122         Mono<MessageRouterSubscribeResponse> response = subscriber
123                 .get(mrSubscribeRequest);
124
125         //then
126         StepVerifier.create(response)
127                 .expectNext(expectedResponse)
128                 .expectComplete()
129                 .verify(TIMEOUT);
130     }
131
132     @Test
133     void subscriberShouldHandleSingleItemResponse() {
134         //given
135         final String topic = "TOPIC";
136         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
137         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
138         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
139
140         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
141         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
142         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
143         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
144
145         //when
146         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
147         Mono<MessageRouterSubscribeResponse> response = publisher
148                 .put(publishRequest, jsonMessageBatch)
149                 .then(subscriber.get(subscribeRequest));
150
151         //then
152         StepVerifier.create(response)
153                 .expectNext(expectedResponse)
154                 .expectComplete()
155                 .verify();
156     }
157
158     @Test
159     void subscriber_shouldHandleMultipleItemsResponse() {
160         //given
161         final String topic = "TOPIC2";
162         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
163         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
164         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
165
166         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
167                 "{\"differentMessage\":\"message2\"}");
168         final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
169         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
170         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
171
172         //when
173         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
174         Mono<MessageRouterSubscribeResponse> response = publisher
175                 .put(publishRequest, jsonMessageBatch)
176                 .then(subscriber.get(subscribeRequest));
177
178         //then
179         StepVerifier.create(response)
180                 .expectNext(expectedResponse)
181                 .expectComplete()
182                 .verify();
183     }
184
185     @Test
186     void subscriber_shouldExtractItemsFromResponse() {
187         //given
188         final String topic = "TOPIC3";
189         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
190         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
191         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
192                 CONSUMER_GROUP, CONSUMER_ID);
193
194         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
195                 "{\"differentMessage\":\"message2\"}");
196         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
197
198         //when
199         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
200         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
201                 .thenMany(subscriber.getElements(subscribeRequest));
202
203         //then
204         StepVerifier.create(result)
205                 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
206                 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
207                 .expectComplete()
208                 .verify(TIMEOUT);
209     }
210
211     @Test
212     void subscriber_shouldSubscribeToTopic() {
213         //given
214         final String topic = "TOPIC4";
215         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
216         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
217         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
218                 CONSUMER_GROUP, CONSUMER_ID);
219
220         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
221                 "{\"differentMessage\":\"message2\"}");
222         final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
223         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
224
225         //when
226         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
227         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
228                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
229
230         //then
231         StepVerifier.create(result.take(2))
232                 .expectNext(messages.get(0))
233                 .expectNext(messages.get(1))
234                 .expectComplete()
235                 .verify(TIMEOUT);
236     }
237
238     @Test
239     void subscriber_shouldHandleTimeoutException() {
240         //given
241         final String topic = "TOPIC5";
242         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
243                 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
244         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(TIMEOUT_ERROR_MESSAGE);
245         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
246         MOCK_SERVER_CLIENT
247                 .when(request().withPath(path), Times.once())
248                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
249
250         //when
251         Mono<MessageRouterSubscribeResponse> response = subscriber
252                 .get(mrSubscribeRequest);
253
254         //then
255         StepVerifier.create(response)
256                 .expectNext(expectedResponse)
257                 .expectComplete()
258                 .verify(TIMEOUT);
259
260         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
261     }
262
263     @Test
264     void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() {
265         //given
266         final String topic = "TOPIC6";
267         final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
268         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
269         final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
270         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
271
272         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
273         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
274         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
275         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
276
277         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
278         MOCK_SERVER_CLIENT
279                 .when(request().withPath(path), Times.once())
280                 .respond(response().withStatusCode(404));
281         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
282                 retryConfig(1, 1));
283
284         //when
285         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
286                 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
287         Mono<MessageRouterSubscribeResponse> response = publisher
288                 .put(publishRequest, jsonMessageBatch)
289                 .then(subscriber.get(subscribeRequest));
290
291         //then
292         StepVerifier.create(response)
293                 .expectNext(expectedResponse)
294                 .expectComplete()
295                 .verify();
296
297         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
298     }
299
300     @Test
301     void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() {
302         //given
303         final String topic = "TOPIC7";
304         final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
305         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
306         final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
307         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
308                 proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
309
310         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
311         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
312         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
313         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
314
315         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
316         MOCK_SERVER_CLIENT
317                 .when(request().withPath(path), Times.once())
318                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
319         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
320                 retryConfig(1, 1));
321
322         //when
323         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
324                 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
325         Mono<MessageRouterSubscribeResponse> response = publisher
326                 .put(publishRequest, jsonMessageBatch)
327                 .then(subscriber.get(subscribeRequest));
328
329         //then
330         StepVerifier.create(response)
331                 .expectNext(expectedResponse)
332                 .expectComplete()
333                 .verify();
334
335         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
336     }
337
338     @Test
339     void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
340         //given
341         final String topic = "TOPIC8";
342         final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
343         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
344         final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
345         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
346                 proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
347
348         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
349         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
350         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
351         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
352
353         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
354         MOCK_SERVER_CLIENT
355                 .when(request().withPath(path), Times.once())
356                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
357         MOCK_SERVER_CLIENT
358                 .when(request().withPath(path), Times.once())
359                 .respond(response().withStatusCode(404));
360         MOCK_SERVER_CLIENT
361                 .when(request().withPath(path), Times.once())
362                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
363         MOCK_SERVER_CLIENT
364                 .when(request().withPath(path), Times.once())
365                 .respond(response().withStatusCode(500));
366         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
367                 retryConfig(1, 5));
368
369         //when
370         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
371                 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
372         Mono<MessageRouterSubscribeResponse> response = publisher
373                 .put(publishRequest, jsonMessageBatch)
374                 .then(subscriber.get(subscribeRequest));
375
376         //then
377         StepVerifier.create(response)
378                 .expectNext(expectedResponse)
379                 .expectComplete()
380                 .verify();
381
382         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
383     }
384
385     private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
386         return ImmutableMessageRouterSubscriberConfig.builder()
387                 .retryConfig(ImmutableDmaapRetryConfig.builder()
388                         .retryIntervalInSeconds(retryInterval)
389                         .retryCount(retryCount)
390                         .build())
391                 .build();
392     }
393 }