1 package org.onap.msb.apiroute.wrapper.consulextend.cache;
3 import static com.google.common.base.Preconditions.checkArgument;
4 import static com.google.common.base.Preconditions.checkState;
6 import java.math.BigInteger;
7 import java.util.Collections;
9 import java.util.Properties;
10 import java.util.concurrent.CopyOnWriteArrayList;
11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.ScheduledExecutorService;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.atomic.AtomicReference;
17 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseCallback;
18 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseHeader;
19 import org.onap.msb.apiroute.wrapper.consulextend.async.OriginalConsulResponse;
20 import org.onap.msb.apiroute.wrapper.consulextend.util.Http;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 import com.google.common.annotations.VisibleForTesting;
25 import com.google.common.base.Strings;
26 import com.orbitz.consul.ConsulException;
27 import com.orbitz.consul.model.ConsulResponse;
28 import com.orbitz.consul.option.ImmutableQueryOptions;
29 import com.orbitz.consul.option.QueryOptions;
32 * A cache structure that can provide an up-to-date read-only map backed by
37 public class ConsulCache<T> {
40 latent, starting, started, stopped
// Logger shared by every instance of this class.
43 private final static Logger LOGGER = LoggerFactory
44 .getLogger(ConsulCache.class);
// Name of the property that getBackOffDelayInMs(Properties) looks up.
47 static final String BACKOFF_DELAY_PROPERTY = "com.orbitz.consul.cache.backOffDelay";
// Delay, in milliseconds, passed to executorService.schedule(...) in the
// failure handler; computed once through getBackOffDelayInMs.
// NOTE(review): the tail of this initialiser is not visible in this excerpt.
48 private static final long BACKOFF_DELAY_QTY_IN_MS = getBackOffDelayInMs(System
// Most recent index recorded by the updateIndex methods; handed to the
// callback consumer by runCallback().
// NOTE(review): the initial value is not visible in this excerpt.
51 private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(
// Lifecycle state of the cache (latent, starting, started, stopped).
// NOTE(review): the initial value is not visible in this excerpt.
53 private final AtomicReference<State> state = new AtomicReference<State>(
// Counted down when the state moves from starting to started;
// awaitInitialized(...) waits on it.
55 private final CountDownLatch initLatch = new CountDownLatch(1);
// Single-threaded scheduler used by the failure handler to schedule a task
// after the back-off delay; shut down by stop().
56 private final ScheduledExecutorService executorService = Executors
57 .newSingleThreadScheduledExecutor();
// Registered listeners; iterated whenever a response is delivered.
58 private final CopyOnWriteArrayList<Listener<T>> listeners = new CopyOnWriteArrayList<Listener<T>>();
// Supplied by the creator of the cache; invoked from runCallback().
60 private final CallbackConsumer<T> callBackConsumer;
// Callback passed to the consumer on every invocation; built in the constructor.
61 private final ConsulResponseCallback<T> responseCallback;
/**
 * Creates a cache that obtains its data through the given consumer and
 * builds the response callback responsible for index tracking, listener
 * notification and failure handling.
 *
 * NOTE(review): several lines of this constructor are not visible in this
 * excerpt; the comments below describe only the statements that are shown.
 *
 * @param callbackConsumer invoked with the current index and the response
 *        callback each time the cache asks for data
 */
63 ConsulCache(CallbackConsumer<T> callbackConsumer) {
65 this.callBackConsumer = callbackConsumer;
67 this.responseCallback = new ConsulResponseCallback<T>() {
// Handles a response that has already been decoded into ConsulResponse<T>.
69 public void onComplete(ConsulResponse<T> consulResponse) {
71 if (consulResponse.isKnownLeader()) {
// Record the response's index, then hand the response to every listener.
75 updateIndex(consulResponse);
77 for (Listener<T> l : listeners) {
78 l.notify(consulResponse);
// Only the starting -> started transition releases threads blocked in
// awaitInitialized(...); later responses leave the latch alone.
81 if (state.compareAndSet(State.starting, State.started)) {
82 initLatch.countDown();
// NOTE(review): presumably the no-leader branch of the check above; the
// branch keyword itself is not visible in this excerpt.
87 onFailure(new ConsulException(
88 "Consul cluster has no elected leader"));
// Handles a raw response: the header is examined first and the body is
// decoded only when the index differs from the stored one.
93 public void onDelayComplete(
94 OriginalConsulResponse<T> originalConsulResponse) {
// Extract the response header (leader flag and index are read from it below).
98 ConsulResponseHeader consulResponseHeader = Http
99 .consulResponseHeader(originalConsulResponse
102 if (consulResponseHeader.isKnownLeader()) {
107 boolean isConuslIndexChanged = isConuslIndexChanged(consulResponseHeader
109 // consul index different
110 if (isConuslIndexChanged) {
112 updateIndex(consulResponseHeader.getIndex());
// Decode the body into a ConsulResponse<T>.
115 ConsulResponse<T> consulResponse = Http
116 .consulResponse(originalConsulResponse
118 originalConsulResponse
121 // notify every registered listener with the decoded response
122 for (Listener<T> l : listeners) {
123 l.notify(consulResponse);
// Same initialisation hand-off as in onComplete.
127 if (state.compareAndSet(State.starting, State.started)) {
128 initLatch.countDown();
// NOTE(review): presumably the no-leader branch; the branch keyword is not
// visible in this excerpt.
134 onFailure(new ConsulException(
135 "Consul cluster has no elected leader"));
// NOTE(review): the handler body is not visible in this excerpt.
137 } catch (Exception e) {
// Failure path: formats a retry message with the back-off delay and schedules
// a task on executorService after that delay.
// NOTE(review): the call receiving the message and the scheduled task's body
// are not visible in this excerpt.
144 public void onFailure(Throwable throwable) {
151 "Error getting response from consul. will retry in %d %s",
152 BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS),
155 executorService.schedule(new Runnable() {
160 }, BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS);
/**
 * Determines the retry back-off delay in milliseconds.
 *
 * Looks up BACKOFF_DELAY_PROPERTY in the given properties and, when the
 * value is neither null nor empty, returns it parsed with Long.parseLong.
 * The final statement returns ten seconds expressed in milliseconds.
 *
 * NOTE(review): some lines of this method are not visible in this excerpt,
 * including the opener of the guarded region and the call that receives the
 * message built in the exception handler.
 *
 * @param properties source of the back-off setting
 * @return the parsed property value, or 10 seconds in milliseconds when the
 *         final return statement is reached
 */
166 static long getBackOffDelayInMs(Properties properties) {
167 String backOffDelay = null;
169 backOffDelay = properties.getProperty(BACKOFF_DELAY_PROPERTY);
170 if (!Strings.isNullOrEmpty(backOffDelay)) {
171 return Long.parseLong(backOffDelay);
173 } catch (Exception ex) {
// The message depends on how far the lookup got: a non-null value means the
// property was read before the exception occurred, null means no value had
// been obtained.
175 backOffDelay != null ? String.format(
176 "Error parsing property variable %s: %s",
177 BACKOFF_DELAY_PROPERTY, backOffDelay) : String
178 .format("Error extracting property variable %s",
179 BACKOFF_DELAY_PROPERTY), ex);
// Default delay: 10 seconds.
181 return TimeUnit.SECONDS.toMillis(10);
/**
 * Starts the cache by moving its state from latent to starting.
 *
 * The transition is an atomic compare-and-set; when it does not succeed,
 * checkState rejects the call with the "Cannot transition..." message.
 *
 * NOTE(review): the remainder of this method is not visible in this excerpt.
 *
 * @throws IllegalStateException raised by checkState when the state was not latent
 * @throws Exception declared by the signature
 */
184 public void start() throws Exception {
185 checkState(state.compareAndSet(State.latent, State.starting),
186 "Cannot transition from state %s to %s", state.get(),
191 public void stop() throws Exception {
192 State previous = state.getAndSet(State.stopped);
193 if (previous != State.stopped) {
194 executorService.shutdownNow();
// Hands the most recently recorded index and the shared response callback to
// the callback consumer.
// NOTE(review): a line between the signature and this call is not visible in
// this excerpt.
198 private void runCallback() {
200 callBackConsumer.consume(latestIndex.get(), responseCallback);
204 private boolean isRunning() {
205 return state.get() == State.started || state.get() == State.starting;
208 public boolean awaitInitialized(long timeout, TimeUnit unit)
209 throws InterruptedException {
210 return initLatch.await(timeout, unit);
213 private void updateIndex(ConsulResponse<T> consulResponse) {
214 if (consulResponse != null && consulResponse.getIndex() != null) {
215 this.latestIndex.set(consulResponse.getIndex());
/**
 * Stores the given index as the latest known index.
 *
 * NOTE(review): a line preceding the assignment is not visible in this
 * excerpt (possibly a guard on the argument); confirm against the full file.
 *
 * @param index index to record
 */
219 public void updateIndex(BigInteger index) {
221 this.latestIndex.set(index);
225 protected static QueryOptions watchParams(final BigInteger index,
226 final int blockSeconds, QueryOptions queryOptions) {
227 checkArgument(!queryOptions.getIndex().isPresent()
228 && !queryOptions.getWait().isPresent(),
229 "Index and wait cannot be overridden");
231 return ImmutableQueryOptions.builder()
232 .from(watchDefaultParams(index, blockSeconds))
233 .token(queryOptions.getToken())
234 .consistencyMode(queryOptions.getConsistencyMode())
235 .near(queryOptions.getNear()).build();
/**
 * Produces the base query options for a watch: either QueryOptions.BLANK or
 * blocking options built from the given block duration and index.
 *
 * NOTE(review): the condition that selects between the two returns is not
 * visible in this excerpt.
 *
 * @param index index to watch from
 * @param blockSeconds blocking duration, in seconds
 * @return the base query options
 */
238 private static QueryOptions watchDefaultParams(final BigInteger index,
239 final int blockSeconds) {
241 return QueryOptions.BLANK;
243 return QueryOptions.blockSeconds(blockSeconds, index).build();
248 * passed in by creators to vary the content of the cached values
252 protected interface CallbackConsumer<T> {
/**
 * Invoked by the cache with the most recently recorded index and the
 * callback that should receive the outcome of the request.
 *
 * @param index latest index known to the cache
 * @param callback receiver of the response or failure
 */
253 void consume(BigInteger index, ConsulResponseCallback<T> callback);
257 * Implementers can register a listener to receive a new map when it changes
261 public interface Listener<T> {
/**
 * Invoked by the cache for every response it delivers to its listeners.
 *
 * @param newValues the response being delivered
 */
262 void notify(ConsulResponse<T> newValues);
265 public boolean addListener(Listener<T> listener) {
266 boolean added = listeners.add(listener);
270 public List<Listener<T>> getListeners() {
271 return Collections.unmodifiableList(listeners);
274 public boolean removeListener(Listener<T> listener) {
275 return listeners.remove(listener);
279 protected State getState() {
283 private boolean isConuslIndexChanged(final BigInteger index) {
285 if (index != null && !index.equals(latestIndex.get())) {
287 if (LOGGER.isDebugEnabled()) {
289 if (latestIndex.get() != null) {
290 LOGGER.debug("consul index compare:new-" + index + " old-"
291 + latestIndex.get());