2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2021 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.Times;
31 import org.mockserver.verify.VerificationTimes;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
42 import org.testcontainers.containers.DockerComposeContainer;
43 import org.testcontainers.junit.jupiter.Container;
44 import org.testcontainers.junit.jupiter.Testcontainers;
45 import reactor.core.publisher.Flux;
46 import reactor.core.publisher.Mono;
47 import reactor.test.StepVerifier;
49 import java.time.Duration;
50 import java.util.concurrent.TimeUnit;
52 import static org.mockserver.model.HttpRequest.request;
53 import static org.mockserver.model.HttpResponse.response;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
65 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
66 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
// Integration tests for MessageRouterPublisher. Requests are sent either directly to the
// DMaaP service (EVENTS_PATH) or through a MockServer proxy (PROXY_MOCK_EVENTS_PATH) so that
// delays and error codes can be injected.
69 class MessageRouterPublisherIT {
// Environment handle obtained from DMaaPContainer.createContainerInstance().
// NOTE(review): DockerComposeContainer is referenced as a raw type here; consider DockerComposeContainer<?>.
71 private static final DockerComposeContainer CONTAINER = createContainerInstance();
// Client used to program expectations on, and verify requests received by, the proxy mock.
72 private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
73 LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
// Base "/events" URLs; both are assigned in setUp().
74 private static String EVENTS_PATH;
75 private static String PROXY_MOCK_EVENTS_PATH;
// Ten-second duration; its use sites are not visible in this view (presumably the StepVerifier timeout — TODO confirm).
77 private static final Duration TIMEOUT = Duration.ofSeconds(10);
// Expected failure text for a rejected publish; the single %s placeholder is filled with the topic name.
// NOTE(review): some lines of this concatenation are not visible in this view.
78 private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
80 + "\"mrstatus\":5007,"
81 + "\"helpURL\":\"http://onap.readthedocs.io\","
82 + "\"message\":\"Error while publishing data to topic.:%s."
83 + "Successfully published number of messages :0."
84 + "Expected { to start an object.\",\"status\":400"
// Expected failure text when the client-side request timeout fires (HTTP 408, messageId SVC0001).
// NOTE(review): some lines of this concatenation are not visible in this view.
86 private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
90 + "\"serviceException\":"
92 + "\"messageId\":\"SVC0001\","
93 + "\"text\":\"Client timeout exception occurred, Error code is %1\","
94 + "\"variables\":[\"408\"]"
// Expected failure text when the connection pool's pending-acquire queue is full (HTTP 429, messageId SVC2000).
// NOTE(review): "POLL" in the constant name looks like a typo for "POOL"; left unchanged because tests reference it.
98 private static final String CONNECTION_POLL_LIMIT_MESSAGE = "429 Too Many Requests\n"
100 + "\"requestError\":"
102 + "\"serviceException\":"
104 + "\"messageId\":\"SVC2000\","
105 + "\"text\":\"Pending acquire queue has reached its maximum size\","
106 + "\"variables\":[\"429\"]"
// Publisher and subscriber built with default configuration; tests that need retries or a
// connection pool create their own local publisher instead.
111 private final MessageRouterPublisher publisher = DmaapClientFactory
112 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
113 private final MessageRouterSubscriber subscriber = DmaapClientFactory
114 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
// Builds the base "/events" URL for the direct DMaaP endpoint and for the MockServer proxy.
// Presumably a one-time @BeforeAll hook (the annotation is not visible in this view) — TODO confirm.
117 static void setUp() {
118 EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
119 PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
// Resets the proxy mock so expectations and recorded requests do not carry over.
// NOTE(review): the header of the method enclosing this call is not visible in this view;
// presumably a per-test setup hook — confirm.
124 MOCK_SERVER_CLIENT.reset();
// Publishes a batch of two JSON messages directly to DMaaP topic "TOPIC" and expects a
// successful publish response that carries the same two items.
128 void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
130 final String topic = "TOPIC";
131 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
132 "{\"differentMessage\":\"message2\"}");
133 final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
134 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
135 final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
// Exercise the publisher with the prepared request and batch.
138 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
// The first emitted response must equal the expected success response.
// NOTE(review): the rest of this verification chain is not visible in this view.
141 StepVerifier.create(result)
142 .expectNext(expectedResponse)
// Sends three plain-text (non-JSON) messages with a request built by the one-argument
// createPublishRequest and expects the DMaaP 400 error response for the topic.
// Presumably that overload defaults to a JSON content type, which is why the server rejects the payload — TODO confirm.
148 void publisher_shouldHandleBadRequestError() {
150 final String topic = "TOPIC2";
151 final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
152 final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
153 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
// The expected failure reason is the 400 format string with the topic name substituted.
154 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
155 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
158 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
// The error is expected as a regular emitted response, not as an error signal.
// NOTE(review): the rest of this verification chain is not visible in this view.
161 StepVerifier.create(result)
162 .expectNext(expectedResponse)
// Round trip on "TOPIC3": publishes one JSON message, then reads the topic through the
// subscriber and expects the subscribe response to contain that message.
168 void publisher_shouldSuccessfullyPublishSingleMessage() {
170 final String topic = "TOPIC3";
171 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
172 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
173 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
174 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
175 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
176 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
177 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// registerTopic is an opaque helper from MessageRouterTestsUtils; presumably it makes sure the
// topic and consumer exist before the real publish — TODO confirm.
180 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) ignores the publish responses and, once publishing completes, plays the subscribe call.
181 Mono<MessageRouterSubscribeResponse> response = publisher
182 .put(publishRequest, jsonMessageBatch)
183 .then(subscriber.get(subscribeRequest));
// NOTE(review): the rest of this verification chain is not visible in this view.
186 StepVerifier.create(response)
187 .expectNext(expectedResponse)
// Round trip on "TOPIC5": publishes two JSON messages, then reads the topic through the
// subscriber and expects the subscribe response to contain both messages.
193 void publisher_shouldSuccessfullyPublishMultipleMessages() {
194 final String topic = "TOPIC5";
195 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
// NOTE(review): despite its name, singleJsonMessage holds two messages in this test.
196 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
197 "{\"differentMessage\":\"message2\"}");
198 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
199 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
200 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
201 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
202 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// registerTopic is an opaque helper; presumably it prepares the topic and consumer — TODO confirm.
205 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// Publish first, then subscribe once publishing has completed.
206 Mono<MessageRouterSubscribeResponse> response = publisher
207 .put(publishRequest, jsonMessageBatch)
208 .then(subscriber.get(subscribeRequest));
// NOTE(review): the rest of this verification chain is not visible in this view.
211 StepVerifier.create(response)
212 .expectNext(expectedResponse)
// Round trip on "TOPIC6": publishes one JSON-formatted message using ContentType.TEXT_PLAIN and
// expects the subscriber to return it.
218 void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
220 final String topic = "TOPIC6";
221 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
223 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
224 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
// The payload is built with plainBatch (Flux<JsonElement>) rather than jsonBatch.
225 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
227 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
228 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
229 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// registerTopic is an opaque helper; presumably it prepares the topic and consumer — TODO confirm.
232 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// Publish first, then subscribe once publishing has completed.
233 Mono<MessageRouterSubscribeResponse> response = publisher
234 .put(publishRequest, plainBatch)
235 .then(subscriber.get(subscribeRequest));
// NOTE(review): the rest of this verification chain is not visible in this view.
238 StepVerifier.create(response)
239 .expectNext(expectedResponse)
// Round trip on "TOPIC7": publishes two JSON-formatted messages using ContentType.TEXT_PLAIN and
// expects the subscriber to return both.
245 void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
247 final String topic = "TOPIC7";
248 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
250 final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
251 final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
252 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
254 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
255 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
256 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// registerTopic is an opaque helper; presumably it prepares the topic and consumer — TODO confirm.
259 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// Publish first, then subscribe once publishing has completed.
260 Mono<MessageRouterSubscribeResponse> response = publisher
261 .put(publishRequest, plainBatch)
262 .then(subscriber.get(subscribeRequest));
// NOTE(review): the rest of this verification chain is not visible in this view.
265 StepVerifier.create(response)
266 .expectNext(expectedResponse)
// Round trip on "TOPIC8": publishes one plain-text message ("kebab") using ContentType.TEXT_PLAIN
// and expects the subscriber to return it.
272 void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
274 final String topic = "TOPIC8";
275 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
277 final List<String> singlePlainMessage = List.of("kebab");
278 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
279 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
281 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
282 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
283 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// registerTopic is an opaque helper; presumably it prepares the topic and consumer — TODO confirm.
286 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// Publish first, then subscribe once publishing has completed.
287 Mono<MessageRouterSubscribeResponse> response = publisher
288 .put(publishRequest, plainBatch)
289 .then(subscriber.get(subscribeRequest));
// NOTE(review): the rest of this verification chain is not visible in this view.
292 StepVerifier.create(response)
293 .expectNext(expectedResponse)
// Round trip on "TOPIC9": publishes three plain-text messages using ContentType.TEXT_PLAIN and
// expects the subscriber to return all of them.
299 void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
301 final String topic = "TOPIC9";
302 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
// NOTE(review): despite its name, singlePlainMessage holds three messages in this test.
304 final List<String> singlePlainMessage = List.of("I", "like", "pizza");
305 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
306 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
308 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
309 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
310 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// registerTopic is an opaque helper; presumably it prepares the topic and consumer — TODO confirm.
313 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// Publish first, then subscribe once publishing has completed.
314 Mono<MessageRouterSubscribeResponse> response = publisher
315 .put(publishRequest, plainBatch)
316 .then(subscriber.get(subscribeRequest));
// NOTE(review): the rest of this verification chain is not visible in this view.
319 StepVerifier.create(response)
320 .expectNext(expectedResponse)
// With a 1-second request timeout and a proxy mock that delays its answer by 2 seconds, the
// publisher is expected to emit the 408 client-timeout error response after exactly one request.
326 void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
328 final String topic = "TOPIC10";
329 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
330 final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
// The request goes through the MockServer proxy and carries a 1-second timeout.
331 final MessageRouterPublishRequest mrRequest = createPublishRequest(
332 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1));
333 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
334 final String path = String.format("/events/%s", topic);
// One-shot expectation: the first request to the topic path is answered after a 2-second delay.
// NOTE(review): the receiver of this chain is not visible in this view; presumably MOCK_SERVER_CLIENT.
336 .when(request().withPath(path), Times.once())
337 .respond(response().withDelay(TimeUnit.SECONDS, 2));
340 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
// NOTE(review): the rest of this verification chain is not visible in this view.
343 StepVerifier.create(result)
344 .expectNext(expectedResponse)
// The default publisher is used, and exactly one request must have reached the mock (no retry).
348 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
// The proxy mock answers the first request with HTTP 404; the test expects the publish to succeed
// anyway and the mock to have seen exactly two requests (the original attempt plus one retry).
352 void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
354 final String topic = "TOPIC11";
355 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
357 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
358 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
359 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
361 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
362 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
364 final String path = String.format("/events/%s", topic);
// One-shot expectation: only the first request to the topic path gets the 404.
// NOTE(review): the receiver of this chain is not visible in this view; presumably MOCK_SERVER_CLIENT.
366 .when(request().withPath(path), Times.once())
367 .respond(response().withStatusCode(404));
// This local publisher shadows the instance field. Its configuration argument is not visible in
// this view; presumably a retry configuration — TODO confirm.
369 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
373 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
// NOTE(review): the rest of this verification chain is not visible in this view.
376 StepVerifier.create(result)
377 .expectNext(expectedResponse)
381 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
// The proxy mock delays the first answer by 2 seconds while the request timeout is 1 second; the
// test expects the publish to succeed and the mock to have seen exactly two requests.
385 void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
387 final String topic = "TOPIC12";
388 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
390 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
391 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
392 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
394 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
395 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
397 final String path = String.format("/events/%s", topic);
// One-shot expectation: only the first request is delayed beyond the client timeout.
// NOTE(review): the receiver of this chain is not visible in this view; presumably MOCK_SERVER_CLIENT.
399 .when(request().withPath(path), Times.once())
400 .respond(response().withDelay(TimeUnit.SECONDS, 2));
// This local publisher shadows the instance field. Its configuration argument is not visible in
// this view; presumably a retry configuration — TODO confirm.
401 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
405 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
// NOTE(review): the rest of this verification chain is not visible in this view.
408 StepVerifier.create(result)
409 .expectNext(expectedResponse)
413 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
// Queues four consecutive one-shot failures on the proxy mock (2 s delay, 404, 2 s delay, 500) and
// uses a publisher configured with retryConfig(1, 5). The test expects a successful publish
// response and exactly five requests recorded by the mock.
417 void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
419 final String topic = "TOPIC13";
420 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
422 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
423 "{\"differentMessage\":\"message2\"}");
424 final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages);
425 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
// The 1-second request timeout makes each 2-second delayed answer count as a client timeout.
427 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
428 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
430 final String path = String.format("/events/%s", topic);
// Four one-shot expectations follow, each registered with Times.once().
// NOTE(review): the receivers of these chains are not visible in this view; presumably MOCK_SERVER_CLIENT.
432 .when(request().withPath(path), Times.once())
433 .respond(response().withDelay(TimeUnit.SECONDS, 2));
435 .when(request().withPath(path), Times.once())
436 .respond(response().withStatusCode(404));
438 .when(request().withPath(path), Times.once())
439 .respond(response().withDelay(TimeUnit.SECONDS, 2));
441 .when(request().withPath(path), Times.once())
442 .respond(response().withStatusCode(500));
// The local publisher shadows the instance field: retry interval 1 second, up to 5 retries.
443 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5));
446 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
// NOTE(review): the rest of this verification chain is not visible in this view.
449 StepVerifier.create(result)
450 .expectNext(expectedResponse)
// Four programmed failures plus the final successful attempt.
454 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
// With retryConfig(1, 1), the proxy mock answers 404 and then 500 with body "Response Message".
// The test expects the publisher to emit the 500 error response and the mock to have seen
// exactly two requests.
458 void publisher_shouldHandleLastRetryError500() {
460 final String topic = "TOPIC14";
461 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
463 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
464 "{\"differentMessage\":\"message2\"}");
465 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
467 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
468 final String responseMessage = "Response Message";
// The expected failure reason is the status line followed by the body returned by the mock.
469 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
470 "500 Internal Server Error\n%s", responseMessage);
472 final String path = String.format("/events/%s", topic);
// Two one-shot expectations: first a 404, then the 500 with a body.
// NOTE(review): the receivers of these chains are not visible in this view; presumably MOCK_SERVER_CLIENT.
474 .when(request().withPath(path), Times.once())
475 .respond(response().withStatusCode(404));
477 .when(request().withPath(path), Times.once())
478 .respond(response().withStatusCode(500).withBody(responseMessage));
// The local publisher shadows the instance field: retry interval 1 second, a single retry.
479 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 1));
482 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
// NOTE(review): the rest of this verification chain is not visible in this view.
485 StepVerifier.create(result)
486 .expectNext(expectedResponse)
490 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
// Publishes one message through the proxy mock with a publisher built from
// connectionPoolConfiguration(). The test expects a success response and exactly one keep-alive
// request recorded by the mock.
494 void publisher_shouldSuccessfullyPublishWhenConnectionPoolConfigurationIsSet() {
496 final String topic = "TOPIC15";
497 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
499 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
500 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
501 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
503 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
504 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
506 final String path = String.format("/events/%s", topic);
// One-shot expectation: the mock itself answers 200 for the topic path.
// NOTE(review): the receiver of this chain is not visible in this view; presumably MOCK_SERVER_CLIENT.
508 .when(request().withPath(path), Times.once())
509 .respond(response().withStatusCode(200));
// The local publisher shadows the instance field and uses the connection-pool configuration.
511 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolConfiguration());
514 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
// NOTE(review): the rest of this verification chain is not visible in this view.
517 StepVerifier.create(result)
518 .expectNext(expectedResponse)
// The recorded request must have been sent with keep-alive enabled.
522 MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(1));
// Combines connection pooling and retry: the proxy mock delays its first answer by 10 seconds
// while the request timeout is 1 second. The test expects a success response and exactly two
// keep-alive requests recorded by the mock.
526 void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublishWithConnectionPoolConfiguration() {
528 final String topic = "TOPIC16";
529 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
531 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
532 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
533 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
535 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
536 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
538 final String path = String.format("/events/%s", topic);
// One-shot expectation: only the first request is delayed (10 seconds, well past the 1-second timeout).
// NOTE(review): the receiver of this chain is not visible in this view; presumably MOCK_SERVER_CLIENT.
540 .when(request().withPath(path), Times.once())
541 .respond(response().withDelay(TimeUnit.SECONDS, 10));
// The local publisher shadows the instance field and uses both pool and retry settings.
543 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolAndRetryConfiguration());
546 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
// NOTE(review): the rest of this verification chain is not visible in this view.
549 StepVerifier.create(result)
550 .expectNext(expectedResponse)
// Both the timed-out attempt and the retry must have used keep-alive.
554 MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(2));
// Publishes one message through the proxy mock with credentials "username"/"password". The test
// expects a success response and exactly one recorded request carrying the matching HTTP Basic
// Authorization header.
558 void publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader() {
560 final String topic = "TOPIC17";
561 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
563 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
564 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
565 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
// This createPublishRequest overload takes the user name and password.
567 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, "username","password");
568 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
570 final String path = String.format("/events/%s", topic);
// No expectation is programmed on the mock here, and the default instance publisher is used.
573 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
// NOTE(review): the rest of this verification chain is not visible in this view.
576 StepVerifier.create(result)
577 .expectNext(expectedResponse)
// "dXNlcm5hbWU6cGFzc3dvcmQ=" is the Base64 encoding of "username:password".
581 MOCK_SERVER_CLIENT.verify(request().withPath(path)
582 .withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1));
// Starts three fire-and-forget publishes that the proxy mock answers slowly (1-second delay,
// status 429), then verifies that one more publish on the same pooled publisher yields the
// "pending acquire queue has reached its maximum size" 429 error response.
586 void publisher_shouldHandleError429WhenConnectionPollLimitsHasBeenReached() {
// NOTE(review): "TOPIC17" is also used by publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader,
// which verifies an exact request count on the same mock path; a unique topic would isolate the two tests better.
588 final String topic = "TOPIC17";
589 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
591 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
592 "{\"differentMessage\":\"message2\"}");
593 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
595 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
597 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
598 CONNECTION_POLL_LIMIT_MESSAGE);
600 final String path = String.format("/events/%s", topic);
602 //maxConnectionPoll + pendingAcquireMaxCount(default 2*maxConnectionPoll)
603 final int maxNumberOfConcurrentRequest = 3;
// First expectation: up to three matching requests get a 429 after a 1-second delay.
// NOTE(review): the receivers of these chains are not visible in this view; presumably MOCK_SERVER_CLIENT.
605 .when(request().withPath(path), Times.exactly(maxNumberOfConcurrentRequest))
606 .respond(response().withStatusCode(429).withDelay(TimeUnit.SECONDS,1));
// Second expectation: one further matching request gets a 200.
609 .when(request().withPath(path), Times.once())
610 .respond(response().withStatusCode(200));
// The local publisher shadows the instance field and uses the connection-pool configuration.
612 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolConfiguration());
// result is only assembled here; it is subscribed later by StepVerifier, after the loop below.
615 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
// Start the concurrent publishes without waiting for them.
// NOTE(review): these subscriptions are never disposed or awaited, so they may still be in flight when the test ends.
617 for(int i = 0; i < maxNumberOfConcurrentRequest; i++) {
618 publisher.put(publishRequest, plainBatch).subscribe();
// NOTE(review): the rest of this verification chain is not visible in this view.
622 StepVerifier.create(result)
623 .expectNext(expectedResponse)
// Builds a publisher configuration whose retry settings use the given interval (in seconds, per
// the builder method name) and retry count.
// NOTE(review): the closing builder calls are not visible in this view.
628 private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
629 return ImmutableMessageRouterPublisherConfig.builder()
630 .retryConfig(ImmutableDmaapRetryConfig.builder()
631 .retryIntervalInSeconds(retryInterval)
632 .retryCount(retryCount)
// Builds a publisher configuration that sets a connection-pool configuration.
// NOTE(review): the concrete pool settings and closing builder calls are not visible in this
// view; the 429 test assumes a small pool (see its maxNumberOfConcurrentRequest comment) — confirm.
636 private MessageRouterPublisherConfig connectionPoolConfiguration() {
637 return ImmutableMessageRouterPublisherConfig.builder()
638 .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
// Builds a publisher configuration that sets both a connection-pool configuration and a retry
// configuration with a 1-second retry interval.
// NOTE(review): the pool settings, retry count and closing builder calls are not visible in this view.
646 private MessageRouterPublisherConfig connectionPoolAndRetryConfiguration() {
647 return ImmutableMessageRouterPublisherConfig.builder()
648 .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
653 .retryConfig(ImmutableDmaapRetryConfig.builder()
654 .retryIntervalInSeconds(1)