Fix java check style warning
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / onap / msb / apiroute / wrapper / consulextend / cache / ConsulCache.java
1 /*******************************************************************************
2  * Copyright 2016-2017 ZTE, Inc. and others.
3  * 
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  * 
7  * http://www.apache.org/licenses/LICENSE-2.0
8  * 
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  ******************************************************************************/
14 package org.onap.msb.apiroute.wrapper.consulextend.cache;
15
16 import static com.google.common.base.Preconditions.checkArgument;
17 import static com.google.common.base.Preconditions.checkState;
18
19 import java.math.BigInteger;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.Properties;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseCallback;
31 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseHeader;
32 import org.onap.msb.apiroute.wrapper.consulextend.async.OriginalConsulResponse;
33 import org.onap.msb.apiroute.wrapper.consulextend.util.Http;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.annotations.VisibleForTesting;
38 import com.google.common.base.Strings;
39 import com.orbitz.consul.ConsulException;
40 import com.orbitz.consul.model.ConsulResponse;
41 import com.orbitz.consul.option.ImmutableQueryOptions;
42 import com.orbitz.consul.option.QueryOptions;
43
44 /**
45  * A cache structure that can provide an up-to-date read-only map backed by consul data
46  * 
47  * @param <V>
48  */
49 public class ConsulCache<T> {
50
51     enum State {
52         latent, starting, started, stopped
53     }
54
55     private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class);
56
57     @VisibleForTesting
58     static final String BACKOFF_DELAY_PROPERTY = "com.orbitz.consul.cache.backOffDelay";
59     private static final long BACKOFF_DELAY_QTY_IN_MS = getBackOffDelayInMs(System.getProperties());
60
61     private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
62     private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
63     private final CountDownLatch initLatch = new CountDownLatch(1);
64     private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
65     private final CopyOnWriteArrayList<Listener<T>> listeners = new CopyOnWriteArrayList<Listener<T>>();
66
67     private final CallbackConsumer<T> callBackConsumer;
68     private final ConsulResponseCallback<T> responseCallback;
69
70     ConsulCache(CallbackConsumer<T> callbackConsumer) {
71
72         this.callBackConsumer = callbackConsumer;
73
74         this.responseCallback = new ConsulResponseCallback<T>() {
75             @Override
76             public void onComplete(ConsulResponse<T> consulResponse) {
77
78                 if (consulResponse.isKnownLeader()) {
79                     if (!isRunning()) {
80                         return;
81                     }
82                     updateIndex(consulResponse);
83
84                     for (Listener<T> l : listeners) {
85                         l.notify(consulResponse);
86                     }
87
88                     if (state.compareAndSet(State.starting, State.started)) {
89                         initLatch.countDown();
90                     }
91
92                     runCallback();
93                 } else {
94                     onFailure(new ConsulException("Consul cluster has no elected leader"));
95                 }
96             }
97
98             @Override
99             public void onDelayComplete(OriginalConsulResponse<T> originalConsulResponse) {
100
101                 try {
102                     // get header
103                     ConsulResponseHeader consulResponseHeader =
104                                     Http.consulResponseHeader(originalConsulResponse.getResponse());
105
106                     if (consulResponseHeader.isKnownLeader()) {
107                         if (!isRunning()) {
108                             return;
109                         }
110
111                         boolean isConuslIndexChanged = isConuslIndexChanged(consulResponseHeader.getIndex());
112                         // consul index different
113                         if (isConuslIndexChanged) {
114
115                             updateIndex(consulResponseHeader.getIndex());
116
117                             // get T type data
118                             ConsulResponse<T> consulResponse =
119                                             Http.consulResponse(originalConsulResponse.getResponseType(),
120                                                             originalConsulResponse.getResponse());
121
122                             // notify customer to custom T data
123                             for (Listener<T> l : listeners) {
124                                 l.notify(consulResponse);
125                             }
126                         }
127
128                         if (state.compareAndSet(State.starting, State.started)) {
129                             initLatch.countDown();
130                         }
131
132                         runCallback();
133
134                     } else {
135                         onFailure(new ConsulException("Consul cluster has no elected leader"));
136                     }
137                 } catch (Exception e) {
138                     onFailure(e);
139                 }
140
141             }
142
143             @Override
144             public void onFailure(Throwable throwable) {
145
146                 if (!isRunning()) {
147                     return;
148                 }
149                 LOGGER.error(String.format("Error getting response from consul. will retry in %d %s",
150                                 BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS), throwable);
151
152                 executorService.schedule(new Runnable() {
153                     @Override
154                     public void run() {
155                         runCallback();
156                     }
157                 }, BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS);
158             }
159         };
160     }
161
162     @VisibleForTesting
163     static long getBackOffDelayInMs(Properties properties) {
164         String backOffDelay = null;
165         try {
166             backOffDelay = properties.getProperty(BACKOFF_DELAY_PROPERTY);
167             if (!Strings.isNullOrEmpty(backOffDelay)) {
168                 return Long.parseLong(backOffDelay);
169             }
170         } catch (Exception ex) {
171             LOGGER.warn(backOffDelay != null
172                             ? String.format("Error parsing property variable %s: %s", BACKOFF_DELAY_PROPERTY,
173                                             backOffDelay)
174                             : String.format("Error extracting property variable %s", BACKOFF_DELAY_PROPERTY), ex);
175         }
176         return TimeUnit.SECONDS.toMillis(10);
177     }
178
179     public void start() throws Exception {
180         checkState(state.compareAndSet(State.latent, State.starting), "Cannot transition from state %s to %s",
181                         state.get(), State.starting);
182         runCallback();
183     }
184
185     public void stop() throws Exception {
186         State previous = state.getAndSet(State.stopped);
187         if (previous != State.stopped) {
188             executorService.shutdownNow();
189         }
190     }
191
192     private void runCallback() {
193         if (isRunning()) {
194             callBackConsumer.consume(latestIndex.get(), responseCallback);
195         }
196     }
197
198     private boolean isRunning() {
199         return state.get() == State.started || state.get() == State.starting;
200     }
201
202     public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
203         return initLatch.await(timeout, unit);
204     }
205
206     private void updateIndex(ConsulResponse<T> consulResponse) {
207         if (consulResponse != null && consulResponse.getIndex() != null) {
208             this.latestIndex.set(consulResponse.getIndex());
209         }
210     }
211
212     public void updateIndex(BigInteger index) {
213         if (index != null) {
214             this.latestIndex.set(index);
215         }
216     }
217
218     protected static QueryOptions watchParams(final BigInteger index, final int blockSeconds,
219                     QueryOptions queryOptions) {
220         checkArgument(!queryOptions.getIndex().isPresent() && !queryOptions.getWait().isPresent(),
221                         "Index and wait cannot be overridden");
222
223         return ImmutableQueryOptions.builder().from(watchDefaultParams(index, blockSeconds))
224                         .token(queryOptions.getToken()).consistencyMode(queryOptions.getConsistencyMode())
225                         .near(queryOptions.getNear()).build();
226     }
227
228     private static QueryOptions watchDefaultParams(final BigInteger index, final int blockSeconds) {
229         if (index == null) {
230             return QueryOptions.BLANK;
231         } else {
232             return QueryOptions.blockSeconds(blockSeconds, index).build();
233         }
234     }
235
236     /**
237      * passed in by creators to vary the content of the cached values
238      * 
239      * @param <V>
240      */
241     protected interface CallbackConsumer<T> {
242         void consume(BigInteger index, ConsulResponseCallback<T> callback);
243     }
244
245     /**
246      * Implementers can register a listener to receive a new map when it changes
247      * 
248      * @param <V>
249      */
250     public interface Listener<T> {
251         void notify(ConsulResponse<T> newValues);
252     }
253
254     public boolean addListener(Listener<T> listener) {
255         boolean added = listeners.add(listener);
256         return added;
257     }
258
259     public List<Listener<T>> getListeners() {
260         return Collections.unmodifiableList(listeners);
261     }
262
263     public boolean removeListener(Listener<T> listener) {
264         return listeners.remove(listener);
265     }
266
267     @VisibleForTesting
268     protected State getState() {
269         return state.get();
270     }
271
272     private boolean isConuslIndexChanged(final BigInteger index) {
273
274         if (index != null && !index.equals(latestIndex.get())) {
275
276             if (LOGGER.isDebugEnabled()) {
277                 // 第一次不打印
278                 if (latestIndex.get() != null) {
279                     LOGGER.debug("consul index compare:new-" + index + "  old-" + latestIndex.get());
280                 }
281
282             }
283
284             return true;
285         }
286
287         return false;
288     }
289 }