db881a2e8f860aa1a331e0d37fa531aa6ec35ac2
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
22
23 import com.google.gson.JsonObject;
24 import io.vavr.collection.Stream;
25 import org.jetbrains.annotations.NotNull;
26 import org.junit.Rule;
27 import org.junit.contrib.java.lang.system.EnvironmentVariables;
28 import org.junit.jupiter.api.AfterAll;
29 import org.junit.jupiter.api.BeforeAll;
30 import org.junit.jupiter.api.Test;
31 import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
32 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
33 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
34 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
38 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
40 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.EnvironmentParsingException;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
45 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
46 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
47 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration;
48 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
49 import reactor.core.publisher.Flux;
50 import reactor.core.publisher.Mono;
51 import reactor.test.StepVerifier;
52
53 import java.time.Duration;
54
55 import static org.assertj.core.api.Assertions.assertThat;
56 import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA;
57 import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER;
58 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
59 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType;
60
61 /**
62  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
63  * @since February 2019
64  */
65 class CbsClientImplIT {
66
67     private static final String SAMPLE_CONFIG = "/sample_service_config.json";
68     private static final String SAMPLE_ALL = "/sample_all.json";
69     private static final String SAMPLE_KEY = "/sample_key.json";
70     private static final String SAMPLE_CONFIG_KEY = "keystore.path";
71     private static final String EXPECTED_CONFIG_VALUE_FROM_CBS = "/var/run/security/keystore.p12";
72     private static final String CONFIG_MAP_FILE_PATH = "src/test/resources/application_config.yaml";
73     private static CbsClientConfiguration sampleConfigurationCbsSource;
74     private static CbsClientConfiguration sampleConfigurationFileSource;
75     private static DummyHttpServer server;
76
77
78     @Rule
79     public final EnvironmentVariables envs = new EnvironmentVariables();
80
81     @BeforeAll
82     static void setUp() {
83         server = DummyHttpServer.start(routes ->
84                 routes.get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG))
85                         .get("/service_component_all/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_ALL))
86                         .get("/sampleKey/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_KEY))
87         );
88         ImmutableCbsClientConfiguration.Builder configBuilder = getConfigBuilder();
89         sampleConfigurationCbsSource = configBuilder.build();
90         sampleConfigurationFileSource = configBuilder.configMapFilePath(CONFIG_MAP_FILE_PATH).build();
91     }
92
93     @AfterAll
94     static void tearDown() {
95         server.close();
96     }
97
98     @Test
99     void testCbsClientWithSingleCall() {
100         // given
101         envs.set("AAF_USER", "admin");
102         envs.set("AAF_PASSWORD", "admin_secret");
103         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
104         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
105
106         // when
107         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
108
109         // then
110         StepVerifier.create(result.map(this::sampleConfigValue))
111                 .expectNext(EXPECTED_CONFIG_VALUE_FROM_CBS)
112                 .expectComplete()
113                 .verify(Duration.ofSeconds(5));
114     }
115
116     @Test
117     void testCbsClientWithPeriodicCall() {
118         // given
119         envs.set("AAF_USER", "admin");
120         envs.set("AAF_PASSWORD", "admin_secret");
121         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
122         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
123
124         // when
125         final Flux<JsonObject> result = sut
126                 .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10)));
127
128         // then
129         final int itemsToTake = 5;
130         StepVerifier.create(result.take(itemsToTake).map(this::sampleConfigValue))
131                 .expectNextSequence(Stream.of(EXPECTED_CONFIG_VALUE_FROM_CBS).cycle(itemsToTake))
132                 .expectComplete()
133                 .verify(Duration.ofSeconds(5));
134     }
135
136     @Test
137     void testCbsClientWithUpdatesCall() {
138         // given
139         envs.set("AAF_USER", "admin");
140         envs.set("AAF_PASSWORD", "admin_secret");
141         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
142         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
143         final Duration period = Duration.ofMillis(10);
144
145         // when
146         final Flux<JsonObject> result = sut
147                 .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period));
148
149         // then
150         final Duration timeToCollectItemsFor = period.multipliedBy(50);
151         StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue))
152                 .expectNext(EXPECTED_CONFIG_VALUE_FROM_CBS)
153                 .expectComplete()
154                 .verify(Duration.ofSeconds(5));
155     }
156
157     @Test
158     void testCbsClientWithConfigRetrievedFromFileMissingEnv() {
159         // given
160         envs.set("AAF_USER", "");
161         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationFileSource);
162         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
163
164         // when
165         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
166
167         // then
168         StepVerifier.create(result)
169                 .expectErrorSatisfies(ex -> {
170                     assertThat(ex).isInstanceOf(EnvironmentParsingException.class);
171                     assertThat(ex).hasMessageContaining("Cannot read AAF_USER from environment.");
172                 })
173                 .verify(Duration.ofSeconds(5));
174     }
175
176     @Test
177     void testCbsClientWithConfigRetrievedFromFile() {
178         // given
179         envs.set("AAF_USER", "admin");
180         envs.set("AAF_PASSWORD", "admin_secret");
181         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationFileSource);
182         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
183
184         // when
185         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
186
187         // then
188         StepVerifier.create(result.map(this::sampleConfigValue))
189                 .expectNext(EXPECTED_CONFIG_VALUE_FROM_CBS)
190                 .expectComplete()
191                 .verify(Duration.ofSeconds(5));
192     }
193
194     @Test
195     void testCbsClientWithStreamsParsing() {
196         // given
197         envs.set("AAF_USER", "admin");
198         envs.set("AAF_PASSWORD", "admin_secret");
199         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
200         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
201         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
202
203         // when
204         final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request))
205                 .map(json ->
206                         DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
207                 );
208
209         // then
210         StepVerifier.create(result)
211                 .consumeNextWith(kafkaSink -> {
212                     assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
213                     assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
214                     assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
215                 })
216                 .expectComplete()
217                 .verify(Duration.ofSeconds(5));
218     }
219
220     @Test
221     void testCbsClientWithStreamsParsingUsingSwitch() {
222         // given
223         envs.set("AAF_USER", "admin");
224         envs.set("AAF_PASSWORD", "admin_secret");
225         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
226         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
227         // TODO: Use these parsers below
228         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
229         final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
230
231         // when
232         final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
233                 .map(json -> {
234                     final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json);
235
236                     final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA))
237                             .map(kafkaSinkParser::unsafeParse);
238                     final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER))
239                             .map(mrSinkParser::unsafeParse);
240
241                     assertThat(allKafkaSinks.size())
242                             .describedAs("Number of kafka sinks")
243                             .isEqualTo(2);
244                     assertThat(allMrSinks.size())
245                             .describedAs("Number of DMAAP-MR sinks")
246                             .isEqualTo(1);
247
248                     return true;
249                 })
250                 .then();
251
252         // then
253         StepVerifier.create(result)
254                 .expectComplete()
255                 .verify(Duration.ofSeconds(5));
256     }
257
258     @Test
259     void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
260         // given
261         envs.set("AAF_USER", "admin");
262         envs.set("AAF_PASSWORD", "admin_secret");
263         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
264         final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
265         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
266
267         // when
268         final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request))
269                 .map(json ->
270                         DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
271                 );
272
273         // then
274         StepVerifier.create(result)
275                 .expectErrorSatisfies(ex -> {
276                     assertThat(ex).isInstanceOf(StreamParsingException.class);
277                     assertThat(ex).hasMessageContaining("Invalid stream type");
278                     assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString());
279                     assertThat(ex).hasMessageContaining(KAFKA.toString());
280                 })
281                 .verify(Duration.ofSeconds(5));
282     }
283
284     @Test
285     void testCbsClientWithSingleAllRequest() {
286         // given
287         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
288         final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create());
289
290         // when
291         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
292
293         // then
294         StepVerifier.create(result)
295                 .assertNext(json -> {
296                     assertThat(json.get("config")).isNotNull();
297                     assertThat(json.get("policies")).isNotNull();
298                     assertThat(json.get("sampleKey")).isNotNull();
299                 })
300                 .expectComplete()
301                 .verify(Duration.ofSeconds(5));
302     }
303
304
305     @Test
306     void testCbsClientWithSingleKeyRequest() {
307         // given
308         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
309         final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey");
310
311         // when
312         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
313
314         // then
315         StepVerifier.create(result)
316                 .assertNext(json -> {
317                     assertThat(json.get("key")).isNotNull();
318                     assertThat(json.get("key").getAsString()).isEqualTo("value");
319                 })
320                 .expectComplete()
321                 .verify(Duration.ofSeconds(5));
322     }
323
324     @Test
325     void testCbsClientWhenTheConfigurationWasNotFound() {
326         // given
327         final CbsClientConfiguration unknownAppEnv = ImmutableCbsClientConfiguration.copyOf(sampleConfigurationCbsSource).withAppName("unknown_app");
328         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(unknownAppEnv);
329         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
330
331         // when
332         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
333
334         // then
335         StepVerifier.create(result)
336                 .expectError(HttpException.class)
337                 .verify(Duration.ofSeconds(5));
338     }
339
340     @NotNull
341     private static ImmutableCbsClientConfiguration.Builder getConfigBuilder() {
342         return ImmutableCbsClientConfiguration.builder()
343                 .protocol("http")
344                 .appName("dcae-component")
345                 .hostname(server.host())
346                 .port(server.port());
347     }
348
349     private String sampleConfigValue(JsonObject obj) {
350         return obj.get(SAMPLE_CONFIG_KEY).getAsString();
351     }
352
353 }