2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2021 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.Times;
31 import org.mockserver.verify.VerificationTimes;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
42 import org.testcontainers.containers.DockerComposeContainer;
43 import org.testcontainers.junit.jupiter.Container;
44 import org.testcontainers.junit.jupiter.Testcontainers;
45 import reactor.core.publisher.Flux;
46 import reactor.core.publisher.Mono;
47 import reactor.test.StepVerifier;
49 import java.time.Duration;
50 import java.util.concurrent.TimeUnit;
52 import static org.mockserver.model.HttpRequest.request;
53 import static org.mockserver.model.HttpResponse.response;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
65 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
66 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
69 class MessageRouterPublisherIT {
// Docker Compose environment obtained from DMaapContainer.createContainerInstance().
// NOTE(review): declared with a raw type; if DockerComposeContainer is generic in the
// Testcontainers version in use, DockerComposeContainer<?> would avoid the raw-type warning — confirm.
71 private static final DockerComposeContainer CONTAINER = createContainerInstance();
// Client for the mock server at LOCALHOST:PROXY_MOCK_SERVICE_EXPOSED_PORT; the tests call
// reset() on it and use verify(...) to count the requests that reached a given path.
72 private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
73 LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
// Base "/events" URLs for the DMaaP service and for the proxy mock; both are assigned in
// setUp(), which is why they are not final.
74 private static String EVENTS_PATH;
75 private static String PROXY_MOCK_EVENTS_PATH;
// Ten-second duration. NOTE(review): presumably the upper bound used when verifying the
// reactive results — confirm at the usage sites.
77 private static final Duration TIMEOUT = Duration.ofSeconds(10);
// Expected error text for a rejected publish: a "400 Bad Request" status line followed by the
// broker's error payload. The topic name is passed as the format argument for %s in
// publisher_shouldHandleBadRequestError.
78 private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
80 + "\"mrstatus\":5007,"
81 + "\"helpURL\":\"http://onap.readthedocs.io\","
82 + "\"message\":\"Error while publishing data to topic.:%s."
83 + "Successfully published number of messages :0."
84 + "Expected { to start an object.\",\"status\":400"
// Expected error text for a client-side timeout: a "408 Request Timeout" status line followed
// by a serviceException payload. It is passed to errorPublishResponse without format
// arguments, so "%1" is presumably literal text rather than a placeholder — confirm.
86 private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
90 + "\"serviceException\":"
92 + "\"messageId\":\"SVC0001\","
93 + "\"text\":\"Client timeout exception occurred, Error code is %1\","
94 + "\"variables\":[\"408\"]"
// Publisher and subscriber built from the default configurations; several tests declare a
// local publisher with a custom configuration that shadows this field.
99 private final MessageRouterPublisher publisher = DmaapClientFactory
100 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
101 private final MessageRouterSubscriber subscriber = DmaapClientFactory
102 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
/**
 * Builds the two base "/events" URLs used by the tests: EVENTS_PATH points at
 * LOCALHOST:DMAAP_SERVICE_EXPOSED_PORT and PROXY_MOCK_EVENTS_PATH at
 * LOCALHOST:PROXY_MOCK_SERVICE_EXPOSED_PORT.
 */
105 static void setUp() {
106 EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
107 PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
// Reset the shared MockServer client. NOTE(review): presumably done so that expectations and
// recorded requests from one test do not leak into the next — confirm against the MockServer docs.
112 MOCK_SERVER_CLIENT.reset();
/**
 * Publishes a batch of two JSON object messages to topic "TOPIC" under EVENTS_PATH and
 * expects the publisher to emit the success response built from those same two messages.
 */
116 void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
118 final String topic = "TOPIC";
119 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
120 "{\"differentMessage\":\"message2\"}");
121 final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
122 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
// The expected response is derived from the same source strings as the published batch.
123 final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
126 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
129 StepVerifier.create(result)
130 .expectNext(expectedResponse)
/**
 * Publishes three plain-text messages to topic "TOPIC2" with a request created without an
 * explicit content type and expects the error response built from
 * DMAAP_400_ERROR_RESPONSE_FORMAT and the topic name.
 * NOTE(review): presumably the default content type is JSON, which is why the plain words are
 * rejected — confirm in MessageRouterTestsUtils.createPublishRequest.
 */
136 void publisher_shouldHandleBadRequestError() {
138 final String topic = "TOPIC2";
139 final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
140 final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
141 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
142 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
143 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
146 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
149 StepVerifier.create(result)
150 .expectNext(expectedResponse)
/**
 * Publishes one JSON message to topic "TOPIC3" and then reads the topic back through the
 * subscriber, expecting the subscribe response to contain exactly the published item.
 */
156 void publisher_shouldSuccessfullyPublishSingleMessage() {
158 final String topic = "TOPIC3";
159 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
160 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
161 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
162 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
163 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
164 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
165 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// NOTE(review): registerTopic presumably makes the topic and consumer known to the broker
// before the real publish — confirm in MessageRouterTestsUtils.
168 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) discards the publish responses and continues with the subscribe call once the
// publish Flux has completed, so only the subscribe response is verified.
169 Mono<MessageRouterSubscribeResponse> response = publisher
170 .put(publishRequest, jsonMessageBatch)
171 .then(subscriber.get(subscribeRequest));
174 StepVerifier.create(response)
175 .expectNext(expectedResponse)
/**
 * Publishes two JSON messages to topic "TOPIC5" and then reads the topic back through the
 * subscriber, expecting the subscribe response to contain both published items.
 */
181 void publisher_shouldSuccessfullyPublishMultipleMessages() {
182 final String topic = "TOPIC5";
183 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
// NOTE(review): despite its name, singleJsonMessage holds two messages here; consider renaming.
184 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
185 "{\"differentMessage\":\"message2\"}");
186 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
187 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
188 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
189 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
190 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// NOTE(review): registerTopic presumably makes the topic and consumer known to the broker
// before the real publish — confirm in MessageRouterTestsUtils.
193 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) discards the publish responses and continues with the subscribe call once the
// publish Flux has completed.
194 Mono<MessageRouterSubscribeResponse> response = publisher
195 .put(publishRequest, jsonMessageBatch)
196 .then(subscriber.get(subscribeRequest));
199 StepVerifier.create(response)
200 .expectNext(expectedResponse)
/**
 * Publishes one JSON-formatted message to topic "TOPIC6" using ContentType.TEXT_PLAIN and a
 * batch built by plainBatch, then subscribes and expects the published item back.
 */
206 void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
208 final String topic = "TOPIC6";
209 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
211 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
212 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
213 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
// The content type is set explicitly here, unlike the JSON tests that use the one-argument factory.
215 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
216 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
217 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
220 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// Publish first; then(...) switches to the subscribe call once publishing has completed.
221 Mono<MessageRouterSubscribeResponse> response = publisher
222 .put(publishRequest, plainBatch)
223 .then(subscriber.get(subscribeRequest));
226 StepVerifier.create(response)
227 .expectNext(expectedResponse)
/**
 * Publishes two JSON-formatted messages to topic "TOPIC7" using ContentType.TEXT_PLAIN, then
 * subscribes and expects both published items back.
 */
233 void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
235 final String topic = "TOPIC7";
236 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
238 final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
239 final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
240 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
242 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
243 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
244 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
247 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// Publish first; then(...) switches to the subscribe call once publishing has completed.
248 Mono<MessageRouterSubscribeResponse> response = publisher
249 .put(publishRequest, plainBatch)
250 .then(subscriber.get(subscribeRequest));
253 StepVerifier.create(response)
254 .expectNext(expectedResponse)
/**
 * Publishes the single plain-text message "kebab" to topic "TOPIC8" using
 * ContentType.TEXT_PLAIN, then subscribes and expects that item back.
 */
260 void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
262 final String topic = "TOPIC8";
263 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
265 final List<String> singlePlainMessage = List.of("kebab");
266 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
267 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
269 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
270 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
271 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
274 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// Publish first; then(...) switches to the subscribe call once publishing has completed.
275 Mono<MessageRouterSubscribeResponse> response = publisher
276 .put(publishRequest, plainBatch)
277 .then(subscriber.get(subscribeRequest));
280 StepVerifier.create(response)
281 .expectNext(expectedResponse)
/**
 * Publishes three plain-text messages ("I", "like", "pizza") to topic "TOPIC9" using
 * ContentType.TEXT_PLAIN, then subscribes and expects all three items back.
 */
287 void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
289 final String topic = "TOPIC9";
290 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
// NOTE(review): despite its name, singlePlainMessage holds three messages here; consider renaming.
292 final List<String> singlePlainMessage = List.of("I", "like", "pizza");
293 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
294 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
296 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
297 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
298 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
301 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// Publish first; then(...) switches to the subscribe call once publishing has completed.
302 Mono<MessageRouterSubscribeResponse> response = publisher
303 .put(publishRequest, plainBatch)
304 .then(subscriber.get(subscribeRequest));
307 StepVerifier.create(response)
308 .expectNext(expectedResponse)
/**
 * Publishes one JSON message to topic "TOPIC10" through the proxy mock, which is set up to
 * answer the first request only after a two-second delay, and expects the error response
 * built from TIMEOUT_ERROR_MESSAGE. Finally verifies that exactly one request reached the path.
 */
314 void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
316 final String topic = "TOPIC10";
317 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
318 final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
// The one-second Duration is presumably the request timeout (shorter than the mock's delay) — confirm.
319 final MessageRouterPublishRequest mrRequest = createPublishRequest(
320 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1));
321 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
322 final String path = String.format("/events/%s", topic);
// One-shot expectation: only the first request on this path gets the delayed response.
324 .when(request().withPath(path), Times.once())
325 .respond(response().withDelay(TimeUnit.SECONDS, 2));
328 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
331 StepVerifier.create(result)
332 .expectNext(expectedResponse)
// A single recorded request shows the default publisher did not retry after the timeout.
336 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
/**
 * Publishes one message to topic "TOPIC11" through the proxy mock, which answers the first
 * request with HTTP 404, and expects the success response anyway. Verifies that exactly two
 * requests reached the path: the rejected attempt and one retry.
 */
