1 /*******************************************************************************
2 * Copyright 2016-2017 ZTE, Inc. and others.
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5 * in compliance with the License. You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software distributed under the License
10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11 * or implied. See the License for the specific language governing permissions and limitations under
13 ******************************************************************************/
14 package org.onap.msb.apiroute.wrapper.consulextend.cache;
16 import static com.google.common.base.Preconditions.checkArgument;
17 import static com.google.common.base.Preconditions.checkState;
19 import java.math.BigInteger;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.Properties;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicReference;
30 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseCallback;
31 import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseHeader;
32 import org.onap.msb.apiroute.wrapper.consulextend.async.OriginalConsulResponse;
33 import org.onap.msb.apiroute.wrapper.consulextend.util.Http;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import com.google.common.annotations.VisibleForTesting;
38 import com.google.common.base.Strings;
39 import com.orbitz.consul.ConsulException;
40 import com.orbitz.consul.model.ConsulResponse;
41 import com.orbitz.consul.option.ImmutableQueryOptions;
42 import com.orbitz.consul.option.QueryOptions;
45 * A cache structure that can provide an up-to-date read-only map backed by consul data
49 public class ConsulCache<T> {
52 latent, starting, started, stopped
// Class-wide SLF4J logger.
55 private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class);
// Name of the property that overrides the retry back-off delay; its value is parsed as a
// long and used as a number of milliseconds.
58 static final String BACKOFF_DELAY_PROPERTY = "com.orbitz.consul.cache.backOffDelay";
// Back-off delay in milliseconds, resolved once from the JVM system properties when the
// class is initialized.
59 private static final long BACKOFF_DELAY_QTY_IN_MS = getBackOffDelayInMs(System.getProperties());
// Most recently recorded Consul index; null until an index has been recorded. It is the
// index handed to the callback consumer on each run.
61 private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
// Lifecycle state of this cache; a new instance starts in State.latent.
62 private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
// Counted down when the state moves from starting to started; awaitInitialized waits on it.
63 private final CountDownLatch initLatch = new CountDownLatch(1);
// Single-threaded scheduler used to schedule the delayed retry after a failure; stop()
// shuts it down.
64 private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
// Listeners notified with each response the callback accepts.
65 private final CopyOnWriteArrayList<Listener<T>> listeners = new CopyOnWriteArrayList<Listener<T>>();
// Data source supplied by the creator of the cache.
67 private final CallbackConsumer<T> callBackConsumer;
// Callback passed to callBackConsumer on every run; created in the constructor.
68 private final ConsulResponseCallback<T> responseCallback;
// Creates a cache fed by the given callback consumer and builds the response callback that
// is handed to that consumer on every run (see runCallback).
70 ConsulCache(CallbackConsumer<T> callbackConsumer) {
72 this.callBackConsumer = callbackConsumer;
74 this.responseCallback = new ConsulResponseCallback<T>() {
// Handles a fully parsed response. With a known leader: records the response index,
// notifies every registered listener, and moves the state from starting to started,
// releasing initLatch when that transition succeeds.
76 public void onComplete(ConsulResponse<T> consulResponse) {
78 if (consulResponse.isKnownLeader()) {
82 updateIndex(consulResponse);
84 for (Listener<T> l : listeners) {
85 l.notify(consulResponse);
88 if (state.compareAndSet(State.starting, State.started)) {
89 initLatch.countDown();
// Presumably the no-leader branch: the condition is reported through onFailure.
94 onFailure(new ConsulException("Consul cluster has no elected leader"));
// Handles a response whose body has not been parsed yet: the headers are read first and
// the index they carry is compared with the cached one before the body is converted.
99 public void onDelayComplete(OriginalConsulResponse<T> originalConsulResponse) {
103 ConsulResponseHeader consulResponseHeader =
104 Http.consulResponseHeader(originalConsulResponse.getResponse());
106 if (consulResponseHeader.isKnownLeader()) {
111 boolean isConuslIndexChanged = isConuslIndexChanged(consulResponseHeader.getIndex());
112 // the index reported by consul differs from the one recorded locally
113 if (isConuslIndexChanged) {
// NOTE(review): the index is recorded before the body is parsed and before listeners are
// notified. If parsing throws, the index may already be advanced although no listener saw
// the data — confirm how the catch block below deals with that.
115 updateIndex(consulResponseHeader.getIndex());
118 ConsulResponse<T> consulResponse =
119 Http.consulResponse(originalConsulResponse.getResponseType(),
120 originalConsulResponse.getResponse());
122 // hand the parsed response to every listener so each can process the T payload
123 for (Listener<T> l : listeners) {
124 l.notify(consulResponse);
128 if (state.compareAndSet(State.starting, State.started)) {
129 initLatch.countDown();
// Presumably the no-leader branch: the condition is reported through onFailure.
135 onFailure(new ConsulException("Consul cluster has no elected leader"));
137 } catch (Exception e) {
// Logs the failure together with the back-off delay and schedules a task on
// executorService to run after that delay.
// NOTE(review): presumably the scheduled task triggers another callback run — confirm.
144 public void onFailure(Throwable throwable) {
149 LOGGER.error(String.format("Error getting response from consul. will retry in %d %s",
150 BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS), throwable);
152 executorService.schedule(new Runnable() {
157 }, BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS);
163 static long getBackOffDelayInMs(Properties properties) {
164 String backOffDelay = null;
166 backOffDelay = properties.getProperty(BACKOFF_DELAY_PROPERTY);
167 if (!Strings.isNullOrEmpty(backOffDelay)) {
168 return Long.parseLong(backOffDelay);
170 } catch (Exception ex) {
171 LOGGER.warn(backOffDelay != null
172 ? String.format("Error parsing property variable %s: %s", BACKOFF_DELAY_PROPERTY,
174 : String.format("Error extracting property variable %s", BACKOFF_DELAY_PROPERTY), ex);
176 return TimeUnit.SECONDS.toMillis(10);
// Starts the cache: atomically moves the state from latent to starting. checkState (Guava
// Preconditions) rejects the call with the message template below when the state was not
// latent.
// NOTE(review): presumably the first callback run is triggered right after this check — confirm.
179 public void start() throws Exception {
180 checkState(state.compareAndSet(State.latent, State.starting), "Cannot transition from state %s to %s",
181 state.get(), State.starting);
185 public void stop() throws Exception {
186 State previous = state.getAndSet(State.stopped);
187 if (previous != State.stopped) {
188 executorService.shutdownNow();
// Asks the callback consumer for data, passing the most recently recorded index (null
// until one has been recorded) and the shared response callback.
// NOTE(review): presumably only executed while isRunning() is true — confirm.
192 private void runCallback() {
194 callBackConsumer.consume(latestIndex.get(), responseCallback);
198 private boolean isRunning() {
199 return state.get() == State.started || state.get() == State.starting;
202 public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
203 return initLatch.await(timeout, unit);
206 private void updateIndex(ConsulResponse<T> consulResponse) {
207 if (consulResponse != null && consulResponse.getIndex() != null) {
208 this.latestIndex.set(consulResponse.getIndex());
// Records the given value as the latest known index.
// NOTE(review): presumably applied only when index is non-null — confirm.
212 public void updateIndex(BigInteger index) {
214 this.latestIndex.set(index);
218 protected static QueryOptions watchParams(final BigInteger index, final int blockSeconds,
219 QueryOptions queryOptions) {
220 checkArgument(!queryOptions.getIndex().isPresent() && !queryOptions.getWait().isPresent(),
221 "Index and wait cannot be overridden");
223 return ImmutableQueryOptions.builder().from(watchDefaultParams(index, blockSeconds))
224 .token(queryOptions.getToken()).consistencyMode(queryOptions.getConsistencyMode())
225 .near(queryOptions.getNear()).build();
// Produces the base options for a watch request: either QueryOptions.BLANK or a blocking
// query built from blockSeconds and index.
// NOTE(review): presumably BLANK is returned when index is null — confirm.
228 private static QueryOptions watchDefaultParams(final BigInteger index, final int blockSeconds) {
230 return QueryOptions.BLANK;
232 return QueryOptions.blockSeconds(blockSeconds, index).build();
237 * passed in by creators to vary the content of the cached values
241 protected interface CallbackConsumer<T> {
// Called by the cache with the most recently recorded index (null until one has been
// recorded) and the cache's response callback.
242 void consume(BigInteger index, ConsulResponseCallback<T> callback);
246 * Implementers can register a listener to receive a new map when it changes
250 public interface Listener<T> {
// Called by the cache's response callback with each response it accepts.
251 void notify(ConsulResponse<T> newValues);
// Registers a listener by appending it to the listener list that the response callback
// iterates when notifying.
// NOTE(review): presumably returns the result of the add call — confirm.
254 public boolean addListener(Listener<T> listener) {
255 boolean added = listeners.add(listener);
259 public List<Listener<T>> getListeners() {
260 return Collections.unmodifiableList(listeners);
263 public boolean removeListener(Listener<T> listener) {
264 return listeners.remove(listener);
// Protected accessor returning a State value.
// NOTE(review): presumably the current value of the state field — confirm.
268 protected State getState() {
272 private boolean isConuslIndexChanged(final BigInteger index) {
274 if (index != null && !index.equals(latestIndex.get())) {
276 if (LOGGER.isDebugEnabled()) {
278 if (latestIndex.get() != null) {
279 LOGGER.debug("consul index compare:new-" + index + " old-" + latestIndex.get());