58e1e6cbd9574a0e21b993d4f67ffdb521c95409
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
25 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
26
27 import com.google.gson.JsonObject;
28 import io.vavr.collection.Map;
29 import io.vavr.collection.Stream;
30 import java.time.Duration;
31 import org.junit.jupiter.api.AfterAll;
32 import org.junit.jupiter.api.BeforeAll;
33 import org.junit.jupiter.api.Test;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
38 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
40 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
45 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
46 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
47 import reactor.core.publisher.Flux;
48 import reactor.core.publisher.Mono;
49 import reactor.test.StepVerifier;
50
51 /**
52  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
53  * @since February 2019
54  */
55 class CbsClientImplIT {
56
57     private static final String CONSUL_RESPONSE = "[\n"
58             + "    {\n"
59             + "        \"ServiceAddress\": \"HOST\",\n"
60             + "        \"ServiceName\": \"the_cbs\",\n"
61             + "        \"ServicePort\": PORT\n"
62             + "    }\n"
63             + "]\n";
64     private static final String SAMPLE_CONFIG = "/sample_config.json";
65     private static final String SAMPLE_CONFIG_KEY = "keystore.path";
66     private static final String EXPECTED_CONFIG_VALUE = "/var/run/security/keystore.p12";
67     private static EnvProperties sampleEnvironment;
68     private static DummyHttpServer server;
69
70     @BeforeAll
71     static void setUp() {
72         server = DummyHttpServer.start(routes ->
73                 routes.get("/v1/catalog/service/the_cbs", (req, resp) -> sendString(resp, lazyConsulResponse()))
74                         .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG)));
75         sampleEnvironment = ImmutableEnvProperties.builder()
76                 .appName("dcae-component")
77                 .cbsName("the_cbs")
78                 .consulHost(server.host())
79                 .consulPort(server.port())
80                 .build();
81     }
82
83     @AfterAll
84     static void tearDown() {
85         server.close();
86     }
87
88     @Test
89     void testCbsClientWithSingleCall() {
90         // given
91         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
92         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
93
94         // when
95         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext));
96
97         // then
98         StepVerifier.create(result.map(this::sampleConfigValue))
99                 .expectNext(EXPECTED_CONFIG_VALUE)
100                 .expectComplete()
101                 .verify(Duration.ofSeconds(5));
102     }
103
104     @Test
105     void testCbsClientWithPeriodicCall() {
106         // given
107         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
108         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
109
110         // when
111         final Flux<JsonObject> result = sut
112                 .flatMapMany(cbsClient -> cbsClient.get(diagnosticContext, Duration.ZERO, Duration.ofMillis(10)));
113
114         // then
115         final int itemsToTake = 5;
116         StepVerifier.create(result.take(itemsToTake).map(this::sampleConfigValue))
117                 .expectNextSequence(Stream.of(EXPECTED_CONFIG_VALUE).cycle(itemsToTake))
118                 .expectComplete()
119                 .verify(Duration.ofSeconds(5));
120     }
121
122     @Test
123     void testCbsClientWithUpdatesCall() {
124         // given
125         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
126         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
127         final Duration period = Duration.ofMillis(10);
128
129         // when
130         final Flux<JsonObject> result = sut
131                 .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, Duration.ZERO, period));
132
133         // then
134         final Duration timeToCollectItemsFor = period.multipliedBy(50);
135         StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue))
136                 .expectNext(EXPECTED_CONFIG_VALUE)
137                 .expectComplete()
138                 .verify(Duration.ofSeconds(5));
139     }
140
141     @Test
142     void testCbsClientWithStreamsParsing() {
143         // given
144         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
145         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
146         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
147
148         // when
149         final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
150                 .map(json ->
151                         DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
152                 );
153
154         // then
155         StepVerifier.create(result)
156                 .consumeNextWith(kafkaSink -> {
157                     assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
158                     assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
159                     assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
160                 })
161                 .expectComplete()
162                 .verify(Duration.ofSeconds(5));
163     }
164
165     @Test
166     void testCbsClientWithStreamsParsingUsingSwitch() {
167         // given
168         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
169         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
170         // TODO: Use these parsers below
171         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
172         final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
173
174         // when
175         final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
176                 .map(json -> {
177                     final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
178                             .groupBy(RawDataStream::type);
179
180                     final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
181                             .map(kafkaSinkParser::unsafeParse);
182                     final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
183                             .map(mrSinkParser::unsafeParse);
184
185                     assertThat(allKafkaSinks.size())
186                             .describedAs("Number of kafka sinks")
187                             .isEqualTo(2);
188                     assertThat(allMrSinks.size())
189                             .describedAs("Number of DMAAP-MR sinks")
190                             .isEqualTo(1);
191
192                     return true;
193                 })
194                 .then();
195
196         // then
197         StepVerifier.create(result)
198                 .expectComplete()
199                 .verify(Duration.ofSeconds(5));
200     }
201
202     @Test
203     void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
204         // given
205         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
206         final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
207         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
208
209         // when
210         final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
211                 .map(json ->
212                         DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
213                 );
214
215         // then
216         StepVerifier.create(result)
217                 .expectErrorSatisfies(ex -> {
218                     assertThat(ex).isInstanceOf(IllegalArgumentException.class);
219                     assertThat(ex).hasMessageContaining("Invalid stream type");
220                     assertThat(ex).hasMessageContaining("message_router");
221                     assertThat(ex).hasMessageContaining("kafka");
222                 })
223                 .verify(Duration.ofSeconds(5));
224     }
225
226     private String sampleConfigValue(JsonObject obj) {
227         return obj.get(SAMPLE_CONFIG_KEY).getAsString();
228     }
229
230     private static Mono<String> lazyConsulResponse() {
231         return Mono.just(CONSUL_RESPONSE)
232                 .map(CbsClientImplIT::processConsulResponseTemplate);
233     }
234
235     private static String processConsulResponseTemplate(String resp) {
236         return resp.replaceAll("HOST", server.host())
237                 .replaceAll("PORT", Integer.toString(server.port()));
238     }
239 }