340 void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
342 final String topic = "TOPIC11";
343 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
345 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
346 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
347 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
349 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
350 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
352 final String path = String.format("/events/%s", topic);
// One-shot expectation: only the first request on this path is answered with 404.
354 .when(request().withPath(path), Times.once())
355 .respond(response().withStatusCode(404));
// Local publisher with its own configuration; it shadows the instance field of the same name.
357 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
361 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
364 StepVerifier.create(result)
365 .expectNext(expectedResponse)
369 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
/**
 * Publishes one message to topic "TOPIC12" through the proxy mock, which delays its first
 * response by two seconds, and expects the success response anyway. Verifies that exactly
 * two requests reached the path: the delayed attempt and one retry.
 */
373 void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
375 final String topic = "TOPIC12";
376 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
378 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
379 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
380 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
// The one-second Duration is presumably the request timeout (shorter than the mock's delay) — confirm.
382 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
383 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
385 final String path = String.format("/events/%s", topic);
// One-shot expectation: only the first request on this path gets the delayed response.
387 .when(request().withPath(path), Times.once())
388 .respond(response().withDelay(TimeUnit.SECONDS, 2));
// Local publisher with its own configuration; it shadows the instance field of the same name.
389 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
393 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
396 StepVerifier.create(result)
397 .expectNext(expectedResponse)
401 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
/**
 * Publishes two messages to topic "TOPIC13" through the proxy mock, which is set up with four
 * one-shot failures (delay, 404, delay, 500), using a publisher built from retryConfig(1, 5).
 * Expects the success response and verifies that exactly five requests reached the path.
 */
