b27c718e0bd0dcde76b15c004102cae698262b86
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
22
23 import io.vavr.collection.List;
24 import io.vavr.control.Option;
25 import java.util.Collection;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Map;
29 import reactor.core.Disposable;
30 import reactor.core.publisher.Flux;
31
32 /**
33  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
34  * @since 1.1.2
35  */
36 public class ListenableCbsConfig {
37
38     private MerkleTree<String> tree = MerkleTree.emptyWithDefaultDigest(String::getBytes);
39     private final Map<List<String>, CompositeTreeChangeListener<String>> pathListeners = new HashMap<>();
40     private final Object listenersUpdateMonitor = new Object();
41     private final Object treeUpdateMonitor = new Object();
42
43     public void listen(List<String> path, TreeChangeListener<String> listener) {
44         synchronized (listenersUpdateMonitor) {
45             CompositeTreeChangeListener<String> compositeListener = pathListeners
46                     .computeIfAbsent(path, p -> new CompositeTreeChangeListener<>());
47             compositeListener.addListener(listener);
48         }
49     }
50
51     public void cancel(List<String> path, TreeChangeListener<String> listener) {
52         synchronized (listenersUpdateMonitor) {
53             CompositeTreeChangeListener<String> compositeListener = pathListeners.get(path);
54             if (compositeListener != null) {
55                 compositeListener.removeListener(listener);
56             }
57         }
58     }
59
60     public Flux<Option<MerkleTree<String>>> subtreeChanges(List<String> path) {
61         return Flux.create(sink -> {
62             final TreeChangeListener<String> listener = sink::next;
63             sink.onDispose(() -> cancel(path, listener));
64             listen(path, listener);
65         });
66     }
67
68     public Disposable subscribeForUpdates(Flux<MerkleTree<String>> updates) {
69         return updates.subscribe(this::update);
70     }
71
72     public void update(MerkleTree<String> newTree) {
73         final MerkleTree<String> oldTree;
74         synchronized (treeUpdateMonitor) {
75             oldTree = tree;
76             tree = newTree;
77         }
78
79         for (Map.Entry<List<String>, CompositeTreeChangeListener<String>> entry : pathListeners.entrySet()) {
80             final List<String> path = entry.getKey();
81             final CompositeTreeChangeListener<String> listeners = entry.getValue();
82             if (!newTree.isSame(path, oldTree)) {
83                 listeners.accept(newTree, path);
84             }
85         }
86     }
87
88     private static class CompositeTreeChangeListener<V> implements TreeChangeListener<V> {
89
90         private final Collection<TreeChangeListener<V>> listeners = new HashSet<>();
91
92         void addListener(TreeChangeListener<V> listener) {
93             listeners.add(listener);
94         }
95
96         void removeListener(TreeChangeListener<V> listener) {
97             listeners.remove(listener);
98         }
99
100         @Override
101         public void accept(Option<MerkleTree<V>> updatedSubtree) {
102             for (TreeChangeListener<V> listener : listeners) {
103                 listener.accept(updatedSubtree);
104             }
105         }
106     }
107 }