70adf59d59c8d939a3c940fbd7abde9268f1a2b7
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.Times;
31 import org.mockserver.verify.VerificationTimes;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
42 import org.testcontainers.containers.DockerComposeContainer;
43 import org.testcontainers.junit.jupiter.Container;
44 import org.testcontainers.junit.jupiter.Testcontainers;
45 import reactor.core.publisher.Flux;
46 import reactor.core.publisher.Mono;
47 import reactor.test.StepVerifier;
48
49 import java.time.Duration;
50 import java.util.concurrent.TimeUnit;
51
52 import static org.mockserver.model.HttpRequest.request;
53 import static org.mockserver.model.HttpResponse.response;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
65 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
66 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
67
68 @Testcontainers
69 class MessageRouterPublisherIT {
70     @Container
71     private static final DockerComposeContainer CONTAINER = createContainerInstance();
72     private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
73             LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
74     private static String EVENTS_PATH;
75     private static String PROXY_MOCK_EVENTS_PATH;
76
77     private static final Duration TIMEOUT = Duration.ofSeconds(10);
78     private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
79             + "{"
80             + "\"mrstatus\":5007,"
81             + "\"helpURL\":\"http://onap.readthedocs.io\","
82             + "\"message\":\"Error while publishing data to topic.:%s."
83             + "Successfully published number of messages :0."
84             + "Expected { to start an object.\",\"status\":400"
85             + "}";
86     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
87             + "{"
88             + "\"requestError\":"
89             + "{"
90             + "\"serviceException\":"
91             + "{"
92             + "\"messageId\":\"SVC0001\","
93             + "\"text\":\"Client timeout exception occurred, Error code is %1\","
94             + "\"variables\":[\"408\"]"
95             + "}"
96             + "}"
97             + "}";
98
99     private final MessageRouterPublisher publisher = DmaapClientFactory
100             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
101     private final MessageRouterSubscriber subscriber = DmaapClientFactory
102             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
103
104     @BeforeAll
105     static void setUp() {
106         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
107         PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
108     }
109
110     @BeforeEach
111     void set() {
112         MOCK_SERVER_CLIENT.reset();
113     }
114
115     @Test
116     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
117         //given
118         final String topic = "TOPIC";
119         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
120                 "{\"differentMessage\":\"message2\"}");
121         final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
122         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
123         final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
124
125         //when
126         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
127
128         //then
129         StepVerifier.create(result)
130                 .expectNext(expectedResponse)
131                 .expectComplete()
132                 .verify(TIMEOUT);
133     }
134
135     @Test
136     void publisher_shouldHandleBadRequestError() {
137         //given
138         final String topic = "TOPIC2";
139         final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
140         final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
141         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
142         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
143                 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
144
145         //when
146         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
147
148         //then
149         StepVerifier.create(result)
150                 .expectNext(expectedResponse)
151                 .expectComplete()
152                 .verify(TIMEOUT);
153     }
154
155     @Test
156     void publisher_shouldSuccessfullyPublishSingleMessage() {
157         //given
158         final String topic = "TOPIC3";
159         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
160         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
161         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
162         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
163         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
164         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
165         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
166
167         //when
168         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
169         Mono<MessageRouterSubscribeResponse> response = publisher
170                 .put(publishRequest, jsonMessageBatch)
171                 .then(subscriber.get(subscribeRequest));
172
173         //then
174         StepVerifier.create(response)
175                 .expectNext(expectedResponse)
176                 .expectComplete()
177                 .verify();
178     }
179
180     @Test
181     void publisher_shouldSuccessfullyPublishMultipleMessages() {
182         final String topic = "TOPIC5";
183         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
184         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
185                 "{\"differentMessage\":\"message2\"}");
186         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
187         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
188         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
189         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
190         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
191
192         //when
193         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
194         Mono<MessageRouterSubscribeResponse> response = publisher
195                 .put(publishRequest, jsonMessageBatch)
196                 .then(subscriber.get(subscribeRequest));
197
198         //then
199         StepVerifier.create(response)
200                 .expectNext(expectedResponse)
201                 .expectComplete()
202                 .verify();
203     }
204
205     @Test
206     void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
207         //given
208         final String topic = "TOPIC6";
209         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
210
211         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
212         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
213         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
214
215         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
216         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
217         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
218
219         //when
220         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
221         Mono<MessageRouterSubscribeResponse> response = publisher
222                 .put(publishRequest, plainBatch)
223                 .then(subscriber.get(subscribeRequest));
224
225         //then
226         StepVerifier.create(response)
227                 .expectNext(expectedResponse)
228                 .expectComplete()
229                 .verify();
230     }
231
232     @Test
233     void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
234         //given
235         final String topic = "TOPIC7";
236         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
237
238         final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
239         final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
240         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
241
242         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
243         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
244         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
245
246         //when
247         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
248         Mono<MessageRouterSubscribeResponse> response = publisher
249                 .put(publishRequest, plainBatch)
250                 .then(subscriber.get(subscribeRequest));
251
252         //then
253         StepVerifier.create(response)
254                 .expectNext(expectedResponse)
255                 .expectComplete()
256                 .verify();
257     }
258
259     @Test
260     void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
261         //given
262         final String topic = "TOPIC8";
263         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
264
265         final List<String> singlePlainMessage = List.of("kebab");
266         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
267         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
268
269         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
270         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
271         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
272
273         //when
274         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
275         Mono<MessageRouterSubscribeResponse> response = publisher
276                 .put(publishRequest, plainBatch)
277                 .then(subscriber.get(subscribeRequest));
278
279         //then
280         StepVerifier.create(response)
281                 .expectNext(expectedResponse)
282                 .expectComplete()
283                 .verify();
284     }
285
286     @Test
287     void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
288         //given
289         final String topic = "TOPIC9";
290         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
291
292         final List<String> singlePlainMessage = List.of("I", "like", "pizza");
293         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
294         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
295
296         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
297         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
298         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
299
300         //when
301         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
302         Mono<MessageRouterSubscribeResponse> response = publisher
303                 .put(publishRequest, plainBatch)
304                 .then(subscriber.get(subscribeRequest));
305
306         //then
307         StepVerifier.create(response)
308                 .expectNext(expectedResponse)
309                 .expectComplete()
310                 .verify();
311     }
312
313     @Test
314     void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
315         //given
316         final String topic = "TOPIC10";
317         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
318         final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
319         final MessageRouterPublishRequest mrRequest = createPublishRequest(
320                 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1));
321         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
322         final String path = String.format("/events/%s", topic);
323         MOCK_SERVER_CLIENT
324                 .when(request().withPath(path), Times.once())
325                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
326
327         //when
328         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
329
330         //then
331         StepVerifier.create(result)
332                 .expectNext(expectedResponse)
333                 .expectComplete()
334                 .verify(TIMEOUT);
335
336         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
337     }
338
339     @Test
340     void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
341         //given
342         final String topic = "TOPIC11";
343         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
344
345         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
346         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
347         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
348
349         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
350         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
351
352         final String path = String.format("/events/%s", topic);
353         MOCK_SERVER_CLIENT
354                 .when(request().withPath(path), Times.once())
355                 .respond(response().withStatusCode(404));
356
357         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
358                 retryConfig(1, 1));
359
360         //when
361         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
362
363         //then
364         StepVerifier.create(result)
365                 .expectNext(expectedResponse)
366                 .expectComplete()
367                 .verify();
368
369         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
370     }
371
372     @Test
373     void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
374         //given
375         final String topic = "TOPIC12";
376         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
377
378         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
379         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
380         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
381
382         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
383         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
384
385         final String path = String.format("/events/%s", topic);
386         MOCK_SERVER_CLIENT
387                 .when(request().withPath(path), Times.once())
388                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
389         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
390                 retryConfig(1, 1));
391
392         //when
393         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
394
395         //then
396         StepVerifier.create(result)
397                 .expectNext(expectedResponse)
398                 .expectComplete()
399                 .verify();
400
401         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
402     }
403
404     @Test
405     void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
406         //given
407         final String topic = "TOPIC13";
408         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
409
410         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
411                 "{\"differentMessage\":\"message2\"}");
412         final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages);
413         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
414
415         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
416         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
417
418         final String path = String.format("/events/%s", topic);
419         MOCK_SERVER_CLIENT
420                 .when(request().withPath(path), Times.once())
421                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
422         MOCK_SERVER_CLIENT
423                 .when(request().withPath(path), Times.once())
424                 .respond(response().withStatusCode(404));
425         MOCK_SERVER_CLIENT
426                 .when(request().withPath(path), Times.once())
427                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
428         MOCK_SERVER_CLIENT
429                 .when(request().withPath(path), Times.once())
430                 .respond(response().withStatusCode(500));
431         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5));
432
433         //when
434         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
435
436         //then
437         StepVerifier.create(result)
438                 .expectNext(expectedResponse)
439                 .expectComplete()
440                 .verify();
441
442         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
443     }
444
445     @Test
446     void publisher_shouldHandleLastRetryError500() {
447         //given
448         final String topic = "TOPIC14";
449         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
450
451         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
452                 "{\"differentMessage\":\"message2\"}");
453         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
454
455         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
456         final String responseMessage = "Response Message";
457         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
458                 "500 Internal Server Error\n%s", responseMessage);
459
460         final String path = String.format("/events/%s", topic);
461         MOCK_SERVER_CLIENT
462                 .when(request().withPath(path), Times.once())
463                 .respond(response().withStatusCode(404));
464         MOCK_SERVER_CLIENT
465                 .when(request().withPath(path), Times.once())
466                 .respond(response().withStatusCode(500).withBody(responseMessage));
467         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 1));
468
469         //when
470         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
471
472         //then
473         StepVerifier.create(result)
474                 .expectNext(expectedResponse)
475                 .expectComplete()
476                 .verify();
477
478         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
479     }
480
481     @Test
482     void publisher_shouldSuccessfullyPublishWhenConnectionPoolConfigurationIsSet() {
483         //given
484         final String topic = "TOPIC15";
485         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
486
487         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
488         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
489         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
490
491         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
492         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
493
494         final String path = String.format("/events/%s", topic);
495         MOCK_SERVER_CLIENT
496                 .when(request().withPath(path), Times.once())
497                 .respond(response().withStatusCode(200));
498
499         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolConfiguration());
500
501         //when
502         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
503
504         //then
505         StepVerifier.create(result)
506                 .expectNext(expectedResponse)
507                 .expectComplete()
508                 .verify();
509
510         MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(1));
511     }
512
513     @Test
514     void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublishWithConnectionPoolConfiguration() {
515         //given
516         final String topic = "TOPIC16";
517         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
518
519         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
520         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
521         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
522
523         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
524         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
525
526         final String path = String.format("/events/%s", topic);
527         MOCK_SERVER_CLIENT
528                 .when(request().withPath(path), Times.once())
529                 .respond(response().withDelay(TimeUnit.SECONDS, 10));
530
531         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolAndRetryConfiguration());
532
533         //when
534         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
535
536         //then
537         StepVerifier.create(result)
538                 .expectNext(expectedResponse)
539                 .expectComplete()
540                 .verify();
541
542         MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(2));
543     }
544
545     @Test
546     void publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader() {
547         //given
548         final String topic = "TOPIC17";
549         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
550
551         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
552         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
553         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
554
555         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, "username","password");
556         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
557
558         final String path = String.format("/events/%s", topic);
559
560         //when
561         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
562
563         //then
564         StepVerifier.create(result)
565                 .expectNext(expectedResponse)
566                 .expectComplete()
567                 .verify();
568
569         MOCK_SERVER_CLIENT.verify(request().withPath(path)
570                 .withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1));
571     }
572
573
574     private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
575         return ImmutableMessageRouterPublisherConfig.builder()
576                 .retryConfig(ImmutableDmaapRetryConfig.builder()
577                         .retryIntervalInSeconds(retryInterval)
578                         .retryCount(retryCount)
579                         .build())
580                 .build();
581     }
582     private MessageRouterPublisherConfig connectionPoolConfiguration() {
583         return ImmutableMessageRouterPublisherConfig.builder()
584                 .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
585                         .connectionPool(10)
586                         .maxIdleTime(10)
587                         .maxLifeTime(20)
588                         .build())
589                 .build();
590     }
591
592     private MessageRouterPublisherConfig connectionPoolAndRetryConfiguration() {
593         return ImmutableMessageRouterPublisherConfig.builder()
594                 .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
595                         .connectionPool(10)
596                         .maxIdleTime(10)
597                         .maxLifeTime(20)
598                         .build())
599                 .retryConfig(ImmutableDmaapRetryConfig.builder()
600                         .retryIntervalInSeconds(1)
601                         .retryCount(1)
602                         .build())
603                 .build();
604     }
605 }