/*
 * ============LICENSE_START====================================
 * DCAEGEN2-SERVICES-SDK
 * =========================================================
 * Copyright (C) 2019-2021 Nokia. All rights reserved.
 * =========================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=====================================
 */
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.Times;
31 import org.mockserver.verify.VerificationTimes;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
40 import org.testcontainers.containers.DockerComposeContainer;
41 import org.testcontainers.junit.jupiter.Container;
42 import org.testcontainers.junit.jupiter.Testcontainers;
43 import reactor.core.publisher.Flux;
44 import reactor.core.publisher.Mono;
45 import reactor.test.StepVerifier;
47 import java.time.Duration;
48 import java.util.concurrent.TimeUnit;
50 import static org.mockserver.model.HttpRequest.request;
51 import static org.mockserver.model.HttpResponse.response;
52 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
53 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObject;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
66 class MessageRouterSubscriberIT {
68 private static final DockerComposeContainer CONTAINER = createContainerInstance();
69 private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
70 LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
71 private static String EVENTS_PATH;
72 private static String PROXY_MOCK_EVENTS_PATH;
74 private static final Duration TIMEOUT = Duration.ofSeconds(10);
75 private static final String CONSUMER_GROUP = "group1";
76 private static final String CONSUMER_ID = "consumer200";
77 private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
79 "\"mrstatus\":3001," +
80 "\"helpURL\":\"http://onap.readthedocs.io\"," +
81 "\"message\":\"No such topic exists.-[%s]\"," +
84 private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
88 + "\"serviceException\":"
90 + "\"messageId\":\"SVC0001\","
91 + "\"text\":\"Client timeout exception occurred, Error code is %1\","
92 + "\"variables\":[\"408\"]"
97 private MessageRouterPublisher publisher = DmaapClientFactory
98 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
99 private MessageRouterSubscriber subscriber = DmaapClientFactory
100 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
103 static void setUp() {
104 EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
105 PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
110 MOCK_SERVER_CLIENT.reset();
114 void subscriber_shouldHandleNoSuchTopicException() {
116 final String topic = "newTopic";
117 final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
118 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
119 final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
120 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
123 Mono<MessageRouterSubscribeResponse> response = subscriber
124 .get(mrSubscribeRequest);
127 StepVerifier.create(response)
128 .expectNext(expectedResponse)
134 void subscriberShouldHandleSingleItemResponse() {
136 final String topic = "TOPIC";
137 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
138 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
139 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
141 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
142 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
143 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
144 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
147 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
148 Mono<MessageRouterSubscribeResponse> response = publisher
149 .put(publishRequest, jsonMessageBatch)
150 .then(subscriber.get(subscribeRequest));
153 StepVerifier.create(response)
154 .expectNext(expectedResponse)
160 void subscriber_shouldHandleMultipleItemsResponse() {
162 final String topic = "TOPIC2";
163 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
164 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
165 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
167 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
168 "{\"differentMessage\":\"message2\"}");
169 final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
170 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
171 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
174 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
175 Mono<MessageRouterSubscribeResponse> response = publisher
176 .put(publishRequest, jsonMessageBatch)
177 .then(subscriber.get(subscribeRequest));
180 StepVerifier.create(response)
181 .expectNext(expectedResponse)
187 void subscriber_shouldExtractItemsFromResponse() {
189 final String topic = "TOPIC3";
190 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
191 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
192 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
193 CONSUMER_GROUP, CONSUMER_ID);
195 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
196 "{\"differentMessage\":\"message2\"}");
197 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
200 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
201 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
202 .thenMany(subscriber.getElements(subscribeRequest));
205 StepVerifier.create(result)
206 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
207 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
213 void subscriber_shouldSubscribeToTopic() {
215 final String topic = "TOPIC4";
216 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
217 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
218 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
219 CONSUMER_GROUP, CONSUMER_ID);
221 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
222 "{\"differentMessage\":\"message2\"}");
223 final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
224 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
227 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
228 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
229 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
232 StepVerifier.create(result.take(2))
233 .expectNext(messages.get(0))
234 .expectNext(messages.get(1))
240 void subscriber_shouldHandleTimeoutException() {
242 final String topic = "TOPIC5";
243 final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
244 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
245 final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(TIMEOUT_ERROR_MESSAGE);
246 final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
248 .when(request().withPath(path), Times.once())
249 .respond(response().withDelay(TimeUnit.SECONDS, 2));
252 Mono<MessageRouterSubscribeResponse> response = subscriber
253 .get(mrSubscribeRequest);
256 StepVerifier.create(response)
257 .expectNext(expectedResponse)
261 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
265 void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() {
267 final String topic = "TOPIC6";
268 final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
269 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
270 final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
271 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
273 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
274 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
275 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
276 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
278 final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
280 .when(request().withPath(path), Times.once())
281 .respond(response().withStatusCode(404));
282 final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
286 registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
287 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
288 Mono<MessageRouterSubscribeResponse> response = publisher
289 .put(publishRequest, jsonMessageBatch)
290 .then(subscriber.get(subscribeRequest));
293 StepVerifier.create(response)
294 .expectNext(expectedResponse)
298 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
302 void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() {
304 final String topic = "TOPIC7";
305 final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
306 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
307 final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
308 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
309 proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
311 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
312 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
313 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
314 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
316 final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
318 .when(request().withPath(path), Times.once())
319 .respond(response().withDelay(TimeUnit.SECONDS, 2));
320 final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
324 registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
325 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
326 Mono<MessageRouterSubscribeResponse> response = publisher
327 .put(publishRequest, jsonMessageBatch)
328 .then(subscriber.get(subscribeRequest));
331 StepVerifier.create(response)
332 .expectNext(expectedResponse)
336 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
340 void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
342 final String topic = "TOPIC8";
343 final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
344 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
345 final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
346 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
347 proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
349 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
350 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
351 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
352 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
354 final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
356 .when(request().withPath(path), Times.once())
357 .respond(response().withDelay(TimeUnit.SECONDS, 2));
359 .when(request().withPath(path), Times.once())
360 .respond(response().withStatusCode(404));
362 .when(request().withPath(path), Times.once())
363 .respond(response().withDelay(TimeUnit.SECONDS, 2));
365 .when(request().withPath(path), Times.once())
366 .respond(response().withStatusCode(500));
367 final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
371 registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
372 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
373 Mono<MessageRouterSubscribeResponse> response = publisher
374 .put(publishRequest, jsonMessageBatch)
375 .then(subscriber.get(subscribeRequest));
378 StepVerifier.create(response)
379 .expectNext(expectedResponse)
383 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
387 void subscriber_shouldHandleLastRetryError500() {
389 final String topic = "TOPIC9";
390 final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
391 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
392 proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
393 final String responseMessage = "Response Message";
394 final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
395 "500 Internal Server Error\n%s", responseMessage);
397 final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
399 .when(request().withPath(path), Times.once())
400 .respond(response().withStatusCode(404));
402 .when(request().withPath(path), Times.once())
403 .respond(response().withStatusCode(500).withBody(responseMessage));
404 final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
408 Mono<MessageRouterSubscribeResponse> response = subscriber.get(subscribeRequest);
411 StepVerifier.create(response)
412 .expectNext(expectedResponse)
416 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
419 void subscriber_shouldSubscribeToTopicWithConnectionPoolConfiguration() {
421 final String topic = "TOPIC4";
422 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
423 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
424 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
425 CONSUMER_GROUP, CONSUMER_ID);
427 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
428 "{\"differentMessage\":\"message2\"}");
429 final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
430 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
432 final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(connectionPoolConfiguration());
435 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
436 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
437 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
440 StepVerifier.create(result.take(2))
441 .expectNext(messages.get(0))
442 .expectNext(messages.get(1))
447 private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
448 return ImmutableMessageRouterSubscriberConfig.builder()
449 .retryConfig(ImmutableDmaapRetryConfig.builder()
450 .retryIntervalInSeconds(retryInterval)
451 .retryCount(retryCount)
456 private MessageRouterSubscriberConfig connectionPoolConfiguration() {
457 return ImmutableMessageRouterSubscriberConfig.builder()
458 .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()