2  * ============LICENSE_START====================================
 
   3  * DCAEGEN2-SERVICES-SDK
 
   4  * =========================================================
 
   5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
 
   6  * =========================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *       http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=====================================
 
  21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 
  23 import com.google.gson.JsonObject;
 
  24 import io.vavr.collection.Stream;
 
  25 import org.jetbrains.annotations.NotNull;
 
  26 import org.junit.jupiter.api.AfterAll;
 
  27 import org.junit.jupiter.api.BeforeAll;
 
  28 import org.junit.jupiter.api.Test;
 
  29 import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
 
  30 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
 
  31 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
 
  32 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 
  33 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
 
  34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 
  35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 
  36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 
  37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
 
  38 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
 
  39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
 
  40 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
 
  42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
 
  43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
 
  44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration;
 
  45 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
  46 import reactor.core.publisher.Flux;
 
  47 import reactor.core.publisher.Mono;
 
  48 import reactor.test.StepVerifier;
 
  50 import java.time.Duration;
 
  52 import static org.assertj.core.api.Assertions.assertThat;
 
  53 import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA;
 
  54 import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER;
 
  55 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
 
  56 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType;
 
  59  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
 
  60  * @since February 2019
 
  62 class CbsClientImplIT {
 
  64     private static final String SAMPLE_CONFIG = "/sample_service_config.json";
 
  65     private static final String SAMPLE_ALL = "/sample_all.json";
 
  66     private static final String SAMPLE_KEY = "/sample_key.json";
 
  67     private static final String SAMPLE_CONFIG_KEY = "keystore.path";
 
  68     private static final String EXPECTED_CONFIG_VALUE_FROM_CBS = "/var/run/security/keystore.p12";
 
  69     private static final String EXPECTED_CONFIG_VALUE_FROM_FILE = "/var/run/security/keystore_file.p12";
 
  70     private static final String CONFIG_MAP_FILE_PATH = "src/test/resources/sample_local_service_config.json";
 
  71     private static CbsClientConfiguration sampleConfigurationCbsSource;
 
  72     private static CbsClientConfiguration sampleConfigurationFileSource;
 
  73     private static DummyHttpServer server;
 
  77         server = DummyHttpServer.start(routes ->
 
  78                 routes.get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG))
 
  79                         .get("/service_component_all/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_ALL))
 
  80                         .get("/sampleKey/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_KEY))
 
  82         ImmutableCbsClientConfiguration.Builder configBuilder = getConfigBuilder();
 
  83         sampleConfigurationCbsSource = configBuilder.build();
 
  84         sampleConfigurationFileSource = configBuilder.configMapFilePath(CONFIG_MAP_FILE_PATH).build();
 
  88     static void tearDown() {
 
  93     void testCbsClientWithSingleCall() {
 
  95         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
 
  96         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
  99         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
 102         StepVerifier.create(result.map(this::sampleConfigValue))
 
 103                 .expectNext(EXPECTED_CONFIG_VALUE_FROM_CBS)
 
 105                 .verify(Duration.ofSeconds(5));
 
 109     void testCbsClientWithPeriodicCall() {
 
 111         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
 
 112         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 115         final Flux<JsonObject> result = sut
 
 116                 .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10)));
 
 119         final int itemsToTake = 5;
 
 120         StepVerifier.create(result.take(itemsToTake).map(this::sampleConfigValue))
 
 121                 .expectNextSequence(Stream.of(EXPECTED_CONFIG_VALUE_FROM_CBS).cycle(itemsToTake))
 
 123                 .verify(Duration.ofSeconds(5));
 
 127     void testCbsClientWithUpdatesCall() {
 
 129         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
 
 130         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 131         final Duration period = Duration.ofMillis(10);
 
 134         final Flux<JsonObject> result = sut
 
 135                 .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period));
 
 138         final Duration timeToCollectItemsFor = period.multipliedBy(50);
 
 139         StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue))
 
 140                 .expectNext(EXPECTED_CONFIG_VALUE_FROM_CBS)
 
 142                 .verify(Duration.ofSeconds(5));
 
 146     void testCbsClientWithConfigRetrievedFromFile() {
 
 148         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationFileSource);
 
 149         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 152         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
 155         StepVerifier.create(result.map(this::sampleConfigValue))
 
 156                 .expectNext(EXPECTED_CONFIG_VALUE_FROM_FILE)
 
 158                 .verify(Duration.ofSeconds(5));
 
 162     void testCbsClientWithStreamsParsing() {
 
 164         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
 
 165         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
 
 166         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 169         final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request))
 
 171                         DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
 
 175         StepVerifier.create(result)
 
 176                 .consumeNextWith(kafkaSink -> {
 
 177                     assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
 
 178                     assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
 
 179                     assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
 
 182                 .verify(Duration.ofSeconds(5));
 
 186     void testCbsClientWithStreamsParsingUsingSwitch() {
 
 188         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
 
 189         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 190         // TODO: Use these parsers below
 
 191         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
 
 192         final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
 
 195         final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
 
 197                     final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json);
 
 199                     final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA))
 
 200                             .map(kafkaSinkParser::unsafeParse);
 
 201                     final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER))
 
 202                             .map(mrSinkParser::unsafeParse);
 
 204                     assertThat(allKafkaSinks.size())
 
 205                             .describedAs("Number of kafka sinks")
 
 207                     assertThat(allMrSinks.size())
 
 208                             .describedAs("Number of DMAAP-MR sinks")
 
 216         StepVerifier.create(result)
 
 218                 .verify(Duration.ofSeconds(5));
 
 222     void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
 
 224         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
 
 225         final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
 
 226         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 229         final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request))
 
 231                         DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
 
 235         StepVerifier.create(result)
 
 236                 .expectErrorSatisfies(ex -> {
 
 237                     assertThat(ex).isInstanceOf(StreamParsingException.class);
 
 238                     assertThat(ex).hasMessageContaining("Invalid stream type");
 
 239                     assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString());
 
 240                     assertThat(ex).hasMessageContaining(KAFKA.toString());
 
 242                 .verify(Duration.ofSeconds(5));
 
 246     void testCbsClientWithSingleAllRequest() {
 
 248         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
 
 249         final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create());
 
 252         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
 255         StepVerifier.create(result)
 
 256                 .assertNext(json -> {
 
 257                     assertThat(json.get("config")).isNotNull();
 
 258                     assertThat(json.get("policies")).isNotNull();
 
 259                     assertThat(json.get("sampleKey")).isNotNull();
 
 262                 .verify(Duration.ofSeconds(5));
 
 267     void testCbsClientWithSingleKeyRequest() {
 
 269         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource);
 
 270         final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey");
 
 273         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
 276         StepVerifier.create(result)
 
 277                 .assertNext(json -> {
 
 278                     assertThat(json.get("key")).isNotNull();
 
 279                     assertThat(json.get("key").getAsString()).isEqualTo("value");
 
 282                 .verify(Duration.ofSeconds(5));
 
 286     void testCbsClientWhenTheConfigurationWasNotFound() {
 
 288         final CbsClientConfiguration unknownAppEnv = ImmutableCbsClientConfiguration.copyOf(sampleConfigurationCbsSource).withAppName("unknown_app");
 
 289         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(unknownAppEnv);
 
 290         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 293         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
 296         StepVerifier.create(result)
 
 297                 .expectError(HttpException.class)
 
 298                 .verify(Duration.ofSeconds(5));
 
 302     private static ImmutableCbsClientConfiguration.Builder getConfigBuilder() {
 
 303         return ImmutableCbsClientConfiguration.builder()
 
 305                 .appName("dcae-component")
 
 306                 .hostname(server.host())
 
 307                 .port(server.port());
 
 310     private String sampleConfigValue(JsonObject obj) {
 
 311         return obj.get(SAMPLE_CONFIG_KEY).getAsString();