Support other CBS endpoints 36/82936/8
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 21 Mar 2019 13:45:50 +0000 (14:45 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 22 Mar 2019 09:10:54 +0000 (10:10 +0100)
Change-Id: I227a8edf6da8398ca58c47e864985dac47c5dfcd
Issue-ID: DCAEGEN2-1363
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
13 files changed:
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java [new file with mode: 0644]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequestsTest.java [new file with mode: 0644]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java
rest-services/cbs-client/src/test/resources/sample_all.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/sample_key.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/sample_service_config.json [moved from rest-services/cbs-client/src/test/resources/sample_config.json with 100% similarity]

index 3ee12ee..d6a5700 100644 (file)
@@ -21,12 +21,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
 
 import com.google.gson.JsonObject;
 import java.time.Duration;
-import java.util.UUID;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
 import org.jetbrains.annotations.NotNull;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -46,28 +44,27 @@ public interface CbsClient {
      * <p>
      * Returns a {@link Mono} that publishes new configuration after CBS client retrieves one.
      *
+     * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests})
      * @return reactive stream of configuration
-     * @param diagnosticContext diagnostic context as defined in Logging Guideline
-     * @since 1.1.2
      */
-    @NotNull Mono<JsonObject> get(RequestDiagnosticContext diagnosticContext);
+    @NotNull Mono<JsonObject> get(CbsRequest request);
 
     /**
      * <p>
      * Poll for configuration.
      *
      * <p>
-     * Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Resulting entries may or may not be
+     * Will call {@link #get(CbsRequest)} after {@code initialDelay} every {@code period}. Resulting entries may or may not be
      * changed, ie. items in the stream might be the same until change is made in CBS.
      *
-     * @param diagnosticContext diagnostic context as defined in Logging Guideline
+     * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests})
      * @param initialDelay delay after first request attempt
      * @param period frequency of update checks
      * @return stream of configuration states
      */
