Fix java check style issue
[msb/discovery.git] / sdclient / discovery-service / src / main / java / org / onap / msb / sdclient / wrapper / consul / cache / ConsulCache.java
1 /**
2  * Copyright 2016-2017 ZTE, Inc. and others.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 package org.onap.msb.sdclient.wrapper.consul.cache;
15
16 import static com.google.common.base.Preconditions.checkState;
17
18 import java.math.BigInteger;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import org.onap.msb.sdclient.wrapper.consul.async.ConsulResponseCallback;
31 import org.onap.msb.sdclient.wrapper.consul.model.ConsulResponse;
32 import org.onap.msb.sdclient.wrapper.consul.option.QueryOptions;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import com.google.common.annotations.VisibleForTesting;
37 import com.google.common.base.Function;
38 import com.google.common.collect.ImmutableMap;
39
40 /**
41  * A cache structure that can provide an up-to-date read-only map backed by consul data
42  *
43  * @param <V>
44  */
45 public class ConsulCache<K, V> {
46
47     enum State {
48         latent, starting, started, stopped
49     }
50
51     private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class);
52
53     private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
54     private final AtomicReference<ImmutableMap<K, V>> lastResponse =
55                     new AtomicReference<ImmutableMap<K, V>>(ImmutableMap.<K, V>of());
56     private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
57     private final CountDownLatch initLatch = new CountDownLatch(1);
58     private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
59     private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();
60
61     private final Function<V, K> keyConversion;
62     private final CallbackConsumer<V> callBackConsumer;
63     private final ConsulResponseCallback<List<V>> responseCallback;
64
65     ConsulCache(Function<V, K> keyConversion, CallbackConsumer<V> callbackConsumer) {
66         this(keyConversion, callbackConsumer, 10, TimeUnit.SECONDS);
67     }
68
69     ConsulCache(Function<V, K> keyConversion, CallbackConsumer<V> callbackConsumer, final long backoffDelayQty,
70                     final TimeUnit backoffDelayUnit) {
71
72         this.keyConversion = keyConversion;
73         this.callBackConsumer = callbackConsumer;
74
75         this.responseCallback = new ConsulResponseCallback<List<V>>() {
76             @Override
77             public void onComplete(ConsulResponse<List<V>> consulResponse) {
78
79                 if (!isRunning()) {
80                     return;
81                 }
82                 updateIndex(consulResponse);
83                 ImmutableMap<K, V> full = convertToMap(consulResponse);
84
85                 boolean changed = !full.equals(lastResponse.get());
86                 // LOGGER.info("node changed:"+changed+"----"+full);
87                 if (changed) {
88                     // changes
89                     lastResponse.set(full);
90                 }
91
92                 if (changed) {
93                     for (Listener<K, V> l : listeners) {
94                         l.notify(full);
95                     }
96                 }
97
98                 if (state.compareAndSet(State.starting, State.started)) {
99                     initLatch.countDown();
100                 }
101                 runCallback();
102             }
103
104             @Override
105             public void onFailure(Throwable throwable) {
106
107                 if (!isRunning()) {
108                     return;
109                 }
110                 LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty,
111                                 backoffDelayUnit), throwable);
112
113                 executorService.schedule(new Runnable() {
114                     @Override
115                     public void run() {
116                         runCallback();
117                     }
118                 }, backoffDelayQty, backoffDelayUnit);
119             }
120         };
121     }
122
123     public void start() throws Exception {
124         checkState(state.compareAndSet(State.latent, State.starting), "Cannot transition from state %s to %s",
125                         state.get(), State.starting);
126         runCallback();
127     }
128
129     public void stop() throws Exception {
130         State previous = state.getAndSet(State.stopped);
131         if (previous != State.stopped) {
132             executorService.shutdownNow();
133         }
134     }
135
136     private void runCallback() {
137         if (isRunning()) {
138             callBackConsumer.consume(latestIndex.get(), responseCallback);
139         }
140     }
141
142     private boolean isRunning() {
143         return state.get() == State.started || state.get() == State.starting;
144     }
145
146     public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
147         return initLatch.await(timeout, unit);
148     }
149
150     public ImmutableMap<K, V> getMap() {
151         return lastResponse.get();
152     }
153
154     @VisibleForTesting
155     ImmutableMap<K, V> convertToMap(final ConsulResponse<List<V>> response) {
156         if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {
157             return ImmutableMap.of();
158         }
159
160         final ImmutableMap.Builder<K, V> builder = ImmutableMap.builder();
161         final Set<K> keySet = new HashSet<>();
162         for (final V v : response.getResponse()) {
163             final K key = keyConversion.apply(v);
164             if (key != null) {
165                 if (!keySet.contains(key)) {
166                     builder.put(key, v);
167                 } else {
168                     System.out.println(key.toString());
169                     LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? "
170                                     + key.toString());
171                 }
172             }
173             keySet.add(key);
174         }
175         return builder.build();
176     }
177
178     private void updateIndex(ConsulResponse<List<V>> consulResponse) {
179         if (consulResponse != null && consulResponse.getIndex() != null) {
180             this.latestIndex.set(consulResponse.getIndex());
181         }
182     }
183
184     protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {
185         if (index == null) {
186             return QueryOptions.BLANK;
187         } else {
188             return QueryOptions.blockSeconds(blockSeconds, index).build();
189         }
190     }
191
192     /**
193      * passed in by creators to vary the content of the cached values
194      *
195      * @param <V>
196      */
197     protected interface CallbackConsumer<V> {
198         void consume(BigInteger index, ConsulResponseCallback<List<V>> callback);
199     }
200
201     /**
202      * Implementers can register a listener to receive a new map when it changes
203      *
204      * @param <V>
205      */
206     public interface Listener<K, V> {
207         void notify(Map<K, V> newValues);
208     }
209
210     public boolean addListener(Listener<K, V> listener) {
211         boolean added = listeners.add(listener);
212         if (state.get() == State.started) {
213             listener.notify(lastResponse.get());
214         }
215         return added;
216     }
217
218     public boolean removeListener(Listener<K, V> listener) {
219         return listeners.remove(listener);
220     }
221
222     @VisibleForTesting
223     protected State getState() {
224         return state.get();
225     }
226 }