PoC of change-based configuration using MerkleTree
[dcaegen2/services/sdk.git] / rest-services / cbs-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / cbs / client / api / listener / ListenableCbsConfigTest.java
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24
25 import io.vavr.collection.List;
26 import java.util.concurrent.atomic.AtomicReference;
27 import org.junit.jupiter.api.Test;
28 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.ListenableCbsConfig;
29 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree;
30 import reactor.core.publisher.Flux;
31 import reactor.core.publisher.ReplayProcessor;
32 import reactor.test.StepVerifier;
33
34 /**
35  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
36  * @since February 2019
37  */
38 class ListenableCbsConfigTest {
39
40     @Test
41     void listen_shouldCallListenerAfterEachChange() {
42         ListenableCbsConfig cut = new ListenableCbsConfig();
43
44         final List<String> expectedChanges = List.of("1", "2", "3");
45         final AtomicReference<List<String>> actualChanges = new AtomicReference<>(List.empty());
46
47         cut.listen(List.of("some-key"), subtreeOption ->
48                 actualChanges.updateAndGet(
49                         changes -> changes.append(subtreeOption.flatMap(subtree -> subtree.get()).getOrElse("[None]")))
50
51         );
52
53         final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes)
54                 .add(List.of("some-key"), "1");
55
56         final MerkleTree<String> updatedConfig1 = initialConfig
57                 .add(List.of("some-key"), "2");
58
59         final MerkleTree<String> updatedConfig2 = updatedConfig1
60                 .add(List.of("some-key"), "3");
61
62         cut.update(initialConfig);
63         cut.update(updatedConfig1);
64         cut.update(updatedConfig2);
65
66         assertThat(actualChanges.get()).isEqualTo(expectedChanges);
67
68     }
69
70
71     @Test
72     void subtreeChanges_shouldEmitItemOnEachChange() {
73         ListenableCbsConfig cut = new ListenableCbsConfig();
74
75         final ReplayProcessor<String> replayProcessor = ReplayProcessor.create();
76         final List<String> expectedChanges = List.of("1", "2", "3");
77
78         cut.subtreeChanges(List.of("some-key"))
79                 .map(subtreeOption ->
80                         subtreeOption.flatMap(subtree -> subtree.get()).getOrElse("[None]")
81                 )
82                 .subscribe(replayProcessor);
83
84         final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes)
85                 .add(List.of("some-key"), "1");
86
87         final MerkleTree<String> updatedConfig1 = initialConfig
88                 .add(List.of("some-key"), "2");
89
90         final MerkleTree<String> updatedConfig2 = updatedConfig1
91                 .add(List.of("some-key"), "3");
92
93         cut.subscribeForUpdates(Flux.just(initialConfig, updatedConfig1, updatedConfig2));
94
95         StepVerifier.create(replayProcessor.take(expectedChanges.size()))
96                 .expectNextSequence(expectedChanges)
97                 .verifyComplete();
98
99     }
100
101     @Test
102     void subtreeChanges_shouldEmitItemOnEachActualChangeAndWhenNodeHasBeenRemoved() {
103         ListenableCbsConfig cut = new ListenableCbsConfig();
104
105         final ReplayProcessor<String> actualChanges = ReplayProcessor.create();
106         final List<String> expectedChanges = List.of("http://dmaap/topic1", "http://dmaap/topic1-updated", "[None]");
107
108         cut.subtreeChanges(List.of("streams", "publishes"))
109                 .map(subtreeOption ->
110                         subtreeOption.flatMap(subtree -> subtree.get("topic1", "dmaap-url")).getOrElse("[None]")
111                 )
112                 .subscribe(actualChanges);
113
114         final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes)
115                 .add(List.of("collector", "treshold"), "145")
116                 .add(List.of("collector", "listenPort"), "8080");
117
118         final MerkleTree<String> updatedConfig1 = initialConfig
119                 .add(List.of("streams", "publishes", "topic1", "type"), "message-bus")
120                 .add(List.of("streams", "publishes", "topic1", "dmaap-url"), "http://dmaap/topic1");
121
122         final MerkleTree<String> updatedConfig2 = updatedConfig1
123                 .add(List.of("streams", "publishes", "topic1", "type"), "message-bus")
124                 .add(List.of("streams", "publishes", "topic1", "dmaap-url"), "http://dmaap/topic1-updated");
125
126         final MerkleTree<String> updatedConfig3 = updatedConfig2
127                 .add(List.of("collector", "treshold"), "1410");
128
129         final MerkleTree<String> updatedConfig4 = initialConfig;
130
131         cut.subscribeForUpdates(Flux.just(initialConfig, updatedConfig1, updatedConfig2, updatedConfig3, updatedConfig4));
132
133         StepVerifier.create(actualChanges.take(expectedChanges.size()))
134                 .expectNextSequence(expectedChanges)
135                 .verifyComplete();
136
137     }
138 }