2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
23 import static org.assertj.core.api.Assertions.assertThat;
25 import io.vavr.collection.List;
26 import java.util.concurrent.atomic.AtomicReference;
27 import org.junit.jupiter.api.Test;
28 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.ListenableCbsConfig;
29 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree;
30 import reactor.core.publisher.Flux;
31 import reactor.core.publisher.ReplayProcessor;
32 import reactor.test.StepVerifier;
35 * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
36 * @since February 2019
38 class ListenableCbsConfigTest {
41 void listen_shouldCallListenerAfterEachChange() {
42 ListenableCbsConfig cut = new ListenableCbsConfig();
44 final List<String> expectedChanges = List.of("1", "2", "3");
45 final AtomicReference<List<String>> actualChanges = new AtomicReference<>(List.empty());
47 cut.listen(List.of("some-key"), subtreeOption ->
48 actualChanges.updateAndGet(
49 changes -> changes.append(subtreeOption.flatMap(subtree -> subtree.get()).getOrElse("[None]")))
53 final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes)
54 .add(List.of("some-key"), "1");
56 final MerkleTree<String> updatedConfig1 = initialConfig
57 .add(List.of("some-key"), "2");
59 final MerkleTree<String> updatedConfig2 = updatedConfig1
60 .add(List.of("some-key"), "3");
62 cut.update(initialConfig);
63 cut.update(updatedConfig1);
64 cut.update(updatedConfig2);
66 assertThat(actualChanges.get()).isEqualTo(expectedChanges);
72 void subtreeChanges_shouldEmitItemOnEachChange() {
73 ListenableCbsConfig cut = new ListenableCbsConfig();
75 final ReplayProcessor<String> replayProcessor = ReplayProcessor.create();
76 final List<String> expectedChanges = List.of("1", "2", "3");
78 cut.subtreeChanges(List.of("some-key"))
80 subtreeOption.flatMap(subtree -> subtree.get()).getOrElse("[None]")
82 .subscribe(replayProcessor);
84 final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes)
85 .add(List.of("some-key"), "1");
87 final MerkleTree<String> updatedConfig1 = initialConfig
88 .add(List.of("some-key"), "2");
90 final MerkleTree<String> updatedConfig2 = updatedConfig1
91 .add(List.of("some-key"), "3");
93 cut.subscribeForUpdates(Flux.just(initialConfig, updatedConfig1, updatedConfig2));
95 StepVerifier.create(replayProcessor.take(expectedChanges.size()))
96 .expectNextSequence(expectedChanges)
102 void subtreeChanges_shouldEmitItemOnEachActualChangeAndWhenNodeHasBeenRemoved() {
103 ListenableCbsConfig cut = new ListenableCbsConfig();
105 final ReplayProcessor<String> actualChanges = ReplayProcessor.create();
106 final List<String> expectedChanges = List.of("http://dmaap/topic1", "http://dmaap/topic1-updated", "[None]");
108 cut.subtreeChanges(List.of("streams", "publishes"))
109 .map(subtreeOption ->
110 subtreeOption.flatMap(subtree -> subtree.get("topic1", "dmaap-url")).getOrElse("[None]")
112 .subscribe(actualChanges);
114 final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes)
115 .add(List.of("collector", "treshold"), "145")
116 .add(List.of("collector", "listenPort"), "8080");
118 final MerkleTree<String> updatedConfig1 = initialConfig
119 .add(List.of("streams", "publishes", "topic1", "type"), "message-bus")
120 .add(List.of("streams", "publishes", "topic1", "dmaap-url"), "http://dmaap/topic1");
122 final MerkleTree<String> updatedConfig2 = updatedConfig1
123 .add(List.of("streams", "publishes", "topic1", "type"), "message-bus")
124 .add(List.of("streams", "publishes", "topic1", "dmaap-url"), "http://dmaap/topic1-updated");
126 final MerkleTree<String> updatedConfig3 = updatedConfig2
127 .add(List.of("collector", "treshold"), "1410");
129 final MerkleTree<String> updatedConfig4 = initialConfig;
131 cut.subscribeForUpdates(Flux.just(initialConfig, updatedConfig1, updatedConfig2, updatedConfig3, updatedConfig4));
133 StepVerifier.create(actualChanges.take(expectedChanges.size()))
134 .expectNextSequence(expectedChanges)