2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2021 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.Times;
31 import org.mockserver.verify.VerificationTimes;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
41 import org.testcontainers.containers.DockerComposeContainer;
42 import org.testcontainers.junit.jupiter.Container;
43 import org.testcontainers.junit.jupiter.Testcontainers;
44 import reactor.core.publisher.Flux;
45 import reactor.core.publisher.Mono;
46 import reactor.test.StepVerifier;
48 import java.time.Duration;
49 import java.util.concurrent.TimeUnit;
51 import static org.mockserver.model.HttpRequest.request;
52 import static org.mockserver.model.HttpResponse.response;
53 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
65 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
68 class MessageRouterPublisherIT {
70 private static final DockerComposeContainer CONTAINER = createContainerInstance();
71 private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
72 LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
73 private static String EVENTS_PATH;
74 private static String PROXY_MOCK_EVENTS_PATH;
76 private static final Duration TIMEOUT = Duration.ofSeconds(10);
77 private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
79 + "\"mrstatus\":5007,"
80 + "\"helpURL\":\"http://onap.readthedocs.io\","
81 + "\"message\":\"Error while publishing data to topic.:%s."
82 + "Successfully published number of messages :0."
83 + "Expected { to start an object.\",\"status\":400"
85 private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
89 + "\"serviceException\":"
91 + "\"messageId\":\"SVC0001\","
92 + "\"text\":\"Client timeout exception occurred, Error code is %1\","
93 + "\"variables\":[\"408\"]"
98 private final MessageRouterPublisher publisher = DmaapClientFactory
99 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
100 private final MessageRouterSubscriber subscriber = DmaapClientFactory
101 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
104 static void setUp() {
105 EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
106 PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
111 MOCK_SERVER_CLIENT.reset();
115 void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
117 final String topic = "TOPIC";
118 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
119 "{\"differentMessage\":\"message2\"}");
120 final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
121 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
122 final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
125 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
128 StepVerifier.create(result)
129 .expectNext(expectedResponse)
135 void publisher_shouldHandleBadRequestError() {
137 final String topic = "TOPIC2";
138 final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
139 final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
140 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
141 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
142 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
145 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
148 StepVerifier.create(result)
149 .expectNext(expectedResponse)
155 void publisher_shouldSuccessfullyPublishSingleMessage() {
157 final String topic = "TOPIC3";
158 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
159 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
160 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
161 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
162 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
163 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
164 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
167 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
168 Mono<MessageRouterSubscribeResponse> response = publisher
169 .put(publishRequest, jsonMessageBatch)
170 .then(subscriber.get(subscribeRequest));
173 StepVerifier.create(response)
174 .expectNext(expectedResponse)
180 void publisher_shouldSuccessfullyPublishMultipleMessages() {
181 final String topic = "TOPIC5";
182 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
183 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
184 "{\"differentMessage\":\"message2\"}");
185 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
186 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
187 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
188 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
189 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
192 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
193 Mono<MessageRouterSubscribeResponse> response = publisher
194 .put(publishRequest, jsonMessageBatch)
195 .then(subscriber.get(subscribeRequest));
198 StepVerifier.create(response)
199 .expectNext(expectedResponse)
205 void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
207 final String topic = "TOPIC6";
208 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
210 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
211 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
212 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
214 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
215 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
216 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
219 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
220 Mono<MessageRouterSubscribeResponse> response = publisher
221 .put(publishRequest, plainBatch)
222 .then(subscriber.get(subscribeRequest));
225 StepVerifier.create(response)
226 .expectNext(expectedResponse)
232 void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
234 final String topic = "TOPIC7";
235 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
237 final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
238 final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
239 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
241 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
242 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
243 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
246 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
247 Mono<MessageRouterSubscribeResponse> response = publisher
248 .put(publishRequest, plainBatch)
249 .then(subscriber.get(subscribeRequest));
252 StepVerifier.create(response)
253 .expectNext(expectedResponse)
259 void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
261 final String topic = "TOPIC8";
262 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
264 final List<String> singlePlainMessage = List.of("kebab");
265 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
266 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
268 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
269 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
270 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
273 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
274 Mono<MessageRouterSubscribeResponse> response = publisher
275 .put(publishRequest, plainBatch)
276 .then(subscriber.get(subscribeRequest));
279 StepVerifier.create(response)
280 .expectNext(expectedResponse)
286 void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
288 final String topic = "TOPIC9";
289 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
291 final List<String> singlePlainMessage = List.of("I", "like", "pizza");
292 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
293 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
295 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
296 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
297 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
300 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
301 Mono<MessageRouterSubscribeResponse> response = publisher
302 .put(publishRequest, plainBatch)
303 .then(subscriber.get(subscribeRequest));
306 StepVerifier.create(response)
307 .expectNext(expectedResponse)
313 void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
315 final String topic = "TOPIC10";
316 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
317 final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
318 final MessageRouterPublishRequest mrRequest = createPublishRequest(
319 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1));
320 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
321 final String path = String.format("/events/%s", topic);
323 .when(request().withPath(path), Times.once())
324 .respond(response().withDelay(TimeUnit.SECONDS, 2));
327 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
330 StepVerifier.create(result)
331 .expectNext(expectedResponse)
335 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
339 void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
340 final String topic = "TOPIC11";
341 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
343 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
344 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
345 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
347 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
348 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
350 final String path = String.format("/events/%s", topic);
352 .when(request().withPath(path), Times.once())
353 .respond(response().withStatusCode(404));
355 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
359 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
362 StepVerifier.create(result)
363 .expectNext(expectedResponse)
367 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
371 void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
372 final String topic = "TOPIC12";
373 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
375 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
376 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
377 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
379 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
380 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
382 final String path = String.format("/events/%s", topic);
384 .when(request().withPath(path), Times.once())
385 .respond(response().withDelay(TimeUnit.SECONDS, 2));
386 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
390 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
393 StepVerifier.create(result)
394 .expectNext(expectedResponse)
398 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
402 void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
403 final String topic = "TOPIC13";
404 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
406 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
407 "{\"differentMessage\":\"message2\"}");
408 final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages);
409 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
411 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
412 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
414 final String path = String.format("/events/%s", topic);
416 .when(request().withPath(path), Times.once())
417 .respond(response().withDelay(TimeUnit.SECONDS, 2));
419 .when(request().withPath(path), Times.once())
420 .respond(response().withStatusCode(404));
422 .when(request().withPath(path), Times.once())
423 .respond(response().withDelay(TimeUnit.SECONDS, 2));
425 .when(request().withPath(path), Times.once())
426 .respond(response().withStatusCode(500));
427 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5));
430 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
433 StepVerifier.create(result)
434 .expectNext(expectedResponse)
438 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
441 private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
442 return ImmutableMessageRouterPublisherConfig.builder()
443 .retryConfig(ImmutableDmaapRetryConfig.builder()
444 .retryIntervalInSeconds(retryInterval)
445 .retryCount(retryCount)