a1ad951f3dd018538ac28d89c9ff6951e8a05b9f
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.mockserver.client.MockServerClient;
30 import org.mockserver.matchers.Times;
31 import org.mockserver.verify.VerificationTimes;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
42 import org.testcontainers.containers.DockerComposeContainer;
43 import org.testcontainers.junit.jupiter.Container;
44 import org.testcontainers.junit.jupiter.Testcontainers;
45 import reactor.core.publisher.Flux;
46 import reactor.core.publisher.Mono;
47 import reactor.test.StepVerifier;
48
49 import java.time.Duration;
50 import java.util.concurrent.TimeUnit;
51
52 import static org.mockserver.model.HttpRequest.request;
53 import static org.mockserver.model.HttpResponse.response;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
65 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
66 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
67
68 @Testcontainers
69 class MessageRouterPublisherIT {
70     @Container
71     private static final DockerComposeContainer CONTAINER = createContainerInstance();
72     private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
73             LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
74     private static String EVENTS_PATH;
75     private static String PROXY_MOCK_EVENTS_PATH;
76
77     private static final Duration TIMEOUT = Duration.ofSeconds(10);
78     private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
79             + "{"
80             + "\"mrstatus\":5007,"
81             + "\"helpURL\":\"http://onap.readthedocs.io\","
82             + "\"message\":\"Error while publishing data to topic.:%s."
83             + "Successfully published number of messages :0."
84             + "Expected { to start an object.\",\"status\":400"
85             + "}";
86     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
87             + "{"
88             + "\"requestError\":"
89             + "{"
90             + "\"serviceException\":"
91             + "{"
92             + "\"messageId\":\"SVC0001\","
93             + "\"text\":\"Client timeout exception occurred, Error code is %1\","
94             + "\"variables\":[\"408\"]"
95             + "}"
96             + "}"
97             + "}";
98     private static final String CONNECTION_POLL_LIMIT_MESSAGE = "429 Too Many Requests\n"
99             + "{"
100             + "\"requestError\":"
101             + "{"
102             + "\"serviceException\":"
103             + "{"
104             + "\"messageId\":\"SVC2000\","
105             + "\"text\":\"Pending acquire queue has reached its maximum size\","
106             + "\"variables\":[\"429\"]"
107             + "}"
108             + "}"
109             + "}";
110
111     private final MessageRouterPublisher publisher = DmaapClientFactory
112             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
113     private final MessageRouterSubscriber subscriber = DmaapClientFactory
114             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
115
116     @BeforeAll
117     static void setUp() {
118         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
119         PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
120     }
121
122     @BeforeEach
123     void set() {
124         MOCK_SERVER_CLIENT.reset();
125     }
126
127     @Test
128     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
129         //given
130         final String topic = "TOPIC";
131         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
132                 "{\"differentMessage\":\"message2\"}");
133         final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
134         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
135         final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
136
137         //when
138         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
139
140         //then
141         StepVerifier.create(result)
142                 .expectNext(expectedResponse)
143                 .expectComplete()
144                 .verify(TIMEOUT);
145     }
146
147     @Test
148     void publisher_shouldHandleBadRequestError() {
149         //given
150         final String topic = "TOPIC2";
151         final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
152         final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
153         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
154         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
155                 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
156
157         //when
158         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
159
160         //then
161         StepVerifier.create(result)
162                 .expectNext(expectedResponse)
163                 .expectComplete()
164                 .verify(TIMEOUT);
165     }
166
167     @Test
168     void publisher_shouldSuccessfullyPublishSingleMessage() {
169         //given
170         final String topic = "TOPIC3";
171         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
172         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
173         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
174         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
175         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
176         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
177         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
178
179         //when
180         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
181         Mono<MessageRouterSubscribeResponse> response = publisher
182                 .put(publishRequest, jsonMessageBatch)
183                 .then(subscriber.get(subscribeRequest));
184
185         //then
186         StepVerifier.create(response)
187                 .expectNext(expectedResponse)
188                 .expectComplete()
189                 .verify();
190     }
191
192     @Test
193     void publisher_shouldSuccessfullyPublishMultipleMessages() {
194         final String topic = "TOPIC5";
195         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
196         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
197                 "{\"differentMessage\":\"message2\"}");
198         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
199         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
200         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
201         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
202         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
203
204         //when
205         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
206         Mono<MessageRouterSubscribeResponse> response = publisher
207                 .put(publishRequest, jsonMessageBatch)
208                 .then(subscriber.get(subscribeRequest));
209
210         //then
211         StepVerifier.create(response)
212                 .expectNext(expectedResponse)
213                 .expectComplete()
214                 .verify();
215     }
216
217     @Test
218     void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
219         //given
220         final String topic = "TOPIC6";
221         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
222
223         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
224         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
225         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
226
227         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
228         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
229         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
230
231         //when
232         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
233         Mono<MessageRouterSubscribeResponse> response = publisher
234                 .put(publishRequest, plainBatch)
235                 .then(subscriber.get(subscribeRequest));
236
237         //then
238         StepVerifier.create(response)
239                 .expectNext(expectedResponse)
240                 .expectComplete()
241                 .verify();
242     }
243
244     @Test
245     void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
246         //given
247         final String topic = "TOPIC7";
248         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
249
250         final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
251         final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
252         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
253
254         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
255         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
256         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
257
258         //when
259         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
260         Mono<MessageRouterSubscribeResponse> response = publisher
261                 .put(publishRequest, plainBatch)
262                 .then(subscriber.get(subscribeRequest));
263
264         //then
265         StepVerifier.create(response)
266                 .expectNext(expectedResponse)
267                 .expectComplete()
268                 .verify();
269     }
270
271     @Test
272     void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
273         //given
274         final String topic = "TOPIC8";
275         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
276
277         final List<String> singlePlainMessage = List.of("kebab");
278         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
279         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
280
281         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
282         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
283         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
284
285         //when
286         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
287         Mono<MessageRouterSubscribeResponse> response = publisher
288                 .put(publishRequest, plainBatch)
289                 .then(subscriber.get(subscribeRequest));
290
291         //then
292         StepVerifier.create(response)
293                 .expectNext(expectedResponse)
294                 .expectComplete()
295                 .verify();
296     }
297
298     @Test
299     void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
300         //given
301         final String topic = "TOPIC9";
302         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
303
304         final List<String> singlePlainMessage = List.of("I", "like", "pizza");
305         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
306         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
307
308         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
309         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
310         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
311
312         //when
313         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
314         Mono<MessageRouterSubscribeResponse> response = publisher
315                 .put(publishRequest, plainBatch)
316                 .then(subscriber.get(subscribeRequest));
317
318         //then
319         StepVerifier.create(response)
320                 .expectNext(expectedResponse)
321                 .expectComplete()
322                 .verify();
323     }
324
325     @Test
326     void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
327         //given
328         final String topic = "TOPIC10";
329         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
330         final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
331         final MessageRouterPublishRequest mrRequest = createPublishRequest(
332                 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1));
333         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
334         final String path = String.format("/events/%s", topic);
335         MOCK_SERVER_CLIENT
336                 .when(request().withPath(path), Times.once())
337                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
338
339         //when
340         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
341
342         //then
343         StepVerifier.create(result)
344                 .expectNext(expectedResponse)
345                 .expectComplete()
346                 .verify(TIMEOUT);
347
348         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
349     }
350
351     @Test
352     void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
353         //given
354         final String topic = "TOPIC11";
355         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
356
357         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
358         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
359         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
360
361         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
362         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
363
364         final String path = String.format("/events/%s", topic);
365         MOCK_SERVER_CLIENT
366                 .when(request().withPath(path), Times.once())
367                 .respond(response().withStatusCode(404));
368
369         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
370                 retryConfig(1, 1));
371
372         //when
373         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
374
375         //then
376         StepVerifier.create(result)
377                 .expectNext(expectedResponse)
378                 .expectComplete()
379                 .verify();
380
381         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
382     }
383
384     @Test
385     void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
386         //given
387         final String topic = "TOPIC12";
388         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
389
390         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
391         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
392         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
393
394         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
395         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
396
397         final String path = String.format("/events/%s", topic);
398         MOCK_SERVER_CLIENT
399                 .when(request().withPath(path), Times.once())
400                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
401         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
402                 retryConfig(1, 1));
403
404         //when
405         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
406
407         //then
408         StepVerifier.create(result)
409                 .expectNext(expectedResponse)
410                 .expectComplete()
411                 .verify();
412
413         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
414     }
415
416     @Test
417     void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
418         //given
419         final String topic = "TOPIC13";
420         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
421
422         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
423                 "{\"differentMessage\":\"message2\"}");
424         final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages);
425         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
426
427         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
428         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
429
430         final String path = String.format("/events/%s", topic);
431         MOCK_SERVER_CLIENT
432                 .when(request().withPath(path), Times.once())
433                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
434         MOCK_SERVER_CLIENT
435                 .when(request().withPath(path), Times.once())
436                 .respond(response().withStatusCode(404));
437         MOCK_SERVER_CLIENT
438                 .when(request().withPath(path), Times.once())
439                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
440         MOCK_SERVER_CLIENT
441                 .when(request().withPath(path), Times.once())
442                 .respond(response().withStatusCode(500));
443         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5));
444
445         //when
446         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
447
448         //then
449         StepVerifier.create(result)
450                 .expectNext(expectedResponse)
451                 .expectComplete()
452                 .verify();
453
454         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
455     }
456
457     @Test
458     void publisher_shouldHandleLastRetryError500() {
459         //given
460         final String topic = "TOPIC14";
461         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
462
463         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
464                 "{\"differentMessage\":\"message2\"}");
465         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
466
467         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
468         final String responseMessage = "Response Message";
469         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
470                 "500 Internal Server Error\n%s", responseMessage);
471
472         final String path = String.format("/events/%s", topic);
473         MOCK_SERVER_CLIENT
474                 .when(request().withPath(path), Times.once())
475                 .respond(response().withStatusCode(404));
476         MOCK_SERVER_CLIENT
477                 .when(request().withPath(path), Times.once())
478                 .respond(response().withStatusCode(500).withBody(responseMessage));
479         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 1));
480
481         //when
482         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
483
484         //then
485         StepVerifier.create(result)
486                 .expectNext(expectedResponse)
487                 .expectComplete()
488                 .verify();
489
490         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
491     }
492
493     @Test
494     void publisher_shouldSuccessfullyPublishWhenConnectionPoolConfigurationIsSet() {
495         //given
496         final String topic = "TOPIC15";
497         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
498
499         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
500         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
501         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
502
503         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
504         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
505
506         final String path = String.format("/events/%s", topic);
507         MOCK_SERVER_CLIENT
508                 .when(request().withPath(path), Times.once())
509                 .respond(response().withStatusCode(200));
510
511         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolConfiguration());
512
513         //when
514         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
515
516         //then
517         StepVerifier.create(result)
518                 .expectNext(expectedResponse)
519                 .expectComplete()
520                 .verify();
521
522         MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(1));
523     }
524
525     @Test
526     void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublishWithConnectionPoolConfiguration() {
527         //given
528         final String topic = "TOPIC16";
529         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
530
531         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
532         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
533         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
534
535         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
536         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
537
538         final String path = String.format("/events/%s", topic);
539         MOCK_SERVER_CLIENT
540                 .when(request().withPath(path), Times.once())
541                 .respond(response().withDelay(TimeUnit.SECONDS, 10));
542
543         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolAndRetryConfiguration());
544
545         //when
546         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
547
548         //then
549         StepVerifier.create(result)
550                 .expectNext(expectedResponse)
551                 .expectComplete()
552                 .verify();
553
554         MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(2));
555     }
556
557     @Test
558     void publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader() {
559         //given
560         final String topic = "TOPIC17";
561         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
562
563         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
564         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
565         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
566
567         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, "username","password");
568         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
569
570         final String path = String.format("/events/%s", topic);
571
572         //when
573         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
574
575         //then
576         StepVerifier.create(result)
577                 .expectNext(expectedResponse)
578                 .expectComplete()
579                 .verify();
580
581         MOCK_SERVER_CLIENT.verify(request().withPath(path)
582                 .withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1));
583     }
584
585     @Test
586     void publisher_shouldHandleError429WhenConnectionPollLimitsHasBeenReached() {
587         //given
588         final String topic = "TOPIC17";
589         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
590
591         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
592                 "{\"differentMessage\":\"message2\"}");
593         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
594
595         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
596
597         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
598                 CONNECTION_POLL_LIMIT_MESSAGE);
599
600         final String path = String.format("/events/%s", topic);
601
602         //maxConnectionPoll + pendingAcquireMaxCount(default 2*maxConnectionPoll)
603         final int maxNumberOfConcurrentRequest = 3;
604         MOCK_SERVER_CLIENT
605                 .when(request().withPath(path), Times.exactly(maxNumberOfConcurrentRequest))
606                 .respond(response().withStatusCode(429).withDelay(TimeUnit.SECONDS,1));
607
608         MOCK_SERVER_CLIENT
609                 .when(request().withPath(path), Times.once())
610                 .respond(response().withStatusCode(200));
611
612         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolConfiguration());
613
614         //when
615         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
616
617         for(int i = 0; i < maxNumberOfConcurrentRequest; i++) {
618             publisher.put(publishRequest, plainBatch).subscribe();
619         }
620
621         //then
622         StepVerifier.create(result)
623                 .expectNext(expectedResponse)
624                 .expectComplete()
625                 .verify();
626     }
627
628     private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
629         return ImmutableMessageRouterPublisherConfig.builder()
630                 .retryConfig(ImmutableDmaapRetryConfig.builder()
631                         .retryIntervalInSeconds(retryInterval)
632                         .retryCount(retryCount)
633                         .build())
634                 .build();
635     }
636     private MessageRouterPublisherConfig connectionPoolConfiguration() {
637         return ImmutableMessageRouterPublisherConfig.builder()
638                 .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
639                         .connectionPool(1)
640                         .maxIdleTime(10)
641                         .maxLifeTime(20)
642                         .build())
643                 .build();
644     }
645
646     private MessageRouterPublisherConfig connectionPoolAndRetryConfiguration() {
647         return ImmutableMessageRouterPublisherConfig.builder()
648                 .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
649                         .connectionPool(1)
650                         .maxIdleTime(10)
651                         .maxLifeTime(20)
652                         .build())
653                 .retryConfig(ImmutableDmaapRetryConfig.builder()
654                         .retryIntervalInSeconds(1)
655                         .retryCount(1)
656                         .build())
657                 .build();
658     }
659 }