Emit CBS config updates only 45/79345/2
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 28 Feb 2019 11:14:17 +0000 (12:14 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 28 Feb 2019 11:34:32 +0000 (12:34 +0100)
Add an update method to CbsClient which emits an item only when the
configuration has accentually changed.

Change-Id: I6023fb1cc069b06bd2c4baf94406538965b6534c
Issue-ID: DCAEGEN2-1233
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/annotations/ExperimentalApi.java [new file with mode: 0644]

index b9a6e40..3ee12ee 100644 (file)
  */
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
 
-import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import java.time.Duration;
 import java.util.UUID;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * <p>Main Config Binding Service client interface.</p>
@@ -39,7 +40,9 @@ import org.jetbrains.annotations.NotNull;
 public interface CbsClient {
 
     /**
-     * Get reactive configuration stream.
+     * <p>
+     * Get current application configuration.
+     *
      * <p>
      * Returns a {@link Mono} that publishes new configuration after CBS client retrieves one.
      *
@@ -50,11 +53,14 @@ public interface CbsClient {
     @NotNull Mono<JsonObject> get(RequestDiagnosticContext diagnosticContext);
 
     /**
+     * <p>
      * Poll for configuration.
      *
+     * <p>
      * Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Resulting entries may or may not be
      * changed, ie. items in the stream might be the same until change is made in CBS.
      *
+     * @param diagnosticContext diagnostic context as defined in Logging Guideline
      * @param initialDelay delay after first request attempt
      * @param period frequency of update checks
      * @return stream of configuration states
@@ -64,4 +70,34 @@ public interface CbsClient {
                 .map(i -> ImmutableRequestDiagnosticContext.copyOf(diagnosticContext).withInvocationId(UUID.randomUUID()))
                 .flatMap(this::get);
     }
+
+    /**
+     * <p>
+     * Poll for configuration updates.
+     *
+     * <p>
+     * Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Will emit an item
+     * only when an update was detected, ie. when new item is different then last emitted item.
+     *
+     * <p>
+     * For more tailored change detection approach you can:
+     * <ul>
+     *     <li>
+     *         Use {@link org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.ListenableCbsConfig}
+     *         (<b>experimental API</b>) if you want to react differently to changes in subsets of the configuration.
+     *     </li>
+     *     <li>
+     *         Use {@link #get(RequestDiagnosticContext, Duration, Duration)} with
+     *         {@link Flux#distinctUntilChanged(Function, BiPredicate)} if you want to specify custom comparison logic.
+     *     </li>
+     * </ul>
+     *
+     * @param diagnosticContext diagnostic context as defined in Logging Guideline
+     * @param initialDelay delay after first request attempt
+     * @param period frequency of update checks
+     * @return stream of configuration updates
+     */
+    default Flux<JsonObject> updates(RequestDiagnosticContext diagnosticContext, Duration initialDelay, Duration period) {
+        return get(diagnosticContext, initialDelay, period).distinctUntilChanged();
+    }
 }
index 7b47b12..0bece14 100644 (file)
@@ -22,11 +22,13 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
 
 import io.vavr.Function1;
 import io.vavr.collection.List;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.2
  */
+@ExperimentalApi
 @FunctionalInterface
 public interface HashAlgorithm extends Function1<byte[], byte[]> {
 
index b27c718..46c032e 100644 (file)
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 
@@ -33,6 +34,7 @@ import reactor.core.publisher.Flux;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.2
  */
+@ExperimentalApi
 public class ListenableCbsConfig {
 
     private MerkleTree<String> tree = MerkleTree.emptyWithDefaultDigest(String::getBytes);
index 7f24b36..837a1ca 100644 (file)
@@ -30,6 +30,7 @@ import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Objects;
 import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
  * An immutable <a href="https://en.wikipedia.org/wiki/Merkle_tree" target="_blank">Merkle Tree</a> implementation.
@@ -40,6 +41,7 @@ import org.jetbrains.annotations.NotNull;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.2
  */
+@ExperimentalApi
 public final class MerkleTree<V> {
 
     private static final String DEFAULT_DIGEST_ALGORITHM = "SHA-256";
index 0368c8a..b613098 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
 import io.vavr.collection.List;
 import io.vavr.control.Option;
 import java.util.function.Consumer;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
  * The listener for changes of the {@link MerkleTree} subtree.
@@ -30,6 +31,7 @@ import java.util.function.Consumer;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.2
  */
+@ExperimentalApi
 @FunctionalInterface
 public interface TreeChangeListener<V> extends Consumer<Option<MerkleTree<V>>> {
 
index 9ef9fe6..b082402 100644 (file)
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
 
 import io.vavr.Function1;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.2
  */
+@ExperimentalApi
 @FunctionalInterface
 public interface ValueSerializer<V> extends Function1<V, byte[]> {
 
index 309bb62..8a0977d 100644 (file)
@@ -98,7 +98,8 @@ class CbsClientImplIT {
         final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
 
         // when
-        final Flux<JsonObject> result = sut.flatMapMany(cbsClient -> cbsClient.get(diagnosticContext, Duration.ZERO, Duration.ofMillis(10)));
+        final Flux<JsonObject> result = sut
+                .flatMapMany(cbsClient -> cbsClient.get(diagnosticContext, Duration.ZERO, Duration.ofMillis(10)));
 
         // then
         final int itemsToTake = 5;
@@ -108,6 +109,25 @@ class CbsClientImplIT {
                 .verify(Duration.ofSeconds(5));
     }
 
+    @Test
+    void testCbsClientWithUpdatesCall() {
+        // given
+        final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        final Duration period = Duration.ofMillis(10);
+
+        // when
+        final Flux<JsonObject> result = sut
+                .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, Duration.ZERO, period));
+
+        // then
+        final Duration timeToCollectItemsFor = period.multipliedBy(50);
+        StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue))
+                .expectNext(EXPECTED_CONFIG_VALUE)
+                .expectComplete()
+                .verify(Duration.ofSeconds(5));
+    }
+
     private String sampleConfigValue(JsonObject obj) {
         return obj.get(SAMPLE_CONFIG_KEY).getAsString();
     }
diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/annotations/ExperimentalApi.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/annotations/ExperimentalApi.java
new file mode 100644 (file)
index 0000000..9b57768
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotate experimental API.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.2
+ */
+@Documented
+@Target({ElementType.METHOD, ElementType.TYPE, ElementType.PACKAGE})
+@Retention(RetentionPolicy.SOURCE)
+public @interface ExperimentalApi {
+    String value() default "Experimental API. Might be deleted or changed in the future SDK versions.";
+    String expectedInVersion() default "unknown";
+}