2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.BDDMockito.given;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
29 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
31 import com.google.gson.JsonArray;
32 import com.google.gson.JsonElement;
33 import com.google.gson.JsonObject;
34 import com.google.gson.Gson;
35 import com.google.gson.JsonPrimitive;
36 import io.netty.buffer.ByteBufAllocator;
37 import io.netty.handler.codec.http.HttpHeaderValues;
38 import io.vavr.collection.List;
39 import java.nio.charset.StandardCharsets;
40 import java.time.Duration;
41 import org.junit.jupiter.api.Test;
42 import org.mockito.ArgumentCaptor;
43 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
44 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
45 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
46 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
47 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
48 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
49 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
50 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
51 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
52 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
53 import reactor.core.publisher.Flux;
54 import reactor.core.publisher.Mono;
55 import reactor.test.StepVerifier;
58 * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
61 class MessageRouterPublisherImplTest {
// Five-second duration constant (presumably the verification timeout for reactive assertions — confirm against its usages).
62 private static final Duration TIMEOUT = Duration.ofSeconds(5);
// URL of the topic that the publish requests created below point at.
63 private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
// Batch size limit handed to the publisher under test.
64 private static final int MAX_BATCH_SIZE = 3;
// Mocked HTTP client; each test stubs call(...) with the response it needs.
65 private final RxHttpClient httpClient = mock(RxHttpClient.class);
// Class under test, wired with the mocked client, MAX_BATCH_SIZE and a one-minute Duration.
// NOTE(review): the meaning of the Duration argument is defined by MessageRouterPublisherImpl — confirm there.
66 private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1));
// Captures the HttpRequest instances passed to httpClient.call(...) so tests can inspect them.
67 private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
// Publish request created with an explicit ContentType.TEXT_PLAIN.
68 private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
// Publish request created without a content type argument.
69 private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
// Canned response (reason "OK", status code 200) returned by the stubbed client in the tests.
70 private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
73 void puttingElementsShouldYieldNonChunkedHttpRequest() {
75 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
76 final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
77 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
80 final Flux<MessageRouterPublishResponse> responses = cut
81 .put(jsonPublishRequest, singleJsonMessageBatch);
82 responses.then().block();
85 verify(httpClient).call(httpRequestArgumentCaptor.capture());
86 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
87 assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
88 assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
89 assertThat(httpRequest.body()).isNotNull();
90 assertThat(httpRequest.body().length()).isGreaterThan(0);
94 void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
96 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
97 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
99 final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
100 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
103 final Flux<MessageRouterPublishResponse> responses = cut
104 .put(jsonPublishRequest, jsonMessagesMaxBatch);
105 responses.then().block();
108 verify(httpClient).call(httpRequestArgumentCaptor.capture());
109 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
110 final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
112 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
113 .isEqualTo(MAX_BATCH_SIZE);
114 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
120 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
122 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
123 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
125 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
126 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
129 final Flux<MessageRouterPublishResponse> responses = cut
130 .put(plainPublishRequest, plainMessagesMaxBatch);
131 responses.then().block();
134 verify(httpClient).call(httpRequestArgumentCaptor.capture());
135 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
136 final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
137 .map(JsonElement::getAsJsonObject);
140 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
141 .isEqualTo(MAX_BATCH_SIZE);
142 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
146 void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
148 final List<String> threePlainMessages = List.of("I", "like", "cookies");
149 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
151 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
152 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
155 final Flux<MessageRouterPublishResponse> responses = cut
156 .put(plainPublishRequest, plainMessagesMaxBatch);
157 responses.then().block();
160 verify(httpClient).call(httpRequestArgumentCaptor.capture());
161 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
162 final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
163 .map(JsonElement::getAsJsonPrimitive);
165 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
166 .isEqualTo(MAX_BATCH_SIZE);
167 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
171 void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){
173 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
174 final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
175 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
178 final Flux<MessageRouterPublishResponse> responses = cut
179 .put(jsonPublishRequest, singleJsonMessageBatch);
180 responses.then().block();
183 verify(httpClient).call(httpRequestArgumentCaptor.capture());
184 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
185 assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
186 .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
190 void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
192 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
193 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
195 final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
196 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
199 final Flux<MessageRouterPublishResponse> responses = cut
200 .put(jsonPublishRequest, jsonMessagesMaxBatch);
203 verifySingleResponse(parsedThreeMessages, responses);
207 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
209 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
210 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
212 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
213 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
216 final Flux<MessageRouterPublishResponse> responses = cut
217 .put(plainPublishRequest, plainMessagesMaxBatch);
220 verifySingleResponse(parsedThreeMessages, responses);
224 void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
226 final List<String> threePlainMessages = List.of("I", "like", "cookies");
227 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
229 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
230 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
233 final Flux<MessageRouterPublishResponse> responses = cut
234 .put(plainPublishRequest, plainMessagesMaxBatch);
237 verifySingleResponse(parsedThreeMessages, responses);
241 void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
243 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
244 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
246 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
247 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
249 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
250 threeJsonMessages.appendAll(twoJsonMessages));
251 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
254 final Flux<MessageRouterPublishResponse> responses = cut
255 .put(jsonPublishRequest, doubleJsonMessageBatch);
257 responses.then().block();
259 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
260 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
261 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
263 final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
264 assertThat(firstRequest.size()).describedAs("Http request first batch size")
265 .isEqualTo(MAX_BATCH_SIZE);
266 assertListsContainSameElements(firstRequest, parsedThreeMessages);
268 final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
269 assertThat(secondRequest.size()).describedAs("Http request second batch size")
270 .isEqualTo(MAX_BATCH_SIZE-1);
271 assertListsContainSameElements(secondRequest, parsedTwoMessages);
275 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
277 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
278 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
280 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
281 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
283 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
284 threeJsonMessages.appendAll(twoJsonMessages));
285 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
288 final Flux<MessageRouterPublishResponse> responses = cut
289 .put(plainPublishRequest, doublePlainMessageBatch);
291 responses.then().block();
293 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
294 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
295 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
297 final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
298 .map(JsonElement::getAsJsonObject);
299 assertThat(firstRequest.size()).describedAs("Http request first batch size")
300 .isEqualTo(MAX_BATCH_SIZE);
301 assertListsContainSameElements(firstRequest, parsedThreeMessages);
303 final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
304 .map(JsonElement::getAsJsonObject);
305 assertThat(secondRequest.size()).describedAs("Http request second batch size")
306 .isEqualTo(MAX_BATCH_SIZE-1);
307 assertListsContainSameElements(secondRequest, parsedTwoMessages);
311 void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
313 final List<String> threePlainMessages = List.of("I", "like", "cookies");
314 final List<String> twoPlainMessages = List.of("and", "pierogi");
316 final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
317 final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
319 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
320 threePlainMessages.appendAll(twoPlainMessages));
321 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
324 final Flux<MessageRouterPublishResponse> responses = cut
325 .put(plainPublishRequest, doublePlainMessageBatch);
327 responses.then().block();
329 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
330 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
331 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
333 final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
334 .map(JsonElement::getAsJsonPrimitive);
335 assertThat(firstRequest.size()).describedAs("Http request first batch size")
336 .isEqualTo(MAX_BATCH_SIZE);
337 assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
339 final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
340 .map(JsonElement::getAsJsonPrimitive);
341 assertThat(secondRequest.size()).describedAs("Http request second batch size")
342 .isEqualTo(MAX_BATCH_SIZE-1);
343 assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
347 void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
349 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
350 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
352 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
353 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
355 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
356 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
359 final Flux<MessageRouterPublishResponse> responses = cut
360 .put(jsonPublishRequest, doubleJsonMessageBatch);
363 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
367 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
369 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
370 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
372 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
373 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
375 final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
376 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
379 final Flux<MessageRouterPublishResponse> responses = cut
380 .put(plainPublishRequest, doubleJsonMessageBatch);
383 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
387 void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
389 final List<String> threePlainMessages = List.of("I", "like", "cookies");
390 final List<String> twoPlainMessages = List.of("and", "pierogi");
392 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
393 final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
395 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
396 threePlainMessages.appendAll(twoPlainMessages));
397 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
400 final Flux<MessageRouterPublishResponse> responses = cut
401 .put(plainPublishRequest, doublePlainMessageBatch);
404 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
407 private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){
408 return plainTextMessages
409 .map(message -> String.format("{\"message\":\"%s\"}", message));
/**
 * Builds an {@link HttpResponse} through the {@code ImmutableHttpResponse} builder, setting the
 * given status code and reason and a raw body made of the bytes of the literal {@code "[]"}.
 *
 * @param statusReason reason phrase to put on the response
 * @param statusCode   status code to put on the response
 * @return the response produced by the builder chain
 */
412 private static HttpResponse createHttpResponse(String statusReason, int statusCode){
413 return ImmutableHttpResponse.builder()
414 .statusCode(statusCode)
416 .statusReason(statusReason)
// NOTE(review): getBytes() without an argument encodes with the platform default charset;
// getBytes(StandardCharsets.UTF_8) would make this deterministic (the class is already imported).
417 .rawBody("[]".getBytes())
/**
 * Collects the buffers published by the request body into one string decoded as UTF-8 and
 * asserts that the string is not blank.
 *
 * @param httpRequest request whose body contents are read
 * @return declared as {@code String} — presumably the decoded body; confirm against the full method
 */
421 private String collectNonEmptyRequestBody(HttpRequest httpRequest){
422 final String body = Flux.from(httpRequest.body().contents())
// Accumulate every emitted buffer into a single composite buffer; the 'true' flag makes
// addComponent advance the writer index so the appended bytes become readable.
423 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
424 (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
// Decode the accumulated bytes as UTF-8 text.
425 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
// Fail early with a descriptive message when nothing meaningful was sent.
427 assertThat(body).describedAs("request body").isNotBlank();
432 private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest){
433 return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
/**
 * Reads the (non-blank) body of the given request and converts it to JSON elements by
 * delegating to {@code getAsJsonElements}.
 * NOTE(review): how the body text is split or otherwise prepared before the conversion
 * should be confirmed against the full argument list of the call.
 *
 * @param httpRequest captured request whose body should be converted
 * @return the body content as a list of {@link JsonElement}
 */
436 private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest){
437 return getAsJsonElements(
439 collectNonEmptyRequestBody(httpRequest)
445 private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
446 List<? extends JsonElement> expectedMessages){
447 for (int i = 0; i < actualMessages.size(); i++) {
448 assertThat(actualMessages.get(i))
449 .describedAs(String.format("Http request element at position %d", i))
450 .isEqualTo(expectedMessages.get(i));
454 private void assertListsContainSameElements(JsonArray actualMessages,
455 List<? extends JsonElement> expectedMessages){
456 assertThat(actualMessages.size()).describedAs("Http request batch size")
457 .isEqualTo(expectedMessages.size());
459 for (int i = 0; i < actualMessages.size(); i++) {
460 assertThat(actualMessages.get(i))
461 .describedAs(String.format("Http request element at position %d", i))
462 .isEqualTo(expectedMessages.get(i));
/**
 * Subscribes to the response stream through a {@code StepVerifier} and checks the next
 * emitted response: it must be successful and its items must be exactly the first three
 * elements of {@code threeMessages}, in order.
 *
 * @param threeMessages expected items; elements 0..2 are read, so at least three are required
 * @param responses     publish responses to verify
 */
466 private void verifySingleResponse(List<? extends JsonElement> threeMessages,
467 Flux<MessageRouterPublishResponse> responses) {
468 StepVerifier.create(responses)
// Inspect the next response emitted by the stream.
469 .consumeNextWith(response -> {
470 assertThat(response.successful()).describedAs("successful").isTrue();
// containsExactly checks both the content and the order of the items.
471 assertThat(response.items()).containsExactly(
472 threeMessages.get(0),
473 threeMessages.get(1),
474 threeMessages.get(2));
480 private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
481 List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
483 StepVerifier.create(responses)
484 .consumeNextWith(response -> {
485 assertThat(response.successful()).describedAs("successful").isTrue();
486 assertThat(response.items()).containsExactly(
487 threeMessages.get(0),
488 threeMessages.get(1),
489 threeMessages.get(2));
491 .consumeNextWith(response -> {
492 assertThat(response.successful()).describedAs("successful").isTrue();
493 assertThat(response.items()).containsExactly(