inherit from oparent
[msb/discovery.git] / sdclient / discovery-service / src / main / java / org / onap / msb / sdclient / wrapper / consul / cache / ConsulCache.java
1 /**
2  * Copyright 2016 ZTE, Inc. and others.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.msb.sdclient.wrapper.consul.cache;
17
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.base.Function;
20 import com.google.common.collect.ImmutableMap;
21
22 import org.onap.msb.sdclient.wrapper.consul.async.ConsulResponseCallback;
23 import org.onap.msb.sdclient.wrapper.consul.model.ConsulResponse;
24 import org.onap.msb.sdclient.wrapper.consul.option.QueryOptions;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import java.math.BigInteger;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.CopyOnWriteArrayList;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import static com.google.common.base.Preconditions.checkState;
41
42 /**
43  * A cache structure that can provide an up-to-date read-only
44  * map backed by consul data
45  *
46  * @param <V>
47  */
48 public class ConsulCache<K, V> {
49
50     enum State {latent, starting, started, stopped }
51
52     private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class);
53
54     private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
55     private final AtomicReference<ImmutableMap<K, V>> lastResponse = new AtomicReference<ImmutableMap<K, V>>(ImmutableMap.<K, V>of());
56     private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
57     private final CountDownLatch initLatch = new CountDownLatch(1);
58     private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
59     private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();
60
61     private final Function<V, K> keyConversion;
62     private final CallbackConsumer<V> callBackConsumer;
63     private final ConsulResponseCallback<List<V>> responseCallback;
64
65     ConsulCache(
66             Function<V, K> keyConversion,
67             CallbackConsumer<V> callbackConsumer) {
68         this(keyConversion, callbackConsumer, 10, TimeUnit.SECONDS);
69     }
70
71     ConsulCache(
72             Function<V, K> keyConversion,
73             CallbackConsumer<V> callbackConsumer,
74             final long backoffDelayQty,
75             final TimeUnit backoffDelayUnit) {
76
77         this.keyConversion = keyConversion;
78         this.callBackConsumer = callbackConsumer;
79
80         this.responseCallback = new ConsulResponseCallback<List<V>>() {
81             @Override
82             public void onComplete(ConsulResponse<List<V>> consulResponse) {
83
84                 if (!isRunning()) {
85                     return;
86                 }
87                 updateIndex(consulResponse);
88                 ImmutableMap<K, V> full = convertToMap(consulResponse);
89
90                 boolean changed = !full.equals(lastResponse.get());
91 //                LOGGER.info("node changed:"+changed+"----"+full);
92                 if (changed) {
93                     // changes
94                     lastResponse.set(full);
95                 }
96
97                 if (changed) {
98                     for (Listener<K, V> l : listeners) {
99                         l.notify(full);
100                     }
101                 }
102
103                 if (state.compareAndSet(State.starting, State.started)) {
104                     initLatch.countDown();
105                 }
106                 runCallback();
107             }
108
109             @Override
110             public void onFailure(Throwable throwable) {
111
112                 if (!isRunning()) {
113                     return;
114                 }
115                 LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable);
116
117                 executorService.schedule(new Runnable() {
118                     @Override
119                     public void run() {
120                         runCallback();
121                     }
122                 }, backoffDelayQty, backoffDelayUnit);
123             }
124         };
125     }
126
127     public void start() throws Exception {
128         checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting);
129         runCallback();
130     }
131
132     public void stop() throws Exception {
133         State previous = state.getAndSet(State.stopped);
134         if (previous != State.stopped) {
135             executorService.shutdownNow();
136         }
137     }
138
139     private void runCallback() {
140         if (isRunning()) {
141             callBackConsumer.consume(latestIndex.get(), responseCallback);
142         }
143     }
144
145     private boolean isRunning() {
146         return state.get() == State.started || state.get() == State.starting;
147     }
148
149     public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
150         return initLatch.await(timeout, unit);
151     }
152
153     public ImmutableMap<K, V> getMap() {
154         return lastResponse.get();
155     }
156
157     @VisibleForTesting
158     ImmutableMap<K, V> convertToMap(final ConsulResponse<List<V>> response) {
159         if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {
160             return ImmutableMap.of();
161         }
162
163         final ImmutableMap.Builder<K, V> builder = ImmutableMap.builder();
164         final Set<K> keySet = new HashSet<>();
165         for (final V v : response.getResponse()) {
166             final K key = keyConversion.apply(v);
167             if (key != null) {
168                 if (!keySet.contains(key)) {
169                     builder.put(key, v);
170                 } else {
171                     System.out.println(key.toString());
172                     LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString());
173                 }
174             }
175             keySet.add(key);
176         }
177         return builder.build();
178     }
179
180     private void updateIndex(ConsulResponse<List<V>> consulResponse) {
181         if (consulResponse != null && consulResponse.getIndex() != null) {
182             this.latestIndex.set(consulResponse.getIndex());
183         }
184     }
185
186     protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {
187         if (index == null) {
188             return QueryOptions.BLANK;
189         } else {
190             return QueryOptions.blockSeconds(blockSeconds, index).build();
191         }
192     }
193
194     /**
195      * passed in by creators to vary the content of the cached values
196      *
197      * @param <V>
198      */
199     protected interface CallbackConsumer<V> {
200         void consume(BigInteger index, ConsulResponseCallback<List<V>> callback);
201     }
202
203     /**
204      * Implementers can register a listener to receive
205      * a new map when it changes
206      *
207      * @param <V>
208      */
209     public interface Listener<K, V> {
210         void notify(Map<K, V> newValues);
211     }
212
213     public boolean addListener(Listener<K, V> listener) {
214         boolean added = listeners.add(listener);
215         if (state.get() == State.started) {
216             listener.notify(lastResponse.get());
217         }
218         return added;
219     }
220
221     public boolean removeListener(Listener<K, V> listener) {
222         return listeners.remove(listener);
223     }
224
225     @VisibleForTesting
226     protected State getState() {
227         return state.get();
228     }
229 }