2  * ============LICENSE_START====================================
 
   3  * DCAEGEN2-SERVICES-SDK
 
   4  * =========================================================
 
   5  * Copyright (C) 2019 Nokia. All rights reserved.
 
   6  * =========================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *       http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=====================================
 
  21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 
  23 import static org.assertj.core.api.Assertions.assertThat;
 
  24 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
 
  25 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
 
  27 import com.google.gson.JsonObject;
 
  28 import io.vavr.collection.Map;
 
  29 import io.vavr.collection.Stream;
 
  30 import java.time.Duration;
 
  31 import org.junit.jupiter.api.AfterAll;
 
  32 import org.junit.jupiter.api.BeforeAll;
 
  33 import org.junit.jupiter.api.Test;
 
  34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 
  35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 
  36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 
  37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
 
  38 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
 
  39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 
  40 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
 
  42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 
  43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
 
  44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 
  45 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
 
  46 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
 
  47 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
 
  48 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
  49 import reactor.core.publisher.Flux;
 
  50 import reactor.core.publisher.Mono;
 
  51 import reactor.test.StepVerifier;
 
  54  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
 
  55  * @since February 2019
 
  57 class CbsClientImplIT {
 
  59     private static final String CONSUL_RESPONSE = "[\n"
 
  61             + "        \"ServiceAddress\": \"HOST\",\n"
 
  62             + "        \"ServiceName\": \"the_cbs\",\n"
 
  63             + "        \"ServicePort\": PORT\n"
 
  66     private static final String SAMPLE_CONFIG = "/sample_service_config.json";
 
  67     private static final String SAMPLE_ALL = "/sample_all.json";
 
  68     private static final String SAMPLE_KEY = "/sample_key.json";
 
  69     private static final String SAMPLE_CONFIG_KEY = "keystore.path";
 
  70     private static final String EXPECTED_CONFIG_VALUE = "/var/run/security/keystore.p12";
 
  71     private static EnvProperties sampleEnvironment;
 
  72     private static DummyHttpServer server;
 
  76         server = DummyHttpServer.start(routes ->
 
  77                 routes.get("/v1/catalog/service/the_cbs", (req, resp) -> sendString(resp, lazyConsulResponse()))
 
  78                         .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG))
 
  79                         .get("/service_component_all/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_ALL))
 
  80                         .get("/sampleKey/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_KEY))
 
  82         sampleEnvironment = ImmutableEnvProperties.builder()
 
  83                 .appName("dcae-component")
 
  85                 .consulHost(server.host())
 
  86                 .consulPort(server.port())
 
  91     static void tearDown() {
 
  96     void testCbsClientWithSingleCall() {
 
  98         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
 
  99         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 102         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
 105         StepVerifier.create(result.map(this::sampleConfigValue))
 
 106                 .expectNext(EXPECTED_CONFIG_VALUE)
 
 108                 .verify(Duration.ofSeconds(5));
 
 112     void testCbsClientWithPeriodicCall() {
 
 114         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
 
 115         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 118         final Flux<JsonObject> result = sut
 
 119                 .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10)));
 
 122         final int itemsToTake = 5;
 
 123         StepVerifier.create(result.take(itemsToTake).map(this::sampleConfigValue))
 
 124                 .expectNextSequence(Stream.of(EXPECTED_CONFIG_VALUE).cycle(itemsToTake))
 
 126                 .verify(Duration.ofSeconds(5));
 
 130     void testCbsClientWithUpdatesCall() {
 
 132         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
 
 133         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 134         final Duration period = Duration.ofMillis(10);
 
 137         final Flux<JsonObject> result = sut
 
 138                 .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period));
 
 141         final Duration timeToCollectItemsFor = period.multipliedBy(50);
 
 142         StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue))
 
 143                 .expectNext(EXPECTED_CONFIG_VALUE)
 
 145                 .verify(Duration.ofSeconds(5));
 
 149     void testCbsClientWithStreamsParsing() {
 
 151         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
 
 152         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
 
 153         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 156         final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request))
 
 158                         DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
 
 162         StepVerifier.create(result)
 
 163                 .consumeNextWith(kafkaSink -> {
 
 164                     assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
 
 165                     assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
 
 166                     assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
 
 169                 .verify(Duration.ofSeconds(5));
 
 173     void testCbsClientWithStreamsParsingUsingSwitch() {
 
 175         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
 
 176         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 177         // TODO: Use these parsers below
 
 178         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
 
 179         final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
 
 182         final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
 
 184                     final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
 
 185                             .groupBy(RawDataStream::type);
 
 187                     final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
 
 188                             .map(kafkaSinkParser::unsafeParse);
 
 189                     final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
 
 190                             .map(mrSinkParser::unsafeParse);
 
 192                     assertThat(allKafkaSinks.size())
 
 193                             .describedAs("Number of kafka sinks")
 
 195                     assertThat(allMrSinks.size())
 
 196                             .describedAs("Number of DMAAP-MR sinks")
 
 204         StepVerifier.create(result)
 
 206                 .verify(Duration.ofSeconds(5));
 
 210     void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
 
 212         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
 
 213         final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
 
 214         final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
 217         final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request))
 
 219                         DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
 
 223         StepVerifier.create(result)
 
 224                 .expectErrorSatisfies(ex -> {
 
 225                     assertThat(ex).isInstanceOf(IllegalArgumentException.class);
 
 226                     assertThat(ex).hasMessageContaining("Invalid stream type");
 
 227                     assertThat(ex).hasMessageContaining("message_router");
 
 228                     assertThat(ex).hasMessageContaining("kafka");
 
 230                 .verify(Duration.ofSeconds(5));
 
 234     void testCbsClientWithSingleAllRequest() {
 
 236         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
 
 237         final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create());
 
 240         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
 243         StepVerifier.create(result)
 
 244                 .assertNext(json -> {
 
 245                     assertThat(json.get("config")).isNotNull();
 
 246                     assertThat(json.get("policies")).isNotNull();
 
 247                     assertThat(json.get("sampleKey")).isNotNull();
 
 250                 .verify(Duration.ofSeconds(5));
 
 255     void testCbsClientWithSingleKeyRequest() {
 
 257         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
 
 258         final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey");
 
 261         final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
 264         StepVerifier.create(result)
 
 265                 .assertNext(json -> {
 
 266                     assertThat(json.get("key")).isNotNull();
 
 267                     assertThat(json.get("key").getAsString()).isEqualTo("value");
 
 270                 .verify(Duration.ofSeconds(5));
 
 273     private String sampleConfigValue(JsonObject obj) {
 
 274         return obj.get(SAMPLE_CONFIG_KEY).getAsString();
 
 277     private static Mono<String> lazyConsulResponse() {
 
 278         return Mono.just(CONSUL_RESPONSE)
 
 279                 .map(CbsClientImplIT::processConsulResponseTemplate);
 
 282     private static String processConsulResponseTemplate(String resp) {
 
 283         return resp.replaceAll("HOST", server.host())
 
 284                 .replaceAll("PORT", Integer.toString(server.port()));