/*
 * ============LICENSE_START====================================
 * DCAEGEN2-SERVICES-SDK
 * =========================================================
 * Copyright (C) 2019-2021 Nokia. All rights reserved.
 * =========================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=====================================
 */
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockserver.client.MockServerClient;
import org.mockserver.matchers.Times;
import org.mockserver.verify.VerificationTimes;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObject;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;

65 class MessageRouterSubscriberIT {
67 private static final DockerComposeContainer CONTAINER = createContainerInstance();
68 private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
69 LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
70 private static String EVENTS_PATH;
71 private static String PROXY_MOCK_EVENTS_PATH;
73 private static final Duration TIMEOUT = Duration.ofSeconds(10);
74 private static final String CONSUMER_GROUP = "group1";
75 private static final String CONSUMER_ID = "consumer200";
76 private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
78 "\"mrstatus\":3001," +
79 "\"helpURL\":\"http://onap.readthedocs.io\"," +
80 "\"message\":\"No such topic exists.-[%s]\"," +
83 private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
87 + "\"serviceException\":"
89 + "\"messageId\":\"SVC0001\","
90 + "\"text\":\"Client timeout exception occurred, Error code is %1\","
91 + "\"variables\":[\"408\"]"
96 private MessageRouterPublisher publisher = DmaapClientFactory
97 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
98 private MessageRouterSubscriber subscriber = DmaapClientFactory
99 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
102 static void setUp() {
103 EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
104 PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
109 MOCK_SERVER_CLIENT.reset();
113 void subscriber_shouldHandleNoSuchTopicException() {
115 final String topic = "newTopic";
116 final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
117 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
118 final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
119 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
122 Mono<MessageRouterSubscribeResponse> response = subscriber
123 .get(mrSubscribeRequest);
126 StepVerifier.create(response)
127 .expectNext(expectedResponse)
133 void subscriberShouldHandleSingleItemResponse() {
135 final String topic = "TOPIC";
136 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
137 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
138 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
140 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
141 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
142 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
143 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
146 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
147 Mono<MessageRouterSubscribeResponse> response = publisher
148 .put(publishRequest, jsonMessageBatch)
149 .then(subscriber.get(subscribeRequest));
152 StepVerifier.create(response)
153 .expectNext(expectedResponse)
159 void subscriber_shouldHandleMultipleItemsResponse() {
161 final String topic = "TOPIC2";
162 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
163 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
164 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
166 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
167 "{\"differentMessage\":\"message2\"}");
168 final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
169 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
170 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
173 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
174 Mono<MessageRouterSubscribeResponse> response = publisher
175 .put(publishRequest, jsonMessageBatch)
176 .then(subscriber.get(subscribeRequest));
179 StepVerifier.create(response)
180 .expectNext(expectedResponse)
186 void subscriber_shouldExtractItemsFromResponse() {
188 final String topic = "TOPIC3";
189 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
190 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
191 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
192 CONSUMER_GROUP, CONSUMER_ID);
194 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
195 "{\"differentMessage\":\"message2\"}");
196 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
199 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
200 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
201 .thenMany(subscriber.getElements(subscribeRequest));
204 StepVerifier.create(result)
205 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
206 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
212 void subscriber_shouldSubscribeToTopic() {
214 final String topic = "TOPIC4";
215 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
216 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
217 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
218 CONSUMER_GROUP, CONSUMER_ID);
220 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
221 "{\"differentMessage\":\"message2\"}");
222 final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
223 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
226 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
227 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
228 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
231 StepVerifier.create(result.take(2))
232 .expectNext(messages.get(0))
233 .expectNext(messages.get(1))
239 void subscriber_shouldHandleTimeoutException() {
241 final String topic = "TOPIC5";
242 final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
243 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
244 final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(TIMEOUT_ERROR_MESSAGE);
245 final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
247 .when(request().withPath(path), Times.once())
248 .respond(response().withDelay(TimeUnit.SECONDS, 5));
251 Mono<MessageRouterSubscribeResponse> response = subscriber
252 .get(mrSubscribeRequest);
255 StepVerifier.create(response)
256 .expectNext(expectedResponse)
260 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
264 void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() {
266 final String topic = "TOPIC6";
267 final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
268 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
269 final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
270 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
272 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
273 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
274 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
275 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
277 final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
279 .when(request().withPath(path), Times.once())
280 .respond(response().withStatusCode(404));
281 final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
284 registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
285 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
286 Mono<MessageRouterSubscribeResponse> response = publisher
287 .put(publishRequest, jsonMessageBatch)
288 .then(subscriber.get(subscribeRequest));
291 StepVerifier.create(response)
292 .expectNext(expectedResponse)
296 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
300 void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() {
302 final String topic = "TOPIC7";
303 final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
304 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
305 final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
306 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
307 proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
309 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
310 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
311 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
312 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
314 final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
316 .when(request().withPath(path), Times.once())
317 .respond(response().withDelay(TimeUnit.SECONDS, 10));
318 final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
321 registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
322 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
323 Mono<MessageRouterSubscribeResponse> response = publisher
324 .put(publishRequest, jsonMessageBatch)
325 .then(subscriber.get(subscribeRequest));
328 StepVerifier.create(response)
329 .expectNext(expectedResponse)
333 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
336 private MessageRouterSubscriberConfig retryConfig() {
337 return ImmutableMessageRouterSubscriberConfig.builder()
338 .retryConfig(ImmutableDmaapRetryConfig.builder()
339 .retryIntervalInSeconds(1)