remove not required docs and .readthedocs.yaml
[msb/discovery.git] / sdclient / discovery-service / src / main / java / org / onap / msb / sdclient / wrapper / consul / cache / ConsulCache.java
1 /**
2  * Copyright 2016-2017 ZTE, Inc. and others.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 package org.onap.msb.sdclient.wrapper.consul.cache;
15
16 import static com.google.common.base.Preconditions.checkState;
17
18 import java.math.BigInteger;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import org.onap.msb.sdclient.wrapper.consul.async.ConsulResponseCallback;
31 import org.onap.msb.sdclient.wrapper.consul.model.ConsulResponse;
32 import org.onap.msb.sdclient.wrapper.consul.option.QueryOptions;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import com.google.common.annotations.VisibleForTesting;
37 import com.google.common.base.Function;
38 import com.google.common.collect.ImmutableMap;
39
40 /**
41  * A cache structure that can provide an up-to-date read-only map backed by consul data
42  *
43  * @param <V>
44  */
45 public class ConsulCache<K, V> {
46
47     enum State {
48         latent, starting, started, stopped
49     }
50
51     private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class);
52
53     private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
54     private final AtomicReference<ImmutableMap<K, V>> lastResponse =
55                     new AtomicReference<ImmutableMap<K, V>>(ImmutableMap.<K, V>of());
56     private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
57     private final CountDownLatch initLatch = new CountDownLatch(1);
58     private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
59     private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();
60
61     private final Function<V, K> keyConversion;
62     private final CallbackConsumer<V> callBackConsumer;
63     private final ConsulResponseCallback<List<V>> responseCallback;
64
65     ConsulCache(Function<V, K> keyConversion, CallbackConsumer<V> callbackConsumer) {
66         this(keyConversion, callbackConsumer, 10, TimeUnit.SECONDS);
67     }
68
69     ConsulCache(Function<V, K> keyConversion, CallbackConsumer<V> callbackConsumer, final long backoffDelayQty,
70                     final TimeUnit backoffDelayUnit) {
71
72         this.keyConversion = keyConversion;
73         this.callBackConsumer = callbackConsumer;
74
75         this.responseCallback = new ConsulResponseCallback<List<V>>() {
76             @Override
77             public void onComplete(ConsulResponse<List<V>> consulResponse) {
78
79                 if (!isRunning()) {
80                     return;
81                 }
82                 updateIndex(consulResponse);
83                 ImmutableMap<K, V> full = convertToMap(consulResponse);
84
85                 boolean changed = !full.equals(lastResponse.get());
86                 // LOGGER.info("node changed:"+changed+"----"+full);
87                 if (changed) {
88                     // changes
89                     lastResponse.set(full);
90                 }
91
92                 if (changed) {
93                     for (Listener<K, V> l : listeners) {
94                         l.notify(full);
95                     }
96                 }
97
98                 if (state.compareAndSet(State.starting, State.started)) {
99                     initLatch.countDown();
100                 }
101                 runCallback();
102             }
103
104             @Override
105             public void onFailure(Throwable throwable) {
106
107                 if (!isRunning()) {
108                     return;
109                 }
110                 LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty,
111                                 backoffDelayUnit), throwable);
112
113                 executorService.schedule(ConsulCache.this::runCallback, backoffDelayQty, backoffDelayUnit);
114             }
115         };
116     }
117
118     public void start() throws Exception {
119         checkState(state.compareAndSet(State.latent, State.starting), "Cannot transition from state %s to %s",
120                         state.get(), State.starting);
121         runCallback();
122     }
123
124     public void stop() throws Exception {
125         State previous = state.getAndSet(State.stopped);
126         if (previous != State.stopped) {
127             executorService.shutdownNow();
128         }
129     }
130
131     private void runCallback() {
132         if (isRunning()) {
133             callBackConsumer.consume(latestIndex.get(), responseCallback);
134         }
135     }
136
137     private boolean isRunning() {
138         return state.get() == State.started || state.get() == State.starting;
139     }
140
141     public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
142         return initLatch.await(timeout, unit);
143     }
144
145     public ImmutableMap<K, V> getMap() {
146         return lastResponse.get();
147     }
148
149     @VisibleForTesting
150     ImmutableMap<K, V> convertToMap(final ConsulResponse<List<V>> response) {
151         if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {
152             return ImmutableMap.of();
153         }
154
155         final ImmutableMap.Builder<K, V> builder = ImmutableMap.builder();
156         final Set<K> keySet = new HashSet<>();
157         for (final V v : response.getResponse()) {
158             final K key = keyConversion.apply(v);
159             if (key != null) {
160                 if (!keySet.contains(key)) {
161                     builder.put(key, v);
162                 } else {
163                     System.out.println(key.toString());
164                     LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? "
165                                     + key.toString());
166                 }
167             }
168             keySet.add(key);
169         }
170         return builder.build();
171     }
172
173     private void updateIndex(ConsulResponse<List<V>> consulResponse) {
174         if (consulResponse != null && consulResponse.getIndex() != null) {
175             this.latestIndex.set(consulResponse.getIndex());
176         }
177     }
178
179     protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {
180         if (index == null) {
181             return QueryOptions.BLANK;
182         } else {
183             return QueryOptions.blockSeconds(blockSeconds, index).build();
184         }
185     }
186
187     /**
188      * passed in by creators to vary the content of the cached values
189      *
190      * @param <V>
191      */
192     protected interface CallbackConsumer<V> {
193         void consume(BigInteger index, ConsulResponseCallback<List<V>> callback);
194     }
195
196     /**
197      * Implementers can register a listener to receive a new map when it changes
198      *
199      * @param <V>
200      */
201     public interface Listener<K, V> {
202         void notify(Map<K, V> newValues);
203     }
204
205     public boolean addListener(Listener<K, V> listener) {
206         boolean added = listeners.add(listener);
207         if (state.get() == State.started) {
208             listener.notify(lastResponse.get());
209         }
210         return added;
211     }
212
213     public boolean removeListener(Listener<K, V> listener) {
214         return listeners.remove(listener);
215     }
216
217     @VisibleForTesting
218     protected State getState() {
219         return state.get();
220     }
221 }