e2833fe5e0e35f9346b884869b80ee19968864d2
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendResource;
25 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendString;
26
27 import com.google.gson.JsonObject;
28 import io.vavr.collection.Map;
29 import io.vavr.collection.Stream;
30 import java.time.Duration;
31 import org.junit.jupiter.api.AfterAll;
32 import org.junit.jupiter.api.BeforeAll;
33 import org.junit.jupiter.api.Test;
34 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
38 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
40 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
45 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
46 import reactor.core.publisher.Flux;
47 import reactor.core.publisher.Mono;
48 import reactor.test.StepVerifier;
49
50 /**
51  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
52  * @since February 2019
53  */
54 class CbsClientImplIT {
55
56     private static final String CONSUL_RESPONSE = "[\n"
57             + "    {\n"
58             + "        \"ServiceAddress\": \"HOST\",\n"
59             + "        \"ServiceName\": \"the_cbs\",\n"
60             + "        \"ServicePort\": PORT\n"
61             + "    }\n"
62             + "]\n";
63     private static final String SAMPLE_CONFIG = "/sample_config.json";
64     private static final String SAMPLE_CONFIG_KEY = "keystore.path";
65     private static final String EXPECTED_CONFIG_VALUE = "/var/run/security/keystore.p12";
66     private static EnvProperties sampleEnvironment;
67     private static DummyHttpServer server;
68
69     @BeforeAll
70     static void setUp() {
71         server = DummyHttpServer.start(routes ->
72                 routes.get("/v1/catalog/service/the_cbs", (req, resp) -> sendString(resp, lazyConsulResponse()))
73                         .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG)));
74         sampleEnvironment = ImmutableEnvProperties.builder()
75                 .appName("dcae-component")
76                 .cbsName("the_cbs")
77                 .consulHost(server.host())
78                 .consulPort(server.port())
79                 .build();
80     }
81
82     @AfterAll
83     static void tearDown() {
84         server.close();
85     }
86
87     @Test
88     void testCbsClientWithSingleCall() {
89         // given
90         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
91         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
92
93         // when
94         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext));
95
96         // then
97         StepVerifier.create(result.map(this::sampleConfigValue))
98                 .expectNext(EXPECTED_CONFIG_VALUE)
99                 .expectComplete()
100                 .verify(Duration.ofSeconds(5));
101     }
102
103     @Test
104     void testCbsClientWithPeriodicCall() {
105         // given
106         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
107         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
108
109         // when
110         final Flux<JsonObject> result = sut
111                 .flatMapMany(cbsClient -> cbsClient.get(diagnosticContext, Duration.ZERO, Duration.ofMillis(10)));
112
113         // then
114         final int itemsToTake = 5;
115         StepVerifier.create(result.take(itemsToTake).map(this::sampleConfigValue))
116                 .expectNextSequence(Stream.of(EXPECTED_CONFIG_VALUE).cycle(itemsToTake))
117                 .expectComplete()
118                 .verify(Duration.ofSeconds(5));
119     }
120
121     @Test
122     void testCbsClientWithUpdatesCall() {
123         // given
124         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
125         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
126         final Duration period = Duration.ofMillis(10);
127
128         // when
129         final Flux<JsonObject> result = sut
130                 .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, Duration.ZERO, period));
131
132         // then
133         final Duration timeToCollectItemsFor = period.multipliedBy(50);
134         StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue))
135                 .expectNext(EXPECTED_CONFIG_VALUE)
136                 .expectComplete()
137                 .verify(Duration.ofSeconds(5));
138     }
139
140     @Test
141     void testCbsClientWithStreamsParsing() {
142         // given
143         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
144         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
145         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
146
147         // when
148         final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
149                 .map(json ->
150                         DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
151                 );
152
153         // then
154         StepVerifier.create(result)
155                 .consumeNextWith(kafkaSink -> {
156                     assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
157                     assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
158                     assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
159                 })
160                 .expectComplete()
161                 .verify(Duration.ofSeconds(5));
162     }
163
164     @Test
165     void testCbsClientWithStreamsParsingUsingSwitch() {
166         // given
167         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
168         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
169         // TODO: Use these parsers below
170         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
171         final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
172
173         // when
174         final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
175                 .map(json -> {
176                     final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
177                             .groupBy(RawDataStream::type);
178
179                     final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
180                             .map(kafkaSinkParser::unsafeParse);
181                     final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
182                             .map(mrSinkParser::unsafeParse);
183
184                     assertThat(allKafkaSinks.size())
185                             .describedAs("Number of kafka sinks")
186                             .isEqualTo(2);
187                     assertThat(allMrSinks.size())
188                             .describedAs("Number of DMAAP-MR sinks")
189                             .isEqualTo(1);
190
191                     return true;
192                 })
193                 .then();
194
195         // then
196         StepVerifier.create(result)
197                 .expectComplete()
198                 .verify(Duration.ofSeconds(5));
199     }
200
201     @Test
202     void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
203         // given
204         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
205         final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
206         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
207
208         // when
209         final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
210                 .map(json ->
211                         DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
212                 );
213
214         // then
215         StepVerifier.create(result)
216                 .expectErrorSatisfies(ex -> {
217                     assertThat(ex).isInstanceOf(IllegalArgumentException.class);
218                     assertThat(ex).hasMessageContaining("Invalid stream type");
219                     assertThat(ex).hasMessageContaining("message_router");
220                     assertThat(ex).hasMessageContaining("kafka");
221                 })
222                 .verify(Duration.ofSeconds(5));
223     }
224
225     private String sampleConfigValue(JsonObject obj) {
226         return obj.get(SAMPLE_CONFIG_KEY).getAsString();
227     }
228
229     private static Mono<String> lazyConsulResponse() {
230         return Mono.just(CONSUL_RESPONSE)
231                 .map(CbsClientImplIT::processConsulResponseTemplate);
232     }
233
234     private static String processConsulResponseTemplate(String resp) {
235         return resp.replaceAll("HOST", server.host())
236                 .replaceAll("PORT", Integer.toString(server.port()));
237     }
238 }