43b2a7bbfa7f0247cf8499e8e847b849de415ad3
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA;
25 import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER;
26 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
27 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType;
28
29 import com.google.gson.JsonObject;
30 import io.vavr.collection.Stream;
31
32 import java.time.Duration;
33
34 import org.junit.jupiter.api.AfterAll;
35 import org.junit.jupiter.api.BeforeAll;
36 import org.junit.jupiter.api.Test;
37 import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
38 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
39 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
40 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
41 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
42 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
45 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
46 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
47 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
48 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
49 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
50 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
51 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
52 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration;
53 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
54 import reactor.core.publisher.Flux;
55 import reactor.core.publisher.Mono;
56 import reactor.test.StepVerifier;
57
58 /**
59  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
60  * @since February 2019
61  */
62 class CbsClientImplIT {
63
64     private static final String SAMPLE_CONFIG = "/sample_service_config.json";
65     private static final String SAMPLE_ALL = "/sample_all.json";
66     private static final String SAMPLE_KEY = "/sample_key.json";
67     private static final String SAMPLE_CONFIG_KEY = "keystore.path";
68     private static final String EXPECTED_CONFIG_VALUE = "/var/run/security/keystore.p12";
69     private static CbsClientConfiguration sampleConfiguration;
70     private static DummyHttpServer server;
71
72     @BeforeAll
73     static void setUp() {
74         server = DummyHttpServer.start(routes ->
75                 routes.get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG))
76                         .get("/service_component_all/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_ALL))
77                         .get("/sampleKey/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_KEY))
78         );
79         sampleConfiguration = ImmutableCbsClientConfiguration.builder()
80                 .appName("dcae-component")
81                 .hostname(server.host())
82                 .port(server.port())
83                 .build();
84     }
85
86     @AfterAll
87     static void tearDown() {
88         server.close();
89     }
90
91     @Test
92     void testCbsClientWithSingleCall() {
93         // given
94         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
95         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
96
97         // when
98         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
99
100         // then
101         StepVerifier.create(result.map(this::sampleConfigValue))
102                 .expectNext(EXPECTED_CONFIG_VALUE)
103                 .expectComplete()
104                 .verify(Duration.ofSeconds(5));
105     }
106
107     @Test
108     void testCbsClientWithPeriodicCall() {
109         // given
110         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
111         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
112
113         // when
114         final Flux<JsonObject> result = sut
115                 .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10)));
116
117         // then
118         final int itemsToTake = 5;
119         StepVerifier.create(result.take(itemsToTake).map(this::sampleConfigValue))
120                 .expectNextSequence(Stream.of(EXPECTED_CONFIG_VALUE).cycle(itemsToTake))
121                 .expectComplete()
122                 .verify(Duration.ofSeconds(5));
123     }
124
125     @Test
126     void testCbsClientWithUpdatesCall() {
127         // given
128         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
129         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
130         final Duration period = Duration.ofMillis(10);
131
132         // when
133         final Flux<JsonObject> result = sut
134                 .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period));
135
136         // then
137         final Duration timeToCollectItemsFor = period.multipliedBy(50);
138         StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue))
139                 .expectNext(EXPECTED_CONFIG_VALUE)
140                 .expectComplete()
141                 .verify(Duration.ofSeconds(5));
142     }
143
144     @Test
145     void testCbsClientWithStreamsParsing() {
146         // given
147         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
148         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
149         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
150
151         // when
152         final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request))
153                 .map(json ->
154                         DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
155                 );
156
157         // then
158         StepVerifier.create(result)
159                 .consumeNextWith(kafkaSink -> {
160                     assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
161                     assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
162                     assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
163                 })
164                 .expectComplete()
165                 .verify(Duration.ofSeconds(5));
166     }
167
168     @Test
169     void testCbsClientWithStreamsParsingUsingSwitch() {
170         // given
171         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
172         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
173         // TODO: Use these parsers below
174         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
175         final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
176
177         // when
178         final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
179                 .map(json -> {
180                     final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json);
181
182                     final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA))
183                             .map(kafkaSinkParser::unsafeParse);
184                     final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER))
185                             .map(mrSinkParser::unsafeParse);
186
187                     assertThat(allKafkaSinks.size())
188                             .describedAs("Number of kafka sinks")
189                             .isEqualTo(2);
190                     assertThat(allMrSinks.size())
191                             .describedAs("Number of DMAAP-MR sinks")
192                             .isEqualTo(1);
193
194                     return true;
195                 })
196                 .then();
197
198         // then
199         StepVerifier.create(result)
200                 .expectComplete()
201                 .verify(Duration.ofSeconds(5));
202     }
203
204     @Test
205     void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
206         // given
207         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
208         final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
209         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
210
211         // when
212         final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request))
213                 .map(json ->
214                         DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
215                 );
216
217         // then
218         StepVerifier.create(result)
219                 .expectErrorSatisfies(ex -> {
220                     assertThat(ex).isInstanceOf(StreamParsingException.class);
221                     assertThat(ex).hasMessageContaining("Invalid stream type");
222                     assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString());
223                     assertThat(ex).hasMessageContaining(KAFKA.toString());
224                 })
225                 .verify(Duration.ofSeconds(5));
226     }
227
228     @Test
229     void testCbsClientWithSingleAllRequest() {
230         // given
231         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
232         final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create());
233
234         // when
235         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
236
237         // then
238         StepVerifier.create(result)
239                 .assertNext(json -> {
240                     assertThat(json.get("config")).isNotNull();
241                     assertThat(json.get("policies")).isNotNull();
242                     assertThat(json.get("sampleKey")).isNotNull();
243                 })
244                 .expectComplete()
245                 .verify(Duration.ofSeconds(5));
246     }
247
248
249     @Test
250     void testCbsClientWithSingleKeyRequest() {
251         // given
252         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
253         final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey");
254
255         // when
256         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
257
258         // then
259         StepVerifier.create(result)
260                 .assertNext(json -> {
261                     assertThat(json.get("key")).isNotNull();
262                     assertThat(json.get("key").getAsString()).isEqualTo("value");
263                 })
264                 .expectComplete()
265                 .verify(Duration.ofSeconds(5));
266     }
267
268     @Test
269     void testCbsClientWhenTheConfigurationWasNotFound() {
270         // given
271         final CbsClientConfiguration unknownAppEnv = ImmutableCbsClientConfiguration.copyOf(sampleConfiguration).withAppName("unknown_app");
272         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(unknownAppEnv);
273         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
274
275         // when
276         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
277
278         // then
279         StepVerifier.create(result)
280                 .expectError(HttpException.class)
281                 .verify(Duration.ofSeconds(5));
282     }
283
284     private String sampleConfigValue(JsonObject obj) {
285         return obj.get(SAMPLE_CONFIG_KEY).getAsString();
286     }
287
288 }