405 void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
407 final String topic = "TOPIC13";
408 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
410 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
411 "{\"differentMessage\":\"message2\"}");
412 final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages);
413 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
// The one-second Duration is presumably the request timeout (shorter than the mock's delays) — confirm.
415 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
416 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
418 final String path = String.format("/events/%s", topic);
// Four one-shot expectations on the same path, each registered with Times.once().
// NOTE(review): presumably they are consumed in registration order — confirm in the MockServer docs.
420 .when(request().withPath(path), Times.once())
421 .respond(response().withDelay(TimeUnit.SECONDS, 2));
423 .when(request().withPath(path), Times.once())
424 .respond(response().withStatusCode(404));
426 .when(request().withPath(path), Times.once())
427 .respond(response().withDelay(TimeUnit.SECONDS, 2));
429 .when(request().withPath(path), Times.once())
430 .respond(response().withStatusCode(500));
// retryConfig(retryInterval = 1, retryCount = 5); this local publisher shadows the instance field.
431 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5));
434 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
437 StepVerifier.create(result)
438 .expectNext(expectedResponse)
// Five requests: the four failing attempts plus the one that succeeds.
442 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
/**
 * Publishes two messages to topic "TOPIC14" through the proxy mock, which answers first with
 * 404 and then with 500 plus a body, using a publisher built from retryConfig(1, 1). Expects
 * the error response "500 Internal Server Error" followed by that body, and verifies that
 * exactly two requests reached the path.
 */
