2  * ============LICENSE_START====================================
 
   3  * DCAEGEN2-SERVICES-SDK
 
   4  * =========================================================
 
   5  * Copyright (C) 2019 Nokia. All rights reserved.
 
   6  * =========================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *       http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=====================================
 
  21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 
  23 import static org.assertj.core.api.Assertions.assertThat;
 
  24 import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA;
 
  25 import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER;
 
  26 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
 
  27 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType;
 
  29 import com.google.gson.JsonObject;
 
  30 import io.vavr.collection.Stream;
 
  32 import java.time.Duration;
 
  34 import org.junit.jupiter.api.AfterAll;
 
  35 import org.junit.jupiter.api.BeforeAll;
 
  36 import org.junit.jupiter.api.Test;
 
  37 import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
 
  38 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
 
  39 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
 
  40 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
 
  42 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 
  43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 
  44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 
  45 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
 
  46 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
 
  47 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
 
  48 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 
  49 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
 
  50 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
 
  51 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
 
  52 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration;
 
  53 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
  54 import reactor.core.publisher.Flux;
 
  55 import reactor.core.publisher.Mono;
 
  56 import reactor.test.StepVerifier;
 
  59  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
 
  60  * @since February 2019
 
  62 class CbsClientImplIT {
 
  64     private static final String SAMPLE_CONFIG = "/sample_service_config.json";
 
  65     private static final String SAMPLE_ALL = "/sample_all.json";
 
  66     private static final String SAMPLE_KEY = "/sample_key.json";
 
  67     private static final String SAMPLE_CONFIG_KEY = "keystore.path";
 
  68     private static final String EXPECTED_CONFIG_VALUE = "/var/run/security/keystore.p12";
 
  69     private static CbsClientConfiguration sampleConfiguration;
 
  70     private static DummyHttpServer server;
 
  74         server = DummyHttpServer.start(routes ->
 
  75                 routes.get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG))
 
  76                         .get("/service_component_all/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_ALL))
 
  77                         .get("/sampleKey/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_KEY))
 
  79         sampleConfiguration = ImmutableCbsClientConfiguration.builder()
 
  80                 .appName("dcae-component")
 
  81                 .hostname(server.host())
 
  87     static void tearDown() {
 
  92     void testCbsClientWithSingleCall() {
             // Single-shot scenario: build a client from sampleConfiguration, issue one
             // getConfiguration request and check the value stored under SAMPLE_CONFIG_KEY.
 
  94         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
 
  95         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
  98         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
             // The JSON response is reduced to a single string by sampleConfigValue before it is
             // compared; the verification is bounded by a 5-second timeout.
 101         StepVerifier.create(result.map(this::sampleConfigValue))
 
 102                 .expectNext(EXPECTED_CONFIG_VALUE)
 
 104                 .verify(Duration.ofSeconds(5));
 
 108     void testCbsClientWithPeriodicCall() {
             // Periodic scenario: the three-argument get(...) overload yields a Flux instead of a
             // Mono, and the test inspects the first few elements it emits.
 
 110         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
 
 111         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
             // NOTE(review): the two Duration arguments are presumably (initial delay, polling
             // period) - confirm against the CbsClient API.
 114         final Flux<JsonObject> result = sut
 
 115                 .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10)));
 
             // Only the first itemsToTake elements are consumed; each one must map to
             // EXPECTED_CONFIG_VALUE (the expected sequence is that single value cycled
             // itemsToTake times).
 118         final int itemsToTake = 5;
 
 119         StepVerifier.create(result.take(itemsToTake).map(this::sampleConfigValue))
 
 120                 .expectNextSequence(Stream.of(EXPECTED_CONFIG_VALUE).cycle(itemsToTake))
 
 122                 .verify(Duration.ofSeconds(5));
 
 126     void testCbsClientWithUpdatesCall() {
             // Updates scenario: subscribes to cbsClient.updates(...) with a 10 ms period and
             // collects whatever is emitted during 50 periods (500 ms).
 
 128         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
 
 129         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 130         final Duration period = Duration.ofMillis(10);
 
 133         final Flux<JsonObject> result = sut
 
 134                 .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period));
 
             // Only one expectNext is asserted for the whole collection window.
             // NOTE(review): presumably updates() suppresses repeats of an unchanged configuration,
             // unlike the periodic get(...) - confirm against the CbsClient API.
 137         final Duration timeToCollectItemsFor = period.multipliedBy(50);
 
 138         StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue))
 
 139                 .expectNext(EXPECTED_CONFIG_VALUE)
 
 141                 .verify(Duration.ofSeconds(5));
 
 145     void testCbsClientWithStreamsParsing() {
             // Stream-parsing scenario: the fetched configuration's named sinks are parsed with the
             // Kafka sink parser and the first parsed sink is checked field by field.
 
 147         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
 
 148         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
 
 149         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 152         final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request))
 
 154                         DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
 
             // The expected name, bootstrap servers and topic below are literal values; presumably
             // they mirror the SAMPLE_CONFIG resource - verify against that file.
 158         StepVerifier.create(result)
 
 159                 .consumeNextWith(kafkaSink -> {
 
 160                     assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
 
 161                     assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
 
 162                     assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
 
 165                 .verify(Duration.ofSeconds(5));
 
 169     void testCbsClientWithStreamsParsingUsingSwitch() {
             // Splits the named sinks of the fetched configuration by stream type (KAFKA vs
             // MESSAGE_ROUTER), parses each group with its own parser and asserts on group sizes.
 
 171         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
 
 172         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 173         // TODO: Use these parsers below
             // NOTE(review): both parsers are referenced further down via ::unsafeParse - confirm
             // whether the TODO above is still open.
 
 174         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
 
 175         final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
 
 178         final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
 
 180                     final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json);
 
                     // The same raw sink stream is filtered twice, once per stream type.
 182                     final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA))
 
 183                             .map(kafkaSinkParser::unsafeParse);
 
 184                     final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER))
 
 185                             .map(mrSinkParser::unsafeParse);
 
 187                     assertThat(allKafkaSinks.size())
 
 188                             .describedAs("Number of kafka sinks")
 
 190                     assertThat(allMrSinks.size())
 
 191                             .describedAs("Number of DMAAP-MR sinks")
 
 199         StepVerifier.create(result)
 
 201                 .verify(Duration.ofSeconds(5));
 
 205     void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
             // Negative scenario: the named sources of the fetched configuration are parsed with
             // the Kafka source parser and the resulting Mono is expected to fail.
 
 207         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
 
 208         final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
 
 209         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 212         final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request))
 
 214                         DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
 
             // The error must be a StreamParsingException whose message contains
             // "Invalid stream type" and the names of both the MESSAGE_ROUTER and KAFKA types.
 218         StepVerifier.create(result)
 
 219                 .expectErrorSatisfies(ex -> {
 
 220                     assertThat(ex).isInstanceOf(StreamParsingException.class);
 
 221                     assertThat(ex).hasMessageContaining("Invalid stream type");
 
 222                     assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString());
 
 223                     assertThat(ex).hasMessageContaining(KAFKA.toString());
 
 225                 .verify(Duration.ofSeconds(5));
 
 229     void testCbsClientWithSingleAllRequest() {
             // Uses CbsRequests.getAll instead of getConfiguration and checks that the response
             // carries non-null "config", "policies" and "sampleKey" members.
 
 231         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
 
 232         final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create());
 
 235         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
 238         StepVerifier.create(result)
 
 239                 .assertNext(json -> {
 
 240                     assertThat(json.get("config")).isNotNull();
 
 241                     assertThat(json.get("policies")).isNotNull();
 
 242                     assertThat(json.get("sampleKey")).isNotNull();
 
 245                 .verify(Duration.ofSeconds(5));
 
 250     void testCbsClientWithSingleKeyRequest() {
             // Uses CbsRequests.getByKey with the key "sampleKey" and checks that the response
             // contains a "key" member whose string value is "value".
 
 252         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfiguration);
 
 253         final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey");
 
 256         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
 259         StepVerifier.create(result)
 
 260                 .assertNext(json -> {
 
 261                     assertThat(json.get("key")).isNotNull();
 
 262                     assertThat(json.get("key").getAsString()).isEqualTo("value");
 
 265                 .verify(Duration.ofSeconds(5));
 
 269     void testCbsClientWhenTheConfigurationWasNotFound() {
             // Failure scenario: same configuration as the other tests except that the application
             // name is replaced with "unknown_app"; the fetch is expected to end with HttpException.
             // NOTE(review): presumably the dummy server has no route for that name - confirm
             // against the route set-up.
 
 271         final CbsClientConfiguration unknownAppEnv = ImmutableCbsClientConfiguration.copyOf(sampleConfiguration).withAppName("unknown_app");
 
 272         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(unknownAppEnv);
 
 273         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 276         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
 279         StepVerifier.create(result)
 
 280                 .expectError(HttpException.class)
 
 281                 .verify(Duration.ofSeconds(5));
 
 284     private String sampleConfigValue(JsonObject obj) {
             // Reads the member named by SAMPLE_CONFIG_KEY from obj and returns it as a String;
             // used by the tests to reduce a whole configuration document to one comparable value.
 
 285         return obj.get(SAMPLE_CONFIG_KEY).getAsString();