-    default Flux<JsonObject> get(RequestDiagnosticContext diagnosticContext, Duration initialDelay, Duration period) {
+    default Flux<JsonObject> get(CbsRequest request, Duration initialDelay, Duration period) {
         return Flux.interval(initialDelay, period)
-                .map(i -> ImmutableRequestDiagnosticContext.copyOf(diagnosticContext).withInvocationId(UUID.randomUUID()))
+                .map(i -> request.withNewInvocationId())
                 .flatMap(this::get);
     }
 
@@ -76,7 +73,7 @@ public interface CbsClient {
      * Poll for configuration updates.
      *
      * <p>
-     * Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Will emit an item
+     * Will call {@link #get(CbsRequest)} after {@code initialDelay} every {@code period}. Will emit an item
      * only when an update was detected, ie. when new item is different then last emitted item.
      *
      * <p>
@@ -87,17 +84,17 @@ public interface CbsClient {
      *         (<b>experimental API</b>) if you want to react differently to changes in subsets of the configuration.
      *     </li>
      *     <li>
-     *         Use {@link #get(RequestDiagnosticContext, Duration, Duration)} with
+     *         Use {@link #get(CbsRequest, Duration, Duration)} with
      *         {@link Flux#distinctUntilChanged(Function, BiPredicate)} if you want to specify custom comparison logic.
      *     </li>
      * </ul>
      *
-     * @param diagnosticContext diagnostic context as defined in Logging Guideline
+     * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests})
      * @param initialDelay delay after first request attempt
      * @param period frequency of update checks
      * @return stream of configuration updates
      */
-    default Flux<JsonObject> updates(RequestDiagnosticContext diagnosticContext, Duration initialDelay, Duration period) {
-        return get(diagnosticContext, initialDelay, period).distinctUntilChanged();
+    default Flux<JsonObject> updates(CbsRequest request, Duration initialDelay, Duration period) {
+        return get(request, initialDelay, period).distinctUntilChanged();
     }
 }
index 379daf9..c11ed53 100644 (file)
@@ -56,7 +56,7 @@ public class CbsClientFactory {
             final RxHttpClient httpClient = RxHttpClient.create();
             final CbsLookup lookup = new CbsLookup(httpClient);
             return lookup.lookup(env)
-                    .map(addr -> CbsClientImpl.create(httpClient, addr, env.appName()));
+                    .map(addr -> new CbsClientImpl(httpClient, env.appName(), addr));
         });
     }
 }
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java
new file mode 100644 (file)
index 0000000..3724338
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+
+/**
+ * A factory to various of requests supported by Config Binding Service.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+public final class CbsRequests {
+
+    /**
+     * <p>A get-configuration request.</p>
+     *
+     * <p>Will bind the configuration for given service and return the bound configuration.</p>
+     *
+     * @param diagnosticContext logging diagnostic context (MDC)
+     * @return the CbsRequest ready to be used when calling {@link CbsClient}
+     */
+    public static @NotNull CbsRequest getConfiguration(RequestDiagnosticContext diagnosticContext) {
+        return ImmutableCbsRequest.builder()
+                .diagnosticContext(diagnosticContext)
+                .requestPath(serviceName -> "/service_component/" + serviceName)
+                .build();
+    }
+
+    /**
+     * <p>A get-by-key request.</p>
+     *
+     * <p>This will call an endpoint that fetches a generic service_component_name:key out of Consul</p>
+     *
+     * @param diagnosticContext logging diagnostic context (MDC)
+     * @return the CbsRequest ready to be used when calling {@link CbsClient}
+     */
+    public static @NotNull CbsRequest getByKey(
+            RequestDiagnosticContext diagnosticContext,
+            String key) {
+        return ImmutableCbsRequest.builder()
+                .diagnosticContext(diagnosticContext)
+                .requestPath(serviceName -> "/" + key + "/" + serviceName)
+                .build();
+    }
+
+    /**
+     * <p>A get-all request.</p>
+     *
+     * <p>Will bind the configuration for given service and return the bound configuration, policies, and any other
+     * keys that are in Consul</p>
+     *
+     * @param diagnosticContext logging diagnostic context (MDC)
+     * @return the CbsRequest ready to be used when calling {@link CbsClient}
+     */
+    public static @NotNull CbsRequest getAll(RequestDiagnosticContext diagnosticContext) {
+        return ImmutableCbsRequest.builder()
+                .diagnosticContext(diagnosticContext)
+                .requestPath(serviceName -> "/service_component_all/" + serviceName)
+                .build();
+    }
+
+}
index 72c1b26..98f3cc9 100644 (file)
@@ -28,45 +28,57 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
 
 public class CbsClientImpl implements CbsClient {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientImpl.class);
     private final RxHttpClient httpClient;
-    private final String fetchUrl;
+    private final String serviceName;
+    private final InetSocketAddress cbsAddress;
 
-    CbsClientImpl(RxHttpClient httpClient, URL fetchUrl) {
+    public CbsClientImpl(RxHttpClient httpClient, String serviceName, InetSocketAddress cbsAddress) {
         this.httpClient = httpClient;
-        this.fetchUrl = fetchUrl.toString();
+        this.serviceName = serviceName;
+        this.cbsAddress = cbsAddress;
     }
 
-    public static CbsClientImpl create(RxHttpClient httpClient, InetSocketAddress cbsAddress, String serviceName) {
-        return new CbsClientImpl(httpClient, constructUrl(cbsAddress, serviceName));
+    @Override
+    public @NotNull Mono<JsonObject> get(CbsRequest request) {
+        return Mono.fromCallable(() -> constructUrl(request).toString())
+                .doOnNext(this::logRequestUrl)
+                .map(url -> ImmutableHttpRequest.builder()
+                        .method(HttpMethod.GET)
+                        .url(url)
+                        .diagnosticContext(request.diagnosticContext())
+                        .build())
+                .flatMap(httpClient::call)
+                .map(resp -> resp.bodyAsJson(JsonObject.class))
+                .doOnNext(this::logCbsResponse);
     }
 
-    private static URL constructUrl(InetSocketAddress cbsAddress, String serviceName) {
+
+    private URL constructUrl(CbsRequest request) {
         try {
             return new URL(
                     "http",
                     cbsAddress.getHostString(),
                     cbsAddress.getPort(),
-                    "/service_component/" + serviceName);
+                    request.requestPath().getForService(serviceName));
         } catch (MalformedURLException e) {
             throw new IllegalArgumentException("Invalid CBS URL", e);
         }
     }
 
-    @Override
-    public @NotNull Mono<JsonObject> get(RequestDiagnosticContext diagnosticContext) {
-        return Mono.defer(() -> {
-            final ImmutableHttpRequest request = ImmutableHttpRequest.builder()
-                    .method(HttpMethod.GET)
-                    .url(fetchUrl)
-                    .diagnosticContext(diagnosticContext)
-                    .build();
-            return httpClient.call(request)
-                    .map(resp -> resp.bodyAsJson(JsonObject.class));
-        });
+    private void logRequestUrl(String url) {
+        LOGGER.debug("Calling {} for configuration", url);
+    }
+
+    private void logCbsResponse(JsonObject json) {
+        LOGGER.info("Got successful response from Config Binding Service");
+        LOGGER.debug("CBS response: {}", json);
     }
 }
index 3d528c3..9905877 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 
 import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import java.net.InetSocketAddress;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
@@ -29,6 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpR
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
 
 /**
@@ -37,6 +40,7 @@ import reactor.core.publisher.Mono;
  */
 public class CbsLookup {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(CbsLookup.class);
     private static final String CONSUL_JSON_SERVICE_ADDRESS = "ServiceAddress";
     private static final String CONSUL_JSON_SERVICE_PORT = "ServicePort";
     private final RxHttpClient httpClient;
@@ -47,15 +51,22 @@ public class CbsLookup {
 
     public Mono<InetSocketAddress> lookup(EnvProperties env) {
         return Mono.fromCallable(() -> createConsulUrl(env))
+                .doOnNext(this::logConsulRequestUrl)
                 .flatMap(this::fetchHttpData)
+                .doOnNext(this::logConsulResponse)
                 .flatMap(this::firstService)
-                .map(this::parseServiceEntry);
+                .map(this::parseServiceEntry)
+                .doOnNext(this::logCbsServiceAddress);
     }
 
     private String createConsulUrl(EnvProperties env) {
         return String.format("http://%s:%s/v1/catalog/service/%s", env.consulHost(), env.consulPort(), env.cbsName());
     }
 
+    private void logConsulRequestUrl(String consulUrl) {
+        LOGGER.debug("Calling Consul for CBS address. consulUrl={}", consulUrl);
+    }
+
     private Mono<JsonArray> fetchHttpData(String consulUrl) {
         return httpClient.call(
                 ImmutableHttpRequest.builder()
@@ -66,6 +77,10 @@ public class CbsLookup {
                 .map(resp -> resp.bodyAsJson(JsonArray.class));
     }
 
+    private void logConsulResponse(JsonArray consulResponse) {
+        LOGGER.debug("Consul response with CBS service list. Will use 1st one. response={}", consulResponse);
+    }
+
     private Mono<JsonObject> firstService(JsonArray services) {
         return services.size() == 0
                 ? Mono.error(new ServiceLookupException("Consul server did not return any service with given name"))
@@ -78,4 +93,8 @@ public class CbsLookup {
                 service.get(CONSUL_JSON_SERVICE_PORT).getAsInt());
     }
 
+    private void logCbsServiceAddress(InetSocketAddress address) {
+        LOGGER.info("Config Binding Service address: {}", address);
+    }
+
 }
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java
new file mode 100644 (file)
index 0000000..0a31966
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model;
+
+import java.util.UUID;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+
+/**
+ * A recipe on which CBS endpoint to call. Usually you should use {@link org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests}
+ * which is a factory to each request type.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@Value.Immutable
+public interface CbsRequest {
+
+    /**
+     * The CBS request path. It will be created by the library.
+     */
+    RequestPath requestPath();
+
+    /**
+     * Diagnostic context as defined in Logging Guideline
+     */
+    RequestDiagnosticContext diagnosticContext();
+
+    /**
+     * Return a view on this CbsRequest with updated InvocationID.
+     */
+    default CbsRequest withNewInvocationId() {
+        final RequestDiagnosticContext newDiagnosticCtx = ImmutableRequestDiagnosticContext
+                .copyOf(diagnosticContext())
+                .withInvocationId(UUID.randomUUID());
+        return ImmutableCbsRequest.copyOf(this).withDiagnosticContext(newDiagnosticCtx);
+    }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java
new file mode 100644 (file)
index 0000000..97d4b4e
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model;
+
+import io.vavr.Function1;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+@FunctionalInterface
+public interface RequestPath extends Function1<String, String> {
+    String getForService(String serviceName);
+
+    @Override
+    default String apply(String serviceName) {
+        return getForService(serviceName);
+    }
+}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequestsTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequestsTest.java
new file mode 100644 (file)
index 0000000..50233d3
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class CbsRequestsTest {
+
+    private final RequestDiagnosticContext diagCtx = RequestDiagnosticContext.create();
+    private final String serviceName = "srv-name";
+
+    @Test
+    void getConfiguration() {
+        // given
+        final CbsRequest cut = CbsRequests.getConfiguration(diagCtx);
+
+        // when
+        final String result = cut.requestPath().getForService(serviceName);
+
+        // then
+        assertThat(result).isEqualTo("/service_component/srv-name");
+    }
+
+    @Test
+    void getByKey() {
+        // given
+        final CbsRequest cut = CbsRequests.getByKey(diagCtx, "configKey");
+
+        // when
+        final String result = cut.requestPath().getForService(serviceName);
+
+        // then
+        assertThat(result).isEqualTo("/configKey/srv-name");
+    }
+
+    @Test
+    void getAll() {
+        // given
+        final CbsRequest cut = CbsRequests.getAll(diagCtx);
+
+        // when
+        final String result = cut.requestPath().getForService(serviceName);
+
+        // then
+        assertThat(result).isEqualTo("/service_component_all/srv-name");
+    }
+}
\ No newline at end of file
index 58e1e6c..33b0920 100644 (file)
@@ -34,9 +34,11 @@ import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
@@ -61,7 +63,9 @@ class CbsClientImplIT {
             + "        \"ServicePort\": PORT\n"
             + "    }\n"
             + "]\n";
-    private static final String SAMPLE_CONFIG = "/sample_config.json";
+    private static final String SAMPLE_CONFIG = "/sample_service_config.json";
+    private static final String SAMPLE_ALL = "/sample_all.json";
+    private static final String SAMPLE_KEY = "/sample_key.json";
     private static final String SAMPLE_CONFIG_KEY = "keystore.path";
     private static final String EXPECTED_CONFIG_VALUE = "/var/run/security/keystore.p12";
     private static EnvProperties sampleEnvironment;
@@ -71,7 +75,10 @@ class CbsClientImplIT {
     static void setUp() {
         server = DummyHttpServer.start(routes ->
                 routes.get("/v1/catalog/service/the_cbs", (req, resp) -> sendString(resp, lazyConsulResponse()))
-                        .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG)));
+                        .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG))
+                        .get("/service_component_all/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_ALL))
+                        .get("/sampleKey/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_KEY))
+        );
         sampleEnvironment = ImmutableEnvProperties.builder()
                 .appName("dcae-component")
                 .cbsName("the_cbs")
@@ -89,10 +96,10 @@ class CbsClientImplIT {
     void testCbsClientWithSingleCall() {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
         // when
-        final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext));
+        final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
 
         // then
         StepVerifier.create(result.map(this::sampleConfigValue))
@@ -105,11 +112,11 @@ class CbsClientImplIT {
     void testCbsClientWithPeriodicCall() {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
         // when
         final Flux<JsonObject> result = sut
-                .flatMapMany(cbsClient -> cbsClient.get(diagnosticContext, Duration.ZERO, Duration.ofMillis(10)));
+                .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10)));
 
         // then
         final int itemsToTake = 5;
@@ -123,12 +130,12 @@ class CbsClientImplIT {
     void testCbsClientWithUpdatesCall() {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
         final Duration period = Duration.ofMillis(10);
 
         // when
         final Flux<JsonObject> result = sut
-                .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, Duration.ZERO, period));
+                .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period));
 
         // then
         final Duration timeToCollectItemsFor = period.multipliedBy(50);
@@ -143,10 +150,10 @@ class CbsClientImplIT {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
         // when
-        final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+        final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request))
                 .map(json ->
                         DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
                 );
@@ -166,13 +173,13 @@ class CbsClientImplIT {
     void testCbsClientWithStreamsParsingUsingSwitch() {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
         // TODO: Use these parsers below
         final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
         final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
 
         // when
-        final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+        final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
                 .map(json -> {
                     final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
                             .groupBy(RawDataStream::type);
@@ -204,10 +211,10 @@ class CbsClientImplIT {
         // given
         final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
         final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
-        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
 
         // when
-        final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+        final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request))
                 .map(json ->
                         DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
                 );
@@ -223,6 +230,46 @@ class CbsClientImplIT {
                 .verify(Duration.ofSeconds(5));
     }
 
+    @Test
+    void testCbsClientWithSingleAllRequest() {
+        // given
+        final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+        final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create());
+
+        // when
+        final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
+
+        // then
+        StepVerifier.create(result)
+                .assertNext(json -> {
+                    assertThat(json.get("config")).isNotNull();
+                    assertThat(json.get("policies")).isNotNull();
+                    assertThat(json.get("sampleKey")).isNotNull();
+                })
+                .expectComplete()
+                .verify(Duration.ofSeconds(5));
+    }
+
+
+    @Test
+    void testCbsClientWithSingleKeyRequest() {
+        // given
+        final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+        final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey");
+
+        // when
+        final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
+
+        // then
+        StepVerifier.create(result)
+                .assertNext(json -> {
+                    assertThat(json.get("key")).isNotNull();
+                    assertThat(json.get("key").getAsString()).isEqualTo("value");
+                })
+                .expectComplete()
+                .verify(Duration.ofSeconds(5));
+    }
+
     private String sampleConfigValue(JsonObject obj) {
         return obj.get(SAMPLE_CONFIG_KEY).getAsString();
     }
index 339b1ef..78b79f9 100644 (file)
@@ -29,13 +29,14 @@ import static org.mockito.Mockito.verify;
 import com.google.gson.JsonObject;
 import java.net.InetSocketAddress;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import reactor.core.publisher.Mono;
 
@@ -51,7 +52,7 @@ class CbsClientImplTest {
         // given
         InetSocketAddress cbsAddress = InetSocketAddress.createUnresolved("cbshost", 6969);
         String serviceName = "dcaegen2-ves-collector";
-        final CbsClientImpl cut = CbsClientImpl.create(httpClient, cbsAddress, serviceName);
+        final CbsClient cut = new CbsClientImpl(httpClient, serviceName, cbsAddress);
         final HttpResponse httpResponse = ImmutableHttpResponse.builder()
                 .url("http://xxx")
                 .statusCode(200)
@@ -61,7 +62,7 @@ class CbsClientImplTest {
         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
 
         // when
-        final JsonObject result = cut.get(diagnosticContext).block();
+        final JsonObject result = cut.get(CbsRequests.getConfiguration(diagnosticContext)).block();
 
         // then
         final String expectedUrl = "http://cbshost:6969/service_component/dcaegen2-ves-collector";
diff --git a/rest-services/cbs-client/src/test/resources/sample_all.json b/rest-services/cbs-client/src/test/resources/sample_all.json
new file mode 100644 (file)
index 0000000..ac4ebf2
--- /dev/null
@@ -0,0 +1,41 @@
+{
+    "config": {
+        "keystore.path": "/var/run/security/keystore.p12",
+        "streams_publishes": {
+            "perf3gpp": {
+                "type": "kafka",
+                "kafka_info": {
+                    "bootstrap_servers": "dmaap-mr-kafka:6060",
+                    "topic_name": "HVVES_PERF3GPP"
+                }
+            },
+            "pnf_ready": {
+                "type": "message_router",
+                "dmaap_info": {
+                    "topic_url": "http://message-router:3904/events/VES_PNF_READY"
+                }
+            },
+            "call_trace": {
+                "type": "kafka",
+                "kafka_info": {
+                    "bootstrap_servers": "dmaap-mr-kafka:6060",
+                    "topic_name": "HVVES_TRACE"
+                }
+            }
+        },
+        "streams_subscribes": {
+            "measurements": {
+                "type": "message_router",
+                "dmaap_info": {
+                    "topic_url": "http://message-router:3904/events/VES_MEASUREMENT"
+                }
+            }
+        }
+    },
+    "policies": {
+        "samplePolicy": "sample value"
+    },
+    "sampleKey": {
+        "key": "value"
+    }
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/resources/sample_key.json b/rest-services/cbs-client/src/test/resources/sample_key.json
new file mode 100644 (file)
index 0000000..21da3b2
--- /dev/null
@@ -0,0 +1,3 @@
+{
+    "key": "value"
+}