1 /*******************************************************************************
2 * Copyright 2016-2017 ZTE, Inc. and others.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 ******************************************************************************/
16 package org.onap.msb.apiroute.wrapper.consulextend.cache;
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
21 import java.math.BigInteger;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Properties;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicReference;
32 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseCallback;
33 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseHeader;
34 import org.onap.msb.apiroute.wrapper.consulextend.async.OriginalConsulResponse;
35 import org.onap.msb.apiroute.wrapper.consulextend.util.Http;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 import com.google.common.annotations.VisibleForTesting;
40 import com.google.common.base.Strings;
41 import com.orbitz.consul.ConsulException;
42 import com.orbitz.consul.model.ConsulResponse;
43 import com.orbitz.consul.option.ImmutableQueryOptions;
44 import com.orbitz.consul.option.QueryOptions;
47 * A cache structure that can provide an up-to-date read-only map backed by
52 public class ConsulCache<T> {
55 latent, starting, started, stopped
58 private final static Logger LOGGER = LoggerFactory
59 .getLogger(ConsulCache.class);
62 static final String BACKOFF_DELAY_PROPERTY = "com.orbitz.consul.cache.backOffDelay";
63 private static final long BACKOFF_DELAY_QTY_IN_MS = getBackOffDelayInMs(System
66 private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(
68 private final AtomicReference<State> state = new AtomicReference<State>(
70 private final CountDownLatch initLatch = new CountDownLatch(1);
71 private final ScheduledExecutorService executorService = Executors
72 .newSingleThreadScheduledExecutor();
73 private final CopyOnWriteArrayList<Listener<T>> listeners = new CopyOnWriteArrayList<Listener<T>>();
75 private final CallbackConsumer<T> callBackConsumer;
76 private final ConsulResponseCallback<T> responseCallback;
78 ConsulCache(CallbackConsumer<T> callbackConsumer) {
80 this.callBackConsumer = callbackConsumer;
82 this.responseCallback = new ConsulResponseCallback<T>() {
84 public void onComplete(ConsulResponse<T> consulResponse) {
86 if (consulResponse.isKnownLeader()) {
90 updateIndex(consulResponse);
92 for (Listener<T> l : listeners) {
93 l.notify(consulResponse);
96 if (state.compareAndSet(State.starting, State.started)) {
97 initLatch.countDown();
102 onFailure(new ConsulException(
103 "Consul cluster has no elected leader"));
108 public void onDelayComplete(
109 OriginalConsulResponse<T> originalConsulResponse) {
113 ConsulResponseHeader consulResponseHeader = Http
114 .consulResponseHeader(originalConsulResponse
117 if (consulResponseHeader.isKnownLeader()) {
122 boolean isConuslIndexChanged = isConuslIndexChanged(consulResponseHeader
124 // consul index different
125 if (isConuslIndexChanged) {
127 updateIndex(consulResponseHeader.getIndex());
130 ConsulResponse<T> consulResponse = Http
131 .consulResponse(originalConsulResponse
133 originalConsulResponse
136 // notify customer to custom T data
137 for (Listener<T> l : listeners) {
138 l.notify(consulResponse);
142 if (state.compareAndSet(State.starting, State.started)) {
143 initLatch.countDown();
149 onFailure(new ConsulException(
150 "Consul cluster has no elected leader"));
152 } catch (Exception e) {
159 public void onFailure(Throwable throwable) {
166 "Error getting response from consul. will retry in %d %s",
167 BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS),
170 executorService.schedule(new Runnable() {
175 }, BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS);
181 static long getBackOffDelayInMs(Properties properties) {
182 String backOffDelay = null;
184 backOffDelay = properties.getProperty(BACKOFF_DELAY_PROPERTY);
185 if (!Strings.isNullOrEmpty(backOffDelay)) {
186 return Long.parseLong(backOffDelay);
188 } catch (Exception ex) {
190 backOffDelay != null ? String.format(
191 "Error parsing property variable %s: %s",
192 BACKOFF_DELAY_PROPERTY, backOffDelay) : String
193 .format("Error extracting property variable %s",
194 BACKOFF_DELAY_PROPERTY), ex);
196 return TimeUnit.SECONDS.toMillis(10);
199 public void start() throws Exception {
200 checkState(state.compareAndSet(State.latent, State.starting),
201 "Cannot transition from state %s to %s", state.get(),
206 public void stop() throws Exception {
207 State previous = state.getAndSet(State.stopped);
208 if (previous != State.stopped) {
209 executorService.shutdownNow();
213 private void runCallback() {
215 callBackConsumer.consume(latestIndex.get(), responseCallback);
219 private boolean isRunning() {
220 return state.get() == State.started || state.get() == State.starting;
223 public boolean awaitInitialized(long timeout, TimeUnit unit)
224 throws InterruptedException {
225 return initLatch.await(timeout, unit);
228 private void updateIndex(ConsulResponse<T> consulResponse) {
229 if (consulResponse != null && consulResponse.getIndex() != null) {
230 this.latestIndex.set(consulResponse.getIndex());
234 public void updateIndex(BigInteger index) {
236 this.latestIndex.set(index);
240 protected static QueryOptions watchParams(final BigInteger index,
241 final int blockSeconds, QueryOptions queryOptions) {
242 checkArgument(!queryOptions.getIndex().isPresent()
243 && !queryOptions.getWait().isPresent(),
244 "Index and wait cannot be overridden");
246 return ImmutableQueryOptions.builder()
247 .from(watchDefaultParams(index, blockSeconds))
248 .token(queryOptions.getToken())
249 .consistencyMode(queryOptions.getConsistencyMode())
250 .near(queryOptions.getNear()).build();
253 private static QueryOptions watchDefaultParams(final BigInteger index,
254 final int blockSeconds) {
256 return QueryOptions.BLANK;
258 return QueryOptions.blockSeconds(blockSeconds, index).build();
263 * passed in by creators to vary the content of the cached values
267 protected interface CallbackConsumer<T> {
268 void consume(BigInteger index, ConsulResponseCallback<T> callback);
272 * Implementers can register a listener to receive a new map when it changes
276 public interface Listener<T> {
277 void notify(ConsulResponse<T> newValues);
280 public boolean addListener(Listener<T> listener) {
281 boolean added = listeners.add(listener);
285 public List<Listener<T>> getListeners() {
286 return Collections.unmodifiableList(listeners);
289 public boolean removeListener(Listener<T> listener) {
290 return listeners.remove(listener);
294 protected State getState() {
298 private boolean isConuslIndexChanged(final BigInteger index) {
300 if (index != null && !index.equals(latestIndex.get())) {
302 if (LOGGER.isDebugEnabled()) {
304 if (latestIndex.get() != null) {
305 LOGGER.debug("consul index compare:new-" + index + " old-"
306 + latestIndex.get());