5f431f1a59dee73f541bbe39f254052f2a3d58c9
[msb/apigateway.git] / msb-core / apiroute / apiroute-service / src / main / java / org / openo / msb / wrapper / consul / cache / ConsulCache.java
1 /**
2  * Copyright 2016 ZTE, Inc. and others.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 /**
17 * Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
18 *
19 * Licensed under the Apache License, Version 2.0 (the "License");
20 * you may not use this file except in compliance with the License.
21 * You may obtain a copy of the License at
22 *
23 * http://www.apache.org/licenses/LICENSE-2.0
24 *
25 * Unless required by applicable law or agreed to in writing, software
26 * distributed under the License is distributed on an "AS IS" BASIS,
27 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28 * See the License for the specific language governing permissions and
29 * limitations under the License.
30 */
31
32 package org.openo.msb.wrapper.consul.cache;
33
34 import com.google.common.annotations.VisibleForTesting;
35 import com.google.common.base.Function;
36 import com.google.common.collect.ImmutableMap;
37
38 import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
39 import org.openo.msb.wrapper.consul.model.ConsulResponse;
40 import org.openo.msb.wrapper.consul.option.QueryOptions;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import java.math.BigInteger;
45 import java.util.HashSet;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.Set;
49 import java.util.concurrent.CopyOnWriteArrayList;
50 import java.util.concurrent.CountDownLatch;
51 import java.util.concurrent.Executors;
52 import java.util.concurrent.ScheduledExecutorService;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.atomic.AtomicReference;
55
56 import static com.google.common.base.Preconditions.checkState;
57
58 /**
59  * A cache structure that can provide an up-to-date read-only
60  * map backed by consul data
61  *
62  * @param <V>
63  */
64 public class ConsulCache<K, V> {
65
66     enum State {latent, starting, started, stopped }
67
68     private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class);
69
70     private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
71     private final AtomicReference<ImmutableMap<K, V>> lastResponse = new AtomicReference<ImmutableMap<K, V>>(ImmutableMap.<K, V>of());
72     private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
73     private final CountDownLatch initLatch = new CountDownLatch(1);
74     private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
75     private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();
76
77     private final Function<V, K> keyConversion;
78     private final CallbackConsumer<V> callBackConsumer;
79     private final ConsulResponseCallback<List<V>> responseCallback;
80
81     ConsulCache(
82             Function<V, K> keyConversion,
83             CallbackConsumer<V> callbackConsumer) {
84         this(keyConversion, callbackConsumer, 10, TimeUnit.SECONDS);
85     }
86
87     ConsulCache(
88             Function<V, K> keyConversion,
89             CallbackConsumer<V> callbackConsumer,
90             final long backoffDelayQty,
91             final TimeUnit backoffDelayUnit) {
92
93         this.keyConversion = keyConversion;
94         this.callBackConsumer = callbackConsumer;
95
96         this.responseCallback = new ConsulResponseCallback<List<V>>() {
97             @Override
98             public void onComplete(ConsulResponse<List<V>> consulResponse) {
99
100                 if (!isRunning()) {
101                     return;
102                 }
103                 updateIndex(consulResponse);
104                 ImmutableMap<K, V> full = convertToMap(consulResponse);
105
106                 boolean changed = !full.equals(lastResponse.get());
107 //                LOGGER.info("node changed:"+changed+"----"+full);
108                 if (changed) {
109                     // changes
110                     lastResponse.set(full);
111                 }
112
113                 if (changed) {
114                     for (Listener<K, V> l : listeners) {
115                         l.notify(full);
116                     }
117                 }
118
119                 if (state.compareAndSet(State.starting, State.started)) {
120                     initLatch.countDown();
121                 }
122                 runCallback();
123             }
124
125             @Override
126             public void onFailure(Throwable throwable) {
127
128                 if (!isRunning()) {
129                     return;
130                 }
131                 LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable);
132
133                 executorService.schedule(new Runnable() {
134                     @Override
135                     public void run() {
136                         runCallback();
137                     }
138                 }, backoffDelayQty, backoffDelayUnit);
139             }
140         };
141     }
142
143     public void start() throws Exception {
144         checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting);
145         runCallback();
146     }
147
148     public void stop() throws Exception {
149         State previous = state.getAndSet(State.stopped);
150         if (previous != State.stopped) {
151             executorService.shutdownNow();
152         }
153     }
154
155     private void runCallback() {
156         if (isRunning()) {
157             callBackConsumer.consume(latestIndex.get(), responseCallback);
158         }
159     }
160
161     private boolean isRunning() {
162         return state.get() == State.started || state.get() == State.starting;
163     }
164
165     public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
166         return initLatch.await(timeout, unit);
167     }
168
169     public ImmutableMap<K, V> getMap() {
170         return lastResponse.get();
171     }
172
173     @VisibleForTesting
174     ImmutableMap<K, V> convertToMap(final ConsulResponse<List<V>> response) {
175         if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {
176             return ImmutableMap.of();
177         }
178
179         final ImmutableMap.Builder<K, V> builder = ImmutableMap.builder();
180         final Set<K> keySet = new HashSet<>();
181         for (final V v : response.getResponse()) {
182             final K key = keyConversion.apply(v);
183             if (key != null) {
184                 if (!keySet.contains(key)) {
185                     builder.put(key, v);
186                 } else {
187                     System.out.println(key.toString());
188                     LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString());
189                 }
190             }
191             keySet.add(key);
192         }
193         return builder.build();
194     }
195
196     private void updateIndex(ConsulResponse<List<V>> consulResponse) {
197         if (consulResponse != null && consulResponse.getIndex() != null) {
198             this.latestIndex.set(consulResponse.getIndex());
199         }
200     }
201
202     protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {
203         if (index == null) {
204             return QueryOptions.BLANK;
205         } else {
206             return QueryOptions.blockSeconds(blockSeconds, index).build();
207         }
208     }
209
210     /**
211      * passed in by creators to vary the content of the cached values
212      *
213      * @param <V>
214      */
215     protected interface CallbackConsumer<V> {
216         void consume(BigInteger index, ConsulResponseCallback<List<V>> callback);
217     }
218
219     /**
220      * Implementers can register a listener to receive
221      * a new map when it changes
222      *
223      * @param <V>
224      */
225     public interface Listener<K, V> {
226         void notify(Map<K, V> newValues);
227     }
228
229     public boolean addListener(Listener<K, V> listener) {
230         boolean added = listeners.add(listener);
231         if (state.get() == State.started) {
232             listener.notify(lastResponse.get());
233         }
234         return added;
235     }
236
237     public boolean removeListener(Listener<K, V> listener) {
238         return listeners.remove(listener);
239     }
240
241     @VisibleForTesting
242     protected State getState() {
243         return state.get();
244     }
245 }