446 void publisher_shouldHandleLastRetryError500() {
448 final String topic = "TOPIC14";
449 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
451 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
452 "{\"differentMessage\":\"message2\"}");
453 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
455 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
456 final String responseMessage = "Response Message";
// The expected error text ends with the same body the mock returns with the 500 status.
457 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
458 "500 Internal Server Error\n%s", responseMessage);
460 final String path = String.format("/events/%s", topic);
// Two one-shot expectations: the initial attempt gets 404, the next request gets 500 with a body.
462 .when(request().withPath(path), Times.once())
463 .respond(response().withStatusCode(404));
465 .when(request().withPath(path), Times.once())
466 .respond(response().withStatusCode(500).withBody(responseMessage));
// retryConfig(retryInterval = 1, retryCount = 1); this local publisher shadows the instance field.
467 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 1));
470 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
473 StepVerifier.create(result)
474 .expectNext(expectedResponse)
// Two requests: the initial attempt and the single configured retry.
478 MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
/**
 * Publishes one message to topic "TOPIC15" through the proxy mock (which answers once with
 * 200) using a publisher built from connectionPoolConfiguration(). Expects the success
 * response and verifies that exactly one keep-alive request reached the path.
 */
482 void publisher_shouldSuccessfullyPublishWhenConnectionPoolConfigurationIsSet() {
484 final String topic = "TOPIC15";
485 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
487 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
488 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
489 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
491 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
492 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
494 final String path = String.format("/events/%s", topic);
496 .when(request().withPath(path), Times.once())
497 .respond(response().withStatusCode(200));
// Local publisher using the connection-pool configuration; it shadows the instance field.
499 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolConfiguration());
502 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
505 StepVerifier.create(result)
506 .expectNext(expectedResponse)
// withKeepAlive(true) restricts the match to requests sent on a keep-alive connection.
510 MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(1));
/**
 * Publishes one message to topic "TOPIC16" through the proxy mock, which delays its first
 * response by ten seconds, using a publisher built from connectionPoolAndRetryConfiguration().
 * Expects the success response and verifies that exactly two keep-alive requests reached the
 * path: the delayed attempt and one retry.
 */
514 void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublishWithConnectionPoolConfiguration() {
516 final String topic = "TOPIC16";
517 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
519 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
520 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
521 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
// The one-second Duration is presumably the request timeout (far shorter than the mock's delay) — confirm.
523 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
524 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
526 final String path = String.format("/events/%s", topic);
// One-shot expectation: only the first request on this path gets the delayed response.
528 .when(request().withPath(path), Times.once())
529 .respond(response().withDelay(TimeUnit.SECONDS, 10));
// Local publisher combining connection-pool and retry settings; it shadows the instance field.
531 final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolAndRetryConfiguration());
534 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
537 StepVerifier.create(result)
538 .expectNext(expectedResponse)
542 MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(2));
/**
 * Publishes one message to topic "TOPIC17" through the proxy mock with a request created
 * from the credentials "username" / "password". Expects the success response and verifies
 * that exactly one request carrying the matching Basic Authorization header reached the path.
 */
546 void publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader() {
548 final String topic = "TOPIC17";
549 final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
551 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
552 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
553 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
555 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, "username","password");
556 final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
558 final String path = String.format("/events/%s", topic);
561 final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
564 StepVerifier.create(result)
565 .expectNext(expectedResponse)
// "dXNlcm5hbWU6cGFzc3dvcmQ=" is the Base64 encoding of "username:password".
569 MOCK_SERVER_CLIENT.verify(request().withPath(path)
570 .withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1));
/**
 * Builds a publisher configuration whose retry section uses the given values.
 *
 * @param retryInterval value passed to retryIntervalInSeconds(...)
 * @param retryCount value passed to retryCount(...)
 * @return the publisher configuration carrying that retry section
 */
574 private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
575 return ImmutableMessageRouterPublisherConfig.builder()
576 .retryConfig(ImmutableDmaapRetryConfig.builder()
577 .retryIntervalInSeconds(retryInterval)
578 .retryCount(retryCount)
/**
 * Builds a publisher configuration with an explicit connection-pool section, assembled with
 * ImmutableDmaapConnectionPoolConfig.builder().
 *
 * @return the publisher configuration carrying that connection-pool section
 */
582 private MessageRouterPublisherConfig connectionPoolConfiguration() {
583 return ImmutableMessageRouterPublisherConfig.builder()
584 .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
592 private MessageRouterPublisherConfig connectionPoolAndRetryConfiguration() {
593 return ImmutableMessageRouterPublisherConfig.builder()
594 .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
599 .retryConfig(ImmutableDmaapRetryConfig.builder()
600 .retryIntervalInSeconds(1)