f0138d2cc57f5d055967133b03ac90cd1f1b45fa
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.BDDMockito.given;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
29
30 import com.google.gson.Gson;
31 import com.google.gson.JsonArray;
32 import com.google.gson.JsonObject;
33 import com.google.gson.JsonPrimitive;
34 import io.netty.buffer.ByteBuf;
35 import io.netty.buffer.ByteBufAllocator;
36 import io.netty.buffer.CompositeByteBuf;
37 import java.nio.charset.StandardCharsets;
38 import java.time.Duration;
39 import java.util.List;
40 import org.junit.jupiter.api.Test;
41 import org.mockito.ArgumentCaptor;
42 import org.mockito.verification.VerificationMode;
43 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
44 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
45 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
46 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
47 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
48 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
49 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
50 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
51 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
52 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
53 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
54 import reactor.core.publisher.Flux;
55 import reactor.core.publisher.Mono;
56 import reactor.test.StepVerifier;
57
58 /**
59  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
60  * @since April 2019
61  */
62 class MessageRouterPublisherImplTest {
63
64     private static final Duration TIMEOUT = Duration.ofSeconds(5);
65     private final RxHttpClient httpClient = mock(RxHttpClient.class);
66     private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1));
67
68     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
69     private final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
70             .name("the topic")
71             .topicUrl("https://dmaap-mr/TOPIC")
72             .build();
73     private final MessageRouterPublishRequest mrRequest = ImmutableMessageRouterPublishRequest.builder()
74             .sinkDefinition(sinkDefinition)
75             .build();
76     private final HttpResponse httpResponse = ImmutableHttpResponse.builder()
77             .statusCode(200)
78             .statusReason("OK")
79             .url(sinkDefinition.topicUrl())
80             .rawBody("[]".getBytes())
81             .build();
82
83     @Test
84     void puttingElementsShouldYieldNonChunkedHttpRequest() {
85         // given
86         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
87
88         // when
89         final Flux<MessageRouterPublishResponse> responses = cut
90                 .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
91         responses.then().block();
92
93         // then
94         verify(httpClient).call(httpRequestArgumentCaptor.capture());
95         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
96         assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
97         assertThat(httpRequest.url()).isEqualTo(sinkDefinition.topicUrl());
98         assertThat(httpRequest.body()).isNotNull();
99         assertThat(httpRequest.body().length()).isGreaterThan(0);
100     }
101
102     @Test
103     void puttingLowNumberOfElementsShouldYieldSingleHttpRequest() {
104         // given
105         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
106
107         // when
108         final Flux<MessageRouterPublishResponse> responses = cut
109                 .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
110         responses.then().block();
111
112         // then
113         verify(httpClient).call(httpRequestArgumentCaptor.capture());
114         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
115         final JsonArray elementsInRequest = extractNonEmptyRequestBody(httpRequest);
116         assertThat(elementsInRequest.size()).isEqualTo(3);
117         assertThat(elementsInRequest.get(0).getAsString()).isEqualTo("I");
118         assertThat(elementsInRequest.get(1).getAsString()).isEqualTo("like");
119         assertThat(elementsInRequest.get(2).getAsString()).isEqualTo("cookies");
120     }
121
122     @Test
123     void puttingLowNumberOfElementsShouldReturnSingleResponse() {
124         // given
125         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
126
127         // when
128         final Flux<MessageRouterPublishResponse> responses = cut
129                 .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
130
131         // then
132         StepVerifier.create(responses)
133                 .consumeNextWith(response -> {
134                     assertThat(response.successful()).describedAs("successful").isTrue();
135                     assertThat(response.items()).containsExactly(
136                             new JsonPrimitive("I"),
137                             new JsonPrimitive("like"),
138                             new JsonPrimitive("cookies"));
139                 })
140                 .expectComplete()
141                 .verify(TIMEOUT);
142     }
143
144
145     @Test
146     void puttingHighNumberOfElementsShouldYieldMultipleHttpRequests() {
147         // given
148         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
149
150         // when
151         final Flux<MessageRouterPublishResponse> responses = cut
152                 .put(mrRequest, Flux.just("I", "like", "cookies", "and", "pierogi").map(JsonPrimitive::new));
153
154         // then
155         responses.then().block();
156
157         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
158         final List<HttpRequest> httpRequests = httpRequestArgumentCaptor.getAllValues();
159         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
160
161         final JsonArray firstRequest = extractNonEmptyRequestBody(httpRequests.get(0));
162         assertThat(firstRequest.size()).isEqualTo(3);
163         assertThat(firstRequest.get(0).getAsString()).isEqualTo("I");
164         assertThat(firstRequest.get(1).getAsString()).isEqualTo("like");
165         assertThat(firstRequest.get(2).getAsString()).isEqualTo("cookies");
166
167         final JsonArray secondRequest = extractNonEmptyRequestBody(httpRequests.get(1));
168         assertThat(secondRequest.size()).isEqualTo(2);
169         assertThat(secondRequest.get(0).getAsString()).isEqualTo("and");
170         assertThat(secondRequest.get(1).getAsString()).isEqualTo("pierogi");
171     }
172
173     @Test
174     void puttingHighNumberOfElementsShouldReturnMoreResponses() {
175         // given
176         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
177
178         // when
179         final Flux<MessageRouterPublishResponse> responses = cut
180                 .put(mrRequest, Flux.just("I", "like", "cookies", "and", "pierogi").map(JsonPrimitive::new));
181
182         // then
183         StepVerifier.create(responses)
184                 .consumeNextWith(response -> {
185                     assertThat(response.successful()).describedAs("successful").isTrue();
186                     assertThat(response.items()).containsExactly(
187                             new JsonPrimitive("I"),
188                             new JsonPrimitive("like"),
189                             new JsonPrimitive("cookies"));
190                 })
191                 .consumeNextWith(response -> {
192                     assertThat(response.successful()).describedAs("successful").isTrue();
193                     assertThat(response.items()).containsExactly(
194                             new JsonPrimitive("and"),
195                             new JsonPrimitive("pierogi"));
196                 })
197                 .expectComplete()
198                 .verify(TIMEOUT);
199     }
200
201     private JsonArray extractNonEmptyRequestBody(HttpRequest httpRequest) {
202         final String body = Flux.from(httpRequest.body().contents())
203                 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
204                         (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
205                 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
206                 .block();
207         assertThat(body).describedAs("request body").isNotBlank();
208         return new Gson().fromJson(body, JsonArray.class);
209     }
210 }