5325b8954211edc88d47ac67e1bfe7ee2d0b298e
[msb/apigateway.git] /
1 /*******************************************************************************
2  * Copyright 2016-2017 ZTE, Inc. and others.
3  * 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * 
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  * 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  ******************************************************************************/
16 package org.onap.msb.apiroute.wrapper.consulextend.cache;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
20
21 import java.math.BigInteger;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Properties;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicReference;
31
32 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseCallback;
33 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseHeader;
34 import org.onap.msb.apiroute.wrapper.consulextend.async.OriginalConsulResponse;
35 import org.onap.msb.apiroute.wrapper.consulextend.util.Http;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.google.common.annotations.VisibleForTesting;
40 import com.google.common.base.Strings;
41 import com.orbitz.consul.ConsulException;
42 import com.orbitz.consul.model.ConsulResponse;
43 import com.orbitz.consul.option.ImmutableQueryOptions;
44 import com.orbitz.consul.option.QueryOptions;
45
46 /**
47  * A cache structure that can provide an up-to-date read-only map backed by
48  * consul data
49  * 
50  * @param <V>
51  */
52 public class ConsulCache<T> {
53
54         enum State {
55                 latent, starting, started, stopped
56         }
57
58         private final static Logger LOGGER = LoggerFactory
59                         .getLogger(ConsulCache.class);
60
61         @VisibleForTesting
62         static final String BACKOFF_DELAY_PROPERTY = "com.orbitz.consul.cache.backOffDelay";
63         private static final long BACKOFF_DELAY_QTY_IN_MS = getBackOffDelayInMs(System
64                         .getProperties());
65
66         private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(
67                         null);
68         private final AtomicReference<State> state = new AtomicReference<State>(
69                         State.latent);
70         private final CountDownLatch initLatch = new CountDownLatch(1);
71         private final ScheduledExecutorService executorService = Executors
72                         .newSingleThreadScheduledExecutor();
73         private final CopyOnWriteArrayList<Listener<T>> listeners = new CopyOnWriteArrayList<Listener<T>>();
74
75         private final CallbackConsumer<T> callBackConsumer;
76         private final ConsulResponseCallback<T> responseCallback;
77
78         ConsulCache(CallbackConsumer<T> callbackConsumer) {
79
80                 this.callBackConsumer = callbackConsumer;
81
82                 this.responseCallback = new ConsulResponseCallback<T>() {
83                         @Override
84                         public void onComplete(ConsulResponse<T> consulResponse) {
85
86                                 if (consulResponse.isKnownLeader()) {
87                                         if (!isRunning()) {
88                                                 return;
89                                         }
90                                         updateIndex(consulResponse);
91
92                                         for (Listener<T> l : listeners) {
93                                                 l.notify(consulResponse);
94                                         }
95
96                                         if (state.compareAndSet(State.starting, State.started)) {
97                                                 initLatch.countDown();
98                                         }
99
100                                         runCallback();
101                                 } else {
102                                         onFailure(new ConsulException(
103                                                         "Consul cluster has no elected leader"));
104                                 }
105                         }
106
107                         @Override
108                         public void onDelayComplete(
109                                         OriginalConsulResponse<T> originalConsulResponse) {
110
111                                 try {
112                                         // get header
113                                         ConsulResponseHeader consulResponseHeader = Http
114                                                         .consulResponseHeader(originalConsulResponse
115                                                                         .getResponse());
116
117                                         if (consulResponseHeader.isKnownLeader()) {
118                                                 if (!isRunning()) {
119                                                         return;
120                                                 }
121
122                                                 boolean isConuslIndexChanged = isConuslIndexChanged(consulResponseHeader
123                                                                 .getIndex());
124                                                 // consul index different
125                                                 if (isConuslIndexChanged) {
126
127                                                         updateIndex(consulResponseHeader.getIndex());
128
129                                                         // get T type data
130                                                         ConsulResponse<T> consulResponse = Http
131                                                                         .consulResponse(originalConsulResponse
132                                                                                         .getResponseType(),
133                                                                                         originalConsulResponse
134                                                                                                         .getResponse());
135
136                                                         // notify customer to custom T data
137                                                         for (Listener<T> l : listeners) {
138                                                                 l.notify(consulResponse);
139                                                         }
140                                                 }
141
142                                                 if (state.compareAndSet(State.starting, State.started)) {
143                                                         initLatch.countDown();
144                                                 }
145
146                                                 runCallback();
147
148                                         } else {
149                                                 onFailure(new ConsulException(
150                                                                 "Consul cluster has no elected leader"));
151                                         }
152                                 } catch (Exception e) {
153                                         onFailure(e);
154                                 }
155
156                         }
157
158                         @Override
159                         public void onFailure(Throwable throwable) {
160
161                                 if (!isRunning()) {
162                                         return;
163                                 }
164                                 LOGGER.error(
165                                                 String.format(
166                                                                 "Error getting response from consul. will retry in %d %s",
167                                                                 BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS),
168                                                 throwable);
169
170                                 executorService.schedule(new Runnable() {
171                                         @Override
172                                         public void run() {
173                                                 runCallback();
174                                         }
175                                 }, BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS);
176                         }
177                 };
178         }
179
180         @VisibleForTesting
181         static long getBackOffDelayInMs(Properties properties) {
182                 String backOffDelay = null;
183                 try {
184                         backOffDelay = properties.getProperty(BACKOFF_DELAY_PROPERTY);
185                         if (!Strings.isNullOrEmpty(backOffDelay)) {
186                                 return Long.parseLong(backOffDelay);
187                         }
188                 } catch (Exception ex) {
189                         LOGGER.warn(
190                                         backOffDelay != null ? String.format(
191                                                         "Error parsing property variable %s: %s",
192                                                         BACKOFF_DELAY_PROPERTY, backOffDelay) : String
193                                                         .format("Error extracting property variable %s",
194                                                                         BACKOFF_DELAY_PROPERTY), ex);
195                 }
196                 return TimeUnit.SECONDS.toMillis(10);
197         }
198
199         public void start() throws Exception {
200                 checkState(state.compareAndSet(State.latent, State.starting),
201                                 "Cannot transition from state %s to %s", state.get(),
202                                 State.starting);
203                 runCallback();
204         }
205
206         public void stop() throws Exception {
207                 State previous = state.getAndSet(State.stopped);
208                 if (previous != State.stopped) {
209                         executorService.shutdownNow();
210                 }
211         }
212
213         private void runCallback() {
214                 if (isRunning()) {
215                         callBackConsumer.consume(latestIndex.get(), responseCallback);
216                 }
217         }
218
219         private boolean isRunning() {
220                 return state.get() == State.started || state.get() == State.starting;
221         }
222
223         public boolean awaitInitialized(long timeout, TimeUnit unit)
224                         throws InterruptedException {
225                 return initLatch.await(timeout, unit);
226         }
227
228         private void updateIndex(ConsulResponse<T> consulResponse) {
229                 if (consulResponse != null && consulResponse.getIndex() != null) {
230                         this.latestIndex.set(consulResponse.getIndex());
231                 }
232         }
233
234         public void updateIndex(BigInteger index) {
235                 if (index != null) {
236                         this.latestIndex.set(index);
237                 }
238         }
239
240         protected static QueryOptions watchParams(final BigInteger index,
241                         final int blockSeconds, QueryOptions queryOptions) {
242                 checkArgument(!queryOptions.getIndex().isPresent()
243                                 && !queryOptions.getWait().isPresent(),
244                                 "Index and wait cannot be overridden");
245
246                 return ImmutableQueryOptions.builder()
247                                 .from(watchDefaultParams(index, blockSeconds))
248                                 .token(queryOptions.getToken())
249                                 .consistencyMode(queryOptions.getConsistencyMode())
250                                 .near(queryOptions.getNear()).build();
251         }
252
253         private static QueryOptions watchDefaultParams(final BigInteger index,
254                         final int blockSeconds) {
255                 if (index == null) {
256                         return QueryOptions.BLANK;
257                 } else {
258                         return QueryOptions.blockSeconds(blockSeconds, index).build();
259                 }
260         }
261
262         /**
263          * passed in by creators to vary the content of the cached values
264          * 
265          * @param <V>
266          */
267         protected interface CallbackConsumer<T> {
268                 void consume(BigInteger index, ConsulResponseCallback<T> callback);
269         }
270
271         /**
272          * Implementers can register a listener to receive a new map when it changes
273          * 
274          * @param <V>
275          */
276         public interface Listener<T> {
277                 void notify(ConsulResponse<T> newValues);
278         }
279
280         public boolean addListener(Listener<T> listener) {
281                 boolean added = listeners.add(listener);
282                 return added;
283         }
284
285         public List<Listener<T>> getListeners() {
286                 return Collections.unmodifiableList(listeners);
287         }
288
289         public boolean removeListener(Listener<T> listener) {
290                 return listeners.remove(listener);
291         }
292
293         @VisibleForTesting
294         protected State getState() {
295                 return state.get();
296         }
297
298         private boolean isConuslIndexChanged(final BigInteger index) {
299
300                 if (index != null && !index.equals(latestIndex.get())) {
301
302                         if (LOGGER.isDebugEnabled()) {
303                                 // 第一次不打印
304                                 if (latestIndex.get() != null) {
305                                         LOGGER.debug("consul index compare:new-" + index + "  old-"
306                                                         + latestIndex.get());
307                                 }
308
309                         }
310
311                         return true;
312                 }
313
314                 return false;
315         }
316 }