Initial code import
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / openo / msb / wrapper / consul / cache / ConsulCache.java
1 /**
2 * Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.openo.msb.wrapper.consul.cache;
18
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.Function;
21 import com.google.common.collect.ImmutableMap;
22
23 import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
24 import org.openo.msb.wrapper.consul.model.ConsulResponse;
25 import org.openo.msb.wrapper.consul.option.QueryOptions;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import java.math.BigInteger;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.concurrent.CopyOnWriteArrayList;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40
41 import static com.google.common.base.Preconditions.checkState;
42
43 /**
44  * A cache structure that can provide an up-to-date read-only
45  * map backed by consul data
46  *
47  * @param <V>
48  */
49 public class ConsulCache<K, V> {
50
51     enum State {latent, starting, started, stopped }
52
53     private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class);
54
55     private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
56     private final AtomicReference<ImmutableMap<K, V>> lastResponse = new AtomicReference<ImmutableMap<K, V>>(ImmutableMap.<K, V>of());
57     private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
58     private final CountDownLatch initLatch = new CountDownLatch(1);
59     private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
60     private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();
61
62     private final Function<V, K> keyConversion;
63     private final CallbackConsumer<V> callBackConsumer;
64     private final ConsulResponseCallback<List<V>> responseCallback;
65
66     ConsulCache(
67             Function<V, K> keyConversion,
68             CallbackConsumer<V> callbackConsumer) {
69         this(keyConversion, callbackConsumer, 10, TimeUnit.SECONDS);
70     }
71
72     ConsulCache(
73             Function<V, K> keyConversion,
74             CallbackConsumer<V> callbackConsumer,
75             final long backoffDelayQty,
76             final TimeUnit backoffDelayUnit) {
77
78         this.keyConversion = keyConversion;
79         this.callBackConsumer = callbackConsumer;
80
81         this.responseCallback = new ConsulResponseCallback<List<V>>() {
82             @Override
83             public void onComplete(ConsulResponse<List<V>> consulResponse) {
84
85                 if (!isRunning()) {
86                     return;
87                 }
88                 updateIndex(consulResponse);
89                 ImmutableMap<K, V> full = convertToMap(consulResponse);
90
91                 boolean changed = !full.equals(lastResponse.get());
92 //                LOGGER.info("node changed:"+changed+"----"+full);
93                 if (changed) {
94                     // changes
95                     lastResponse.set(full);
96                 }
97
98                 if (changed) {
99                     for (Listener<K, V> l : listeners) {
100                         l.notify(full);
101                     }
102                 }
103
104                 if (state.compareAndSet(State.starting, State.started)) {
105                     initLatch.countDown();
106                 }
107                 runCallback();
108             }
109
110             @Override
111             public void onFailure(Throwable throwable) {
112
113                 if (!isRunning()) {
114                     return;
115                 }
116                 LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable);
117
118                 executorService.schedule(new Runnable() {
119                     @Override
120                     public void run() {
121                         runCallback();
122                     }
123                 }, backoffDelayQty, backoffDelayUnit);
124             }
125         };
126     }
127
128     public void start() throws Exception {
129         checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting);
130         runCallback();
131     }
132
133     public void stop() throws Exception {
134         State previous = state.getAndSet(State.stopped);
135         if (previous != State.stopped) {
136             executorService.shutdownNow();
137         }
138     }
139
140     private void runCallback() {
141         if (isRunning()) {
142             callBackConsumer.consume(latestIndex.get(), responseCallback);
143         }
144     }
145
146     private boolean isRunning() {
147         return state.get() == State.started || state.get() == State.starting;
148     }
149
150     public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
151         return initLatch.await(timeout, unit);
152     }
153
154     public ImmutableMap<K, V> getMap() {
155         return lastResponse.get();
156     }
157
158     @VisibleForTesting
159     ImmutableMap<K, V> convertToMap(final ConsulResponse<List<V>> response) {
160         if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {
161             return ImmutableMap.of();
162         }
163
164         final ImmutableMap.Builder<K, V> builder = ImmutableMap.builder();
165         final Set<K> keySet = new HashSet<>();
166         for (final V v : response.getResponse()) {
167             final K key = keyConversion.apply(v);
168             if (key != null) {
169                 if (!keySet.contains(key)) {
170                     builder.put(key, v);
171                 } else {
172                     System.out.println(key.toString());
173                     LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString());
174                 }
175             }
176             keySet.add(key);
177         }
178         return builder.build();
179     }
180
181     private void updateIndex(ConsulResponse<List<V>> consulResponse) {
182         if (consulResponse != null && consulResponse.getIndex() != null) {
183             this.latestIndex.set(consulResponse.getIndex());
184         }
185     }
186
187     protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {
188         if (index == null) {
189             return QueryOptions.BLANK;
190         } else {
191             return QueryOptions.blockSeconds(blockSeconds, index).build();
192         }
193     }
194
195     /**
196      * passed in by creators to vary the content of the cached values
197      *
198      * @param <V>
199      */
200     protected interface CallbackConsumer<V> {
201         void consume(BigInteger index, ConsulResponseCallback<List<V>> callback);
202     }
203
204     /**
205      * Implementers can register a listener to receive
206      * a new map when it changes
207      *
208      * @param <V>
209      */
210     public interface Listener<K, V> {
211         void notify(Map<K, V> newValues);
212     }
213
214     public boolean addListener(Listener<K, V> listener) {
215         boolean added = listeners.add(listener);
216         if (state.get() == State.started) {
217             listener.notify(lastResponse.get());
218         }
219         return added;
220     }
221
222     public boolean removeListener(Listener<K, V> listener) {
223         return listeners.remove(listener);
224     }
225
226     @VisibleForTesting
227     protected State getState() {
228         return state.get();
229     }
230 }