a51b87aae43588d71f35b10aa3b46fa5d0b3ae33
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
25 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
26
27 import com.google.gson.JsonObject;
28 import io.vavr.collection.Map;
29 import io.vavr.collection.Stream;
30 import java.time.Duration;
31 import org.junit.jupiter.api.AfterAll;
32 import org.junit.jupiter.api.BeforeAll;
33 import org.junit.jupiter.api.Test;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
38 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
40 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
45 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
46 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
47 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
48 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
49 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
50 import reactor.core.publisher.Flux;
51 import reactor.core.publisher.Mono;
52 import reactor.test.StepVerifier;
53
54 /**
55  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
56  * @since February 2019
57  */
58 class CbsClientImplIT {
59
60     private static final String CONSUL_RESPONSE = "[\n"
61             + "    {\n"
62             + "        \"ServiceAddress\": \"HOST\",\n"
63             + "        \"ServiceName\": \"the_cbs\",\n"
64             + "        \"ServicePort\": PORT\n"
65             + "    }\n"
66             + "]\n";
67     private static final String SAMPLE_CONFIG = "/sample_service_config.json";
68     private static final String SAMPLE_ALL = "/sample_all.json";
69     private static final String SAMPLE_KEY = "/sample_key.json";
70     private static final String SAMPLE_CONFIG_KEY = "keystore.path";
71     private static final String EXPECTED_CONFIG_VALUE = "/var/run/security/keystore.p12";
72     private static EnvProperties sampleEnvironment;
73     private static DummyHttpServer server;
74
75     @BeforeAll
76     static void setUp() {
77         server = DummyHttpServer.start(routes ->
78                 routes.get("/v1/catalog/service/the_cbs", (req, resp) -> sendString(resp, lazyConsulResponse()))
79                         .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG))
80                         .get("/service_component_all/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_ALL))
81                         .get("/sampleKey/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_KEY))
82         );
83         sampleEnvironment = ImmutableEnvProperties.builder()
84                 .appName("dcae-component")
85                 .cbsName("the_cbs")
86                 .consulHost(server.host())
87                 .consulPort(server.port())
88                 .build();
89     }
90
91     @AfterAll
92     static void tearDown() {
93         server.close();
94     }
95
96     @Test
97     void testCbsClientWithSingleCall() {
98         // given
99         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
100         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
101
102         // when
103         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
104
105         // then
106         StepVerifier.create(result.map(this::sampleConfigValue))
107                 .expectNext(EXPECTED_CONFIG_VALUE)
108                 .expectComplete()
109                 .verify(Duration.ofSeconds(5));
110     }
111
112     @Test
113     void testCbsClientWithPeriodicCall() {
114         // given
115         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
116         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
117
118         // when
119         final Flux<JsonObject> result = sut
120                 .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10)));
121
122         // then
123         final int itemsToTake = 5;
124         StepVerifier.create(result.take(itemsToTake).map(this::sampleConfigValue))
125                 .expectNextSequence(Stream.of(EXPECTED_CONFIG_VALUE).cycle(itemsToTake))
126                 .expectComplete()
127                 .verify(Duration.ofSeconds(5));
128     }
129
130     @Test
131     void testCbsClientWithUpdatesCall() {
132         // given
133         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
134         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
135         final Duration period = Duration.ofMillis(10);
136
137         // when
138         final Flux<JsonObject> result = sut
139                 .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period));
140
141         // then
142         final Duration timeToCollectItemsFor = period.multipliedBy(50);
143         StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue))
144                 .expectNext(EXPECTED_CONFIG_VALUE)
145                 .expectComplete()
146                 .verify(Duration.ofSeconds(5));
147     }
148
149     @Test
150     void testCbsClientWithStreamsParsing() {
151         // given
152         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
153         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
154         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
155
156         // when
157         final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request))
158                 .map(json ->
159                         DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
160                 );
161
162         // then
163         StepVerifier.create(result)
164                 .consumeNextWith(kafkaSink -> {
165                     assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
166                     assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
167                     assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
168                 })
169                 .expectComplete()
170                 .verify(Duration.ofSeconds(5));
171     }
172
173     @Test
174     void testCbsClientWithStreamsParsingUsingSwitch() {
175         // given
176         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
177         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
178         // TODO: Use these parsers below
179         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
180         final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
181
182         // when
183         final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
184                 .map(json -> {
185                     final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
186                             .groupBy(RawDataStream::type);
187
188                     final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
189                             .map(kafkaSinkParser::unsafeParse);
190                     final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
191                             .map(mrSinkParser::unsafeParse);
192
193                     assertThat(allKafkaSinks.size())
194                             .describedAs("Number of kafka sinks")
195                             .isEqualTo(2);
196                     assertThat(allMrSinks.size())
197                             .describedAs("Number of DMAAP-MR sinks")
198                             .isEqualTo(1);
199
200                     return true;
201                 })
202                 .then();
203
204         // then
205         StepVerifier.create(result)
206                 .expectComplete()
207                 .verify(Duration.ofSeconds(5));
208     }
209
210     @Test
211     void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
212         // given
213         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
214         final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
215         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
216
217         // when
218         final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request))
219                 .map(json ->
220                         DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
221                 );
222
223         // then
224         StepVerifier.create(result)
225                 .expectErrorSatisfies(ex -> {
226                     assertThat(ex).isInstanceOf(StreamParsingException.class);
227                     assertThat(ex).hasMessageContaining("Invalid stream type");
228                     assertThat(ex).hasMessageContaining("message_router");
229                     assertThat(ex).hasMessageContaining("kafka");
230                 })
231                 .verify(Duration.ofSeconds(5));
232     }
233
234     @Test
235     void testCbsClientWithSingleAllRequest() {
236         // given
237         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
238         final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create());
239
240         // when
241         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
242
243         // then
244         StepVerifier.create(result)
245                 .assertNext(json -> {
246                     assertThat(json.get("config")).isNotNull();
247                     assertThat(json.get("policies")).isNotNull();
248                     assertThat(json.get("sampleKey")).isNotNull();
249                 })
250                 .expectComplete()
251                 .verify(Duration.ofSeconds(5));
252     }
253
254
255     @Test
256     void testCbsClientWithSingleKeyRequest() {
257         // given
258         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
259         final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey");
260
261         // when
262         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
263
264         // then
265         StepVerifier.create(result)
266                 .assertNext(json -> {
267                     assertThat(json.get("key")).isNotNull();
268                     assertThat(json.get("key").getAsString()).isEqualTo("value");
269                 })
270                 .expectComplete()
271                 .verify(Duration.ofSeconds(5));
272     }
273
274     private String sampleConfigValue(JsonObject obj) {
275         return obj.get(SAMPLE_CONFIG_KEY).getAsString();
276     }
277
278     private static Mono<String> lazyConsulResponse() {
279         return Mono.just(CONSUL_RESPONSE)
280                 .map(CbsClientImplIT::processConsulResponseTemplate);
281     }
282
283     private static String processConsulResponseTemplate(String resp) {
284         return resp.replaceAll("HOST", server.host())
285                 .replaceAll("PORT", Integer.toString(server.port()));
286     }
287 }