Divide the MSB source codes into two repos
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / onap / msb / apiroute / wrapper / consulextend / cache / ConsulCache.java
1 package org.onap.msb.apiroute.wrapper.consulextend.cache;
2
3 import static com.google.common.base.Preconditions.checkArgument;
4 import static com.google.common.base.Preconditions.checkState;
5
6 import java.math.BigInteger;
7 import java.util.Collections;
8 import java.util.List;
9 import java.util.Properties;
10 import java.util.concurrent.CopyOnWriteArrayList;
11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.ScheduledExecutorService;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.atomic.AtomicReference;
16
17 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseCallback;
18 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseHeader;
19 import org.onap.msb.apiroute.wrapper.consulextend.async.OriginalConsulResponse;
20 import org.onap.msb.apiroute.wrapper.consulextend.util.Http;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 import com.google.common.annotations.VisibleForTesting;
25 import com.google.common.base.Strings;
26 import com.orbitz.consul.ConsulException;
27 import com.orbitz.consul.model.ConsulResponse;
28 import com.orbitz.consul.option.ImmutableQueryOptions;
29 import com.orbitz.consul.option.QueryOptions;
30
31 /**
32  * A cache structure that can provide an up-to-date read-only map backed by
33  * consul data
34  * 
35  * @param <V>
36  */
37 public class ConsulCache<T> {
38
39         enum State {
40                 latent, starting, started, stopped
41         }
42
43         private final static Logger LOGGER = LoggerFactory
44                         .getLogger(ConsulCache.class);
45
46         @VisibleForTesting
47         static final String BACKOFF_DELAY_PROPERTY = "com.orbitz.consul.cache.backOffDelay";
48         private static final long BACKOFF_DELAY_QTY_IN_MS = getBackOffDelayInMs(System
49                         .getProperties());
50
51         private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(
52                         null);
53         private final AtomicReference<State> state = new AtomicReference<State>(
54                         State.latent);
55         private final CountDownLatch initLatch = new CountDownLatch(1);
56         private final ScheduledExecutorService executorService = Executors
57                         .newSingleThreadScheduledExecutor();
58         private final CopyOnWriteArrayList<Listener<T>> listeners = new CopyOnWriteArrayList<Listener<T>>();
59
60         private final CallbackConsumer<T> callBackConsumer;
61         private final ConsulResponseCallback<T> responseCallback;
62
63         ConsulCache(CallbackConsumer<T> callbackConsumer) {
64
65                 this.callBackConsumer = callbackConsumer;
66
67                 this.responseCallback = new ConsulResponseCallback<T>() {
68                         @Override
69                         public void onComplete(ConsulResponse<T> consulResponse) {
70
71                                 if (consulResponse.isKnownLeader()) {
72                                         if (!isRunning()) {
73                                                 return;
74                                         }
75                                         updateIndex(consulResponse);
76
77                                         for (Listener<T> l : listeners) {
78                                                 l.notify(consulResponse);
79                                         }
80
81                                         if (state.compareAndSet(State.starting, State.started)) {
82                                                 initLatch.countDown();
83                                         }
84
85                                         runCallback();
86                                 } else {
87                                         onFailure(new ConsulException(
88                                                         "Consul cluster has no elected leader"));
89                                 }
90                         }
91
92                         @Override
93                         public void onDelayComplete(
94                                         OriginalConsulResponse<T> originalConsulResponse) {
95
96                                 try {
97                                         // get header
98                                         ConsulResponseHeader consulResponseHeader = Http
99                                                         .consulResponseHeader(originalConsulResponse
100                                                                         .getResponse());
101
102                                         if (consulResponseHeader.isKnownLeader()) {
103                                                 if (!isRunning()) {
104                                                         return;
105                                                 }
106
107                                                 boolean isConuslIndexChanged = isConuslIndexChanged(consulResponseHeader
108                                                                 .getIndex());
109                                                 // consul index different
110                                                 if (isConuslIndexChanged) {
111
112                                                         updateIndex(consulResponseHeader.getIndex());
113
114                                                         // get T type data
115                                                         ConsulResponse<T> consulResponse = Http
116                                                                         .consulResponse(originalConsulResponse
117                                                                                         .getResponseType(),
118                                                                                         originalConsulResponse
119                                                                                                         .getResponse());
120
121                                                         // notify customer to custom T data
122                                                         for (Listener<T> l : listeners) {
123                                                                 l.notify(consulResponse);
124                                                         }
125                                                 }
126
127                                                 if (state.compareAndSet(State.starting, State.started)) {
128                                                         initLatch.countDown();
129                                                 }
130
131                                                 runCallback();
132
133                                         } else {
134                                                 onFailure(new ConsulException(
135                                                                 "Consul cluster has no elected leader"));
136                                         }
137                                 } catch (Exception e) {
138                                         onFailure(e);
139                                 }
140
141                         }
142
143                         @Override
144                         public void onFailure(Throwable throwable) {
145
146                                 if (!isRunning()) {
147                                         return;
148                                 }
149                                 LOGGER.error(
150                                                 String.format(
151                                                                 "Error getting response from consul. will retry in %d %s",
152                                                                 BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS),
153                                                 throwable);
154
155                                 executorService.schedule(new Runnable() {
156                                         @Override
157                                         public void run() {
158                                                 runCallback();
159                                         }
160                                 }, BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS);
161                         }
162                 };
163         }
164
165         @VisibleForTesting
166         static long getBackOffDelayInMs(Properties properties) {
167                 String backOffDelay = null;
168                 try {
169                         backOffDelay = properties.getProperty(BACKOFF_DELAY_PROPERTY);
170                         if (!Strings.isNullOrEmpty(backOffDelay)) {
171                                 return Long.parseLong(backOffDelay);
172                         }
173                 } catch (Exception ex) {
174                         LOGGER.warn(
175                                         backOffDelay != null ? String.format(
176                                                         "Error parsing property variable %s: %s",
177                                                         BACKOFF_DELAY_PROPERTY, backOffDelay) : String
178                                                         .format("Error extracting property variable %s",
179                                                                         BACKOFF_DELAY_PROPERTY), ex);
180                 }
181                 return TimeUnit.SECONDS.toMillis(10);
182         }
183
184         public void start() throws Exception {
185                 checkState(state.compareAndSet(State.latent, State.starting),
186                                 "Cannot transition from state %s to %s", state.get(),
187                                 State.starting);
188                 runCallback();
189         }
190
191         public void stop() throws Exception {
192                 State previous = state.getAndSet(State.stopped);
193                 if (previous != State.stopped) {
194                         executorService.shutdownNow();
195                 }
196         }
197
198         private void runCallback() {
199                 if (isRunning()) {
200                         callBackConsumer.consume(latestIndex.get(), responseCallback);
201                 }
202         }
203
204         private boolean isRunning() {
205                 return state.get() == State.started || state.get() == State.starting;
206         }
207
208         public boolean awaitInitialized(long timeout, TimeUnit unit)
209                         throws InterruptedException {
210                 return initLatch.await(timeout, unit);
211         }
212
213         private void updateIndex(ConsulResponse<T> consulResponse) {
214                 if (consulResponse != null && consulResponse.getIndex() != null) {
215                         this.latestIndex.set(consulResponse.getIndex());
216                 }
217         }
218
219         public void updateIndex(BigInteger index) {
220                 if (index != null) {
221                         this.latestIndex.set(index);
222                 }
223         }
224
225         protected static QueryOptions watchParams(final BigInteger index,
226                         final int blockSeconds, QueryOptions queryOptions) {
227                 checkArgument(!queryOptions.getIndex().isPresent()
228                                 && !queryOptions.getWait().isPresent(),
229                                 "Index and wait cannot be overridden");
230
231                 return ImmutableQueryOptions.builder()
232                                 .from(watchDefaultParams(index, blockSeconds))
233                                 .token(queryOptions.getToken())
234                                 .consistencyMode(queryOptions.getConsistencyMode())
235                                 .near(queryOptions.getNear()).build();
236         }
237
238         private static QueryOptions watchDefaultParams(final BigInteger index,
239                         final int blockSeconds) {
240                 if (index == null) {
241                         return QueryOptions.BLANK;
242                 } else {
243                         return QueryOptions.blockSeconds(blockSeconds, index).build();
244                 }
245         }
246
247         /**
248          * passed in by creators to vary the content of the cached values
249          * 
250          * @param <V>
251          */
252         protected interface CallbackConsumer<T> {
253                 void consume(BigInteger index, ConsulResponseCallback<T> callback);
254         }
255
256         /**
257          * Implementers can register a listener to receive a new map when it changes
258          * 
259          * @param <V>
260          */
261         public interface Listener<T> {
262                 void notify(ConsulResponse<T> newValues);
263         }
264
265         public boolean addListener(Listener<T> listener) {
266                 boolean added = listeners.add(listener);
267                 return added;
268         }
269
270         public List<Listener<T>> getListeners() {
271                 return Collections.unmodifiableList(listeners);
272         }
273
274         public boolean removeListener(Listener<T> listener) {
275                 return listeners.remove(listener);
276         }
277
278         @VisibleForTesting
279         protected State getState() {
280                 return state.get();
281         }
282
283         private boolean isConuslIndexChanged(final BigInteger index) {
284
285                 if (index != null && !index.equals(latestIndex.get())) {
286
287                         if (LOGGER.isDebugEnabled()) {
288                                 // 第一次不打印
289                                 if (latestIndex.get() != null) {
290                                         LOGGER.debug("consul index compare:new-" + index + "  old-"
291                                                         + latestIndex.get());
292                                 }
293
294                         }
295
296                         return true;
297                 }
298
299                 return false;
300         }
301 }