Support other CBS endpoints
[dcaegen2/services/sdk.git] / rest-services / cbs-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / cbs / client / impl / CbsClientImplIT.java
index 58e1e6c..33b0920 100644 (file)
@@ -34,9 +34,11 @@ import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
@@ -61,7 +63,9 @@ class CbsClientImplIT {
             + "        \"ServicePort\": PORT\n"
             + "    }\n"
             + "]\n";
-    private static final String SAMPLE_CONFIG = "/sample_config.json";
+    private static final String SAMPLE_CONFIG = "/sample_service_config.json";
+    private static final String SAMPLE_ALL = "/sample_all.json";
+    private static final String SAMPLE_KEY = "/sample_key.json";
     private static final String SAMPLE_CONFIG_KEY = "keystore.path";
     private static final String EXPECTED_CONFIG_VALUE = "/var/run/security/keystore.p12";
     private static EnvProperties sampleEnvironment;
@@ -71,7 +75,10 @@ class CbsClientImplIT {
     static void setUp() {
         server = DummyHttpServer.start(routes ->
                 routes.get("/v1/catalog/service/the_cbs", (req, resp) -> sendString(resp, lazyConsulResponse()))
-                        .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG)));
+                        .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG))
+                        .get("/service_component_all/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_ALL))
+                        .get("/sampleKey/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_KEY))
+        );
         sampleEnvironment = ImmutableEnvProperties.builder()
                 .appName("dcae-component")
                 .cbsName("the_cbs")
@@ -89,10 +96,10 @@ class CbsClientImplIT {
     void testCbsClientWithSingleCall() {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
         // when
-        final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext));
+        final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
         // then
         StepVerifier.create(result.map(this::sampleConfigValue))
@@ -105,11 +112,11 @@ class CbsClientImplIT {
     void testCbsClientWithPeriodicCall() {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
         // when
         final Flux<JsonObject> result = sut
-                .flatMapMany(cbsClient -> cbsClient.get(diagnosticContext, Duration.ZERO, Duration.ofMillis(10)));
+                .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10)));
 
         // then
         final int itemsToTake = 5;
@@ -123,12 +130,12 @@ class CbsClientImplIT {
     void testCbsClientWithUpdatesCall() {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
         final Duration period = Duration.ofMillis(10);
 
         // when
         final Flux<JsonObject> result = sut
-                .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, Duration.ZERO, period));
+                .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period));
 
         // then
         final Duration timeToCollectItemsFor = period.multipliedBy(50);
@@ -143,10 +150,10 @@ class CbsClientImplIT {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
         // when
-        final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+        final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request))
                 .map(json ->
                         DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
                 );
@@ -166,13 +173,13 @@ class CbsClientImplIT {
     void testCbsClientWithStreamsParsingUsingSwitch() {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
         // TODO: Use these parsers below
         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
         final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
 
         // when
-        final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+        final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
                 .map(json -> {
                     final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
                             .groupBy(RawDataStream::type);
@@ -204,10 +211,10 @@ class CbsClientImplIT {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
         final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
         // when
-        final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+        final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request))
                 .map(json ->
                         DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
                 );
@@ -223,6 +230,46 @@ class CbsClientImplIT {
                 .verify(Duration.ofSeconds(5));
     }
 
+    @Test
+    void testCbsClientWithSingleAllRequest() {
+        // given
+        final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+        final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create());
+
+        // when
+        final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
+
+        // then
+        StepVerifier.create(result)
+                .assertNext(json -> {
+                    assertThat(json.get("config")).isNotNull();
+                    assertThat(json.get("policies")).isNotNull();
+                    assertThat(json.get("sampleKey")).isNotNull();
+                })
+                .expectComplete()
+                .verify(Duration.ofSeconds(5));
+    }
+
+
+    @Test
+    void testCbsClientWithSingleKeyRequest() {
+        // given
+        final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+        final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey");
+
+        // when
+        final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
+
+        // then
+        StepVerifier.create(result)
+                .assertNext(json -> {
+                    assertThat(json.get("key")).isNotNull();
+                    assertThat(json.get("key").getAsString()).isEqualTo("value");
+                })
+                .expectComplete()
+                .verify(Duration.ofSeconds(5));
+    }
+
     private String sampleConfigValue(JsonObject obj) {
         return obj.get(SAMPLE_CONFIG_KEY).getAsString();
     }