2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.BDDMockito.given;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
30 import com.google.gson.Gson;
31 import com.google.gson.JsonArray;
32 import com.google.gson.JsonObject;
33 import com.google.gson.JsonPrimitive;
34 import io.netty.buffer.ByteBuf;
35 import io.netty.buffer.ByteBufAllocator;
36 import io.netty.buffer.CompositeByteBuf;
37 import java.nio.charset.StandardCharsets;
38 import java.time.Duration;
39 import java.util.List;
40 import org.junit.jupiter.api.Test;
41 import org.mockito.ArgumentCaptor;
42 import org.mockito.verification.VerificationMode;
43 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
44 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
45 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
46 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
47 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
48 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
49 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
50 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
51 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
52 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
53 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
54 import reactor.core.publisher.Flux;
55 import reactor.core.publisher.Mono;
56 import reactor.test.StepVerifier;
59 * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
62 class MessageRouterPublisherImplTest {
// Shared test fixture.
// TIMEOUT is not referenced in the lines visible here; presumably it bounds the StepVerifier
// verification in the response tests - TODO confirm against the full file.
64 private static final Duration TIMEOUT = Duration.ofSeconds(5);
// Mocked HTTP client, so no network traffic happens; each test stubs call(...) itself.
65 private final RxHttpClient httpClient = mock(RxHttpClient.class);
// Class under test. The tests below send 5 items and expect requests of 3 and 2 items, so the
// argument 3 looks like the maximum batch size and the one-minute Duration like the maximum
// time to wait for a batch to fill - NOTE(review): confirm against MessageRouterPublisherImpl.
66 private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1));
// Captures the HttpRequest instances handed to httpClient.call(...) so tests can inspect them.
68 private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
// Sink (target topic) used by every request in this test class.
// NOTE(review): the embedded numbering skips lines 70 and 72, so further builder calls and the
// terminating build() are not visible in this extract.
69 private final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
// URL the tests later expect the outgoing HTTP request to target.
71 .topicUrl("https://dmaap-mr/TOPIC")
// Publish request passed to cut.put(...) in every test; it points at sinkDefinition.
// NOTE(review): line 75 (presumably the terminating build()) is not visible in this extract.
73 private final MessageRouterPublishRequest mrRequest = ImmutableMessageRouterPublishRequest.builder()
74 .sinkDefinition(sinkDefinition)
// Canned HTTP response returned by the mocked client in every test.
// NOTE(review): lines 77-78 and 81 are not visible in this extract (presumably status fields and
// the terminating build()) - confirm against the full file.
76 private final HttpResponse httpResponse = ImmutableHttpResponse.builder()
// Echoes the topic URL of the sink the request was sent to.
79 .url(sinkDefinition.topicUrl())
// Empty JSON object as the response payload.
// NOTE(review): getBytes() without a charset uses the platform default; consider
// getBytes(StandardCharsets.UTF_8) (StandardCharsets is already imported in this file).
80 .rawBody("{}".getBytes())
// Checks the basic shape of the HTTP request produced for a small batch: POST to the topic URL
// with a body whose length is known and positive.
// NOTE(review): the embedded numbering has gaps around this method (83, 85, 87-88, 92-93, 100),
// so the annotation, section markers and closing brace are not visible in this extract.
84 void puttingElementsShouldYieldNonChunkedHttpRequest() {
// Every call on the mocked client yields the canned response.
86 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
// Publish three JSON string primitives.
89 final Flux<MessageRouterPublishResponse> responses = cut
90 .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
// Subscribe and wait for the response stream to complete so the HTTP call actually happens.
91 responses.then().block();
// verify(...) without a mode checks for exactly one call (Mockito's default) and captures its argument.
94 verify(httpClient).call(httpRequestArgumentCaptor.capture());
95 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
96 assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
97 assertThat(httpRequest.url()).isEqualTo(sinkDefinition.topicUrl());
98 assertThat(httpRequest.body()).isNotNull();
// Presumably a positive, known body length is what makes the request "non-chunked" - TODO confirm
// against the HttpRequest body contract.
99 assertThat(httpRequest.body().length()).isGreaterThan(0);
// Checks that three published items are sent in one HTTP request whose body is a JSON array
// holding the items in their original order.
// NOTE(review): the embedded numbering has gaps around this method (102, 104, 106-107, 111-112,
// 120), so the annotation, section markers and closing brace are not visible in this extract.
103 void puttingLowNumberOfElementsShouldYieldSingleHttpRequest() {
// Every call on the mocked client yields the canned response.
105 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
// Publish three JSON string primitives.
108 final Flux<MessageRouterPublishResponse> responses = cut
109 .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
// Subscribe and wait for completion so the HTTP call actually happens.
110 responses.then().block();
// Exactly one call is expected (Mockito's default verification mode); capture its request.
113 verify(httpClient).call(httpRequestArgumentCaptor.capture());
114 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
// Decode the captured request body into a JSON array.
115 final JsonArray elementsInRequest = extractNonEmptyRequestBody(httpRequest);
116 assertThat(elementsInRequest.size()).isEqualTo(3);
117 assertThat(elementsInRequest.get(0).getAsString()).isEqualTo("I");
118 assertThat(elementsInRequest.get(1).getAsString()).isEqualTo("like");
119 assertThat(elementsInRequest.get(2).getAsString()).isEqualTo("cookies");
// Checks the response reported for three published items: it is successful and lists exactly
// the three items that were sent.
// NOTE(review): the embedded numbering jumps from 138 to 146, so the end of the lambda and the
// terminal StepVerifier steps are not visible in this extract (presumably expectComplete()
// followed by verify(TIMEOUT)) - confirm against the full file.
123 void puttingLowNumberOfElementsShouldReturnSingleResponse() {
// Every call on the mocked client yields the canned response.
125 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
// Publish three JSON string primitives; nothing is subscribed yet at this point.
128 final Flux<MessageRouterPublishResponse> responses = cut
129 .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
// Script the expected emissions of the response stream.
132 StepVerifier.create(responses)
133 .consumeNextWith(response -> {
134 assertThat(response.successful()).describedAs("successful").isTrue();
// containsExactly checks both the content and the order of the reported items.
135 assertThat(response.items()).containsExactly(
136 new JsonPrimitive("I"),
137 new JsonPrimitive("like"),
138 new JsonPrimitive("cookies"));
// Checks that five published items are split over two HTTP requests: the first carries three
// items and the second the remaining two, both in original order.
// NOTE(review): the embedded numbering has gaps around this method (145, 147, 149-150, 153-154,
// 156, 171), so the annotation, section markers and closing brace are not visible in this extract.
146 void puttingHighNumberOfElementsShouldYieldMultipleHttpRequests() {
// Every call on the mocked client yields the canned response.
148 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
// Publish five items, two more than the value 3 passed to the publisher's constructor.
151 final Flux<MessageRouterPublishResponse> responses = cut
152 .put(mrRequest, Flux.just("I", "like", "cookies", "and", "pierogi").map(JsonPrimitive::new));
// Subscribe and wait for completion so both HTTP calls actually happen.
155 responses.then().block();
// Exactly two calls are expected; the captor records the request of each call in call order.
157 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
158 final List<HttpRequest> httpRequests = httpRequestArgumentCaptor.getAllValues();
159 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
// First request: the first three items.
161 final JsonArray firstRequest = extractNonEmptyRequestBody(httpRequests.get(0));
162 assertThat(firstRequest.size()).isEqualTo(3);
163 assertThat(firstRequest.get(0).getAsString()).isEqualTo("I");
164 assertThat(firstRequest.get(1).getAsString()).isEqualTo("like");
165 assertThat(firstRequest.get(2).getAsString()).isEqualTo("cookies");
// Second request: the remaining two items.
167 final JsonArray secondRequest = extractNonEmptyRequestBody(httpRequests.get(1));
168 assertThat(secondRequest.size()).isEqualTo(2);
169 assertThat(secondRequest.get(0).getAsString()).isEqualTo("and");
170 assertThat(secondRequest.get(1).getAsString()).isEqualTo("pierogi");
// Checks the responses reported for five published items: two successful responses, the first
// listing three items and the second listing the remaining two.
// NOTE(review): the embedded numbering skips 190 and 196-199, so the closers of both lambdas and
// the terminal StepVerifier steps are not visible in this extract (presumably expectComplete()
// followed by verify(TIMEOUT)) - confirm against the full file.
174 void puttingHighNumberOfElementsShouldReturnMoreResponses() {
// Every call on the mocked client yields the canned response.
176 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
// Publish five items; nothing is subscribed yet at this point.
179 final Flux<MessageRouterPublishResponse> responses = cut
180 .put(mrRequest, Flux.just("I", "like", "cookies", "and", "pierogi").map(JsonPrimitive::new));
// Script the expected emissions of the response stream, one step per expected response.
183 StepVerifier.create(responses)
// First response: the first three items, in order.
184 .consumeNextWith(response -> {
185 assertThat(response.successful()).describedAs("successful").isTrue();
186 assertThat(response.items()).containsExactly(
187 new JsonPrimitive("I"),
188 new JsonPrimitive("like"),
189 new JsonPrimitive("cookies"));
// Second response: the remaining two items, in order.
191 .consumeNextWith(response -> {
192 assertThat(response.successful()).describedAs("successful").isTrue();
193 assertThat(response.items()).containsExactly(
194 new JsonPrimitive("and"),
195 new JsonPrimitive("pierogi"));
// Test helper: reads the whole body of the given request, asserts it is not blank and parses it
// as a JSON array.
// NOTE(review): line 206 is not visible in this extract; since the result is assigned to a
// String, it presumably blocks on the Mono (e.g. block()) - confirm against the full file.
201 private JsonArray extractNonEmptyRequestBody(HttpRequest httpRequest) {
// Gather every buffer emitted by the body publisher into one composite buffer.
202 final String body = Flux.from(httpRequest.body().contents())
203 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
// The first argument (true) asks addComponent to advance the writer index so the added bytes
// become readable.
204 (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
// Decode the accumulated bytes as UTF-8 text.
// NOTE(review): no release() of the composite buffer is visible in this extract - confirm
// whether the buffer needs to be released here.
205 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
// Fail with a descriptive message when the request carried no content.
207 assertThat(body).describedAs("request body").isNotBlank();
208 return new Gson().fromJson(body, JsonArray.class);