2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2021 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.TimeToLive;
31 import org.mockserver.matchers.Times;
32 import org.mockserver.verify.VerificationTimes;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
42 import org.testcontainers.containers.DockerComposeContainer;
43 import org.testcontainers.junit.jupiter.Container;
44 import org.testcontainers.junit.jupiter.Testcontainers;
45 import reactor.core.publisher.Flux;
46 import reactor.core.publisher.Mono;
47 import reactor.test.StepVerifier;
49 import java.time.Duration;
50 import java.util.concurrent.TimeUnit;
52 import static org.mockserver.model.HttpRequest.request;
53 import static org.mockserver.model.HttpResponse.response;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
65 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
66 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
69 class MessageRouterPublisherIT {
// Docker Compose environment for these tests, obtained from DMaapContainer.createContainerInstance().
// NOTE(review): declared with the raw DockerComposeContainer type; consider DockerComposeContainer<?> - confirm against the Testcontainers version in use.
71 private static final DockerComposeContainer CONTAINER = createContainerInstance();
// Client for the mock server at LOCALHOST:PROXY_MOCK_SERVICE_EXPOSED_PORT; the tests reset it and use it to verify received requests.
72 private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
73 LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
// Base URLs of the two "/events" endpoints; both are assigned in setUp().
74 private static String EVENTS_PATH;
75 private static String PROXY_MOCK_EVENTS_PATH;
// Ten-second duration. NOTE(review): presumably the upper bound handed to StepVerifier verification - confirm at the usage sites.
77 private static final Duration TIMEOUT = Duration.ofSeconds(10);
// Format string for the failure reason expected when DMaaP answers "400 Bad Request".
// It contains a single %s; publisher_shouldHandleBadRequestError passes it to errorPublishResponse together with the topic name.
78 private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
80 + "\"mrstatus\":5007,"
81 + "\"helpURL\":\"http://onap.readthedocs.io\","
82 + "\"message\":\"Error while publishing data to topic.:%s."
83 + "Successfully published number of messages :0."
84 + "Expected { to start an object.\",\"status\":400"
// Failure reason expected when a publish request ends with "408 Request Timeout" (see publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined).
// It is passed to errorPublishResponse without format arguments, so "%1" is presumably literal text - confirm against that helper.
86 private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
90 + "\"serviceException\":"
92 + "\"messageId\":\"SVC0001\","
93 + "\"text\":\"Client timeout exception occurred, Error code is %1\","
94 + "\"variables\":[\"408\"]"
// Publisher under test, created by DmaapClientFactory with the default publisher configuration.
99 private final MessageRouterPublisher publisher = DmaapClientFactory
100 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
// Subscriber created with the default subscriber configuration; the tests use it to read back what was published.
101 private final MessageRouterSubscriber subscriber = DmaapClientFactory
102 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
// Builds the base "/events" URLs on LOCALHOST: EVENTS_PATH uses DMAAP_SERVICE_EXPOSED_PORT,
// PROXY_MOCK_EVENTS_PATH uses PROXY_MOCK_SERVICE_EXPOSED_PORT.
105 static void setUp() {
106 EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
107 PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
// Resets the mock server client. NOTE(review): presumably meant to run before every test so expectations
// and recorded requests do not leak between tests - confirm.
112 MOCK_SERVER_CLIENT.reset();
// Publishes a batch of two JSON messages to "<EVENTS_PATH>/TOPIC" and expects the publisher to emit
// the response built by successPublishResponse from those same two messages.
116 void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
118 final String topic = "TOPIC";
119 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
120 "{\"differentMessage\":\"message2\"}");
121 final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
122 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
123 final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
126 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
129 StepVerifier.create(result)
130 .expectNext(expectedResponse)
// Sends three plain-text (non-JSON) messages with a request created without an explicit content type
// and expects the error response built from DMAAP_400_ERROR_RESPONSE_FORMAT and the topic name.
136 void publisher_shouldHandleBadRequestError() {
138 final String topic = "TOPIC2";
139 final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
140 final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
141 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
142 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
143 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
146 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
149 StepVerifier.create(result)
150 .expectNext(expectedResponse)
// Round trip for one JSON message on TOPIC3: the topic is registered, the message is published,
// and the subscriber is then expected to return a success response holding that message.
156 void publisher_shouldSuccessfullyPublishSingleMessage() {
158 final String topic = "TOPIC3";
159 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
160 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
161 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
162 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
163 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
164 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
165 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
168 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) discards the publish responses and continues with the subscribe call once publishing completes.
169 Mono<MessageRouterSubscribeResponse> response = publisher
170 .put(publishRequest, jsonMessageBatch)
171 .then(subscriber.get(subscribeRequest));
174 StepVerifier.create(response)
175 .expectNext(expectedResponse)
// Round trip for two JSON messages on TOPIC5: register the topic, publish both messages,
// then expect the subscriber to return a success response holding both.
// NOTE(review): the local "singleJsonMessage" actually holds two messages; a name such as
// "twoJsonMessages" (as used in the first test) would be less misleading.
181 void publisher_shouldSuccessfullyPublishMultipleMessages() {
182 final String topic = "TOPIC5";
183 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
184 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
185 "{\"differentMessage\":\"message2\"}");
186 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
187 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
188 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
189 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
190 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
193 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) discards the publish responses and continues with the subscribe call once publishing completes.
194 Mono<MessageRouterSubscribeResponse> response = publisher
195 .put(publishRequest, jsonMessageBatch)
196 .then(subscriber.get(subscribeRequest));
199 StepVerifier.create(response)
200 .expectNext(expectedResponse)
// Round trip on TOPIC6 for one JSON-formatted message sent through plainBatch with ContentType.TEXT_PLAIN;
// the subscriber is expected to return a success response holding that message.
206 void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
208 final String topic = "TOPIC6";
209 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
211 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
212 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
213 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
215 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
216 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
217 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
220 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) discards the publish responses and continues with the subscribe call once publishing completes.
221 Mono<MessageRouterSubscribeResponse> response = publisher
222 .put(publishRequest, plainBatch)
223 .then(subscriber.get(subscribeRequest));
226 StepVerifier.create(response)
227 .expectNext(expectedResponse)
// Round trip on TOPIC7 for two JSON-formatted messages sent through plainBatch with ContentType.TEXT_PLAIN;
// the subscriber is expected to return a success response holding both messages.
233 void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
235 final String topic = "TOPIC7";
236 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
238 final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
239 final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
240 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
242 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
243 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
244 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
247 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) discards the publish responses and continues with the subscribe call once publishing completes.
248 Mono<MessageRouterSubscribeResponse> response = publisher
249 .put(publishRequest, plainBatch)
250 .then(subscriber.get(subscribeRequest));
253 StepVerifier.create(response)
254 .expectNext(expectedResponse)
// Round trip on TOPIC8 for one plain-text message ("kebab") sent with ContentType.TEXT_PLAIN;
// the subscriber is expected to return a success response holding that message.
260 void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
262 final String topic = "TOPIC8";
263 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
265 final List<String> singlePlainMessage = List.of("kebab");
266 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
267 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
269 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
270 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
271 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
274 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) discards the publish responses and continues with the subscribe call once publishing completes.
275 Mono<MessageRouterSubscribeResponse> response = publisher
276 .put(publishRequest, plainBatch)
277 .then(subscriber.get(subscribeRequest));
280 StepVerifier.create(response)
281 .expectNext(expectedResponse)
// Round trip on TOPIC9 for three plain-text messages sent with ContentType.TEXT_PLAIN;
// the subscriber is expected to return a success response holding all three.
// NOTE(review): the local "singlePlainMessage" actually holds three messages; the name is misleading.
287 void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
289 final String topic = "TOPIC9";
290 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
292 final List<String> singlePlainMessage = List.of("I", "like", "pizza");
293 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
294 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
296 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
297 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
298 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
301 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) discards the publish responses and continues with the subscribe call once publishing completes.
302 Mono<MessageRouterSubscribeResponse> response = publisher
303 .put(publishRequest, plainBatch)
304 .then(subscriber.get(subscribeRequest));
307 StepVerifier.create(response)
308 .expectNext(expectedResponse)
// Publishes one JSON message to TOPIC10 through the proxy mock with a one-second Duration on the request
// (presumably the request timeout - confirm against createPublishRequest). The mock expectation, valid once,
// answers "/events/TOPIC10" after a two-second delay, so the publisher is expected to emit the error response
// built from TIMEOUT_ERROR_MESSAGE. Finally the mock must have seen exactly one request for that path.
314 void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
316 final String topic = "TOPIC10";
317 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
318 final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
319 final MessageRouterPublishRequest mrRequest = createPublishRequest(
320 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1));
321 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
322 final String path = String.format("/events/%s", topic);
324 .when(request().withPath(path), Times.once())
325 .respond(response().withDelay(TimeUnit.SECONDS, 2));
328 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
331 StepVerifier.create(result)
332 .expectNext(expectedResponse)
336 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
// Publishes one message to TOPIC11 through the proxy mock using a publisher built from retryConfig().
// The mock expectation, valid once, answers "/events/TOPIC11" with status 404; the test then expects a
// success response and exactly two requests recorded for that path (the failed attempt plus one retry).
// NOTE(review): the local "publisher" shadows the instance field of the same name; a distinct name would
// make it clearer that the retry-enabled instance is the one under test.
340 void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
341 final String topic = "TOPIC11";
342 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
344 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
345 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
346 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
348 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
349 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
351 final String path = String.format("/events/%s", topic);
353 .when(request().withPath(path), Times.once())
354 .respond(response().withStatusCode(404));
355 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
358 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
361 StepVerifier.create(result)
362 .expectNext(expectedResponse)
366 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
// Publishes one message to TOPIC12 through the proxy mock using a publisher built from retryConfig() and a
// one-second Duration on the request (presumably the request timeout - confirm against createPublishRequest).
// The mock expectation, valid once, delays its answer for "/events/TOPIC12" by ten seconds; the test then
// expects a success response and exactly two requests recorded for that path.
// NOTE(review): the local "publisher" shadows the instance field of the same name.
370 void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
371 final String topic = "TOPIC12";
372 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
374 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
375 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
376 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
378 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
379 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
381 final String path = String.format("/events/%s", topic);
383 .when(request().withPath(path), Times.once())
384 .respond(response().withDelay(TimeUnit.SECONDS, 10));
385 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
388 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
391 StepVerifier.create(result)
392 .expectNext(expectedResponse)
396 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
399 private MessageRouterPublisherConfig retryConfig() {
400 return ImmutableMessageRouterPublisherConfig.builder()
401 .retryConfig(ImmutableDmaapRetryConfig.builder()
402 .retryIntervalInSeconds(1)