2 * Copyright 2016 ZTE, Inc. and others.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.msb.sdclient.wrapper.consul.cache;
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.base.Function;
20 import com.google.common.collect.ImmutableMap;
22 import org.onap.msb.sdclient.wrapper.consul.async.ConsulResponseCallback;
23 import org.onap.msb.sdclient.wrapper.consul.model.ConsulResponse;
24 import org.onap.msb.sdclient.wrapper.consul.option.QueryOptions;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
28 import java.math.BigInteger;
29 import java.util.HashSet;
30 import java.util.List;
33 import java.util.concurrent.CopyOnWriteArrayList;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
40 import static com.google.common.base.Preconditions.checkState;
43 * A cache structure that can provide an up-to-date read-only
44 * map backed by consul data
48 public class ConsulCache<K, V> {
50 enum State {latent, starting, started, stopped }
52 private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class);
54 private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
55 private final AtomicReference<ImmutableMap<K, V>> lastResponse = new AtomicReference<ImmutableMap<K, V>>(ImmutableMap.<K, V>of());
56 private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
57 private final CountDownLatch initLatch = new CountDownLatch(1);
58 private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
59 private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();
61 private final Function<V, K> keyConversion;
62 private final CallbackConsumer<V> callBackConsumer;
63 private final ConsulResponseCallback<List<V>> responseCallback;
66 Function<V, K> keyConversion,
67 CallbackConsumer<V> callbackConsumer) {
68 this(keyConversion, callbackConsumer, 10, TimeUnit.SECONDS);
72 Function<V, K> keyConversion,
73 CallbackConsumer<V> callbackConsumer,
74 final long backoffDelayQty,
75 final TimeUnit backoffDelayUnit) {
77 this.keyConversion = keyConversion;
78 this.callBackConsumer = callbackConsumer;
80 this.responseCallback = new ConsulResponseCallback<List<V>>() {
82 public void onComplete(ConsulResponse<List<V>> consulResponse) {
87 updateIndex(consulResponse);
88 ImmutableMap<K, V> full = convertToMap(consulResponse);
90 boolean changed = !full.equals(lastResponse.get());
91 // LOGGER.info("node changed:"+changed+"----"+full);
94 lastResponse.set(full);
98 for (Listener<K, V> l : listeners) {
103 if (state.compareAndSet(State.starting, State.started)) {
104 initLatch.countDown();
110 public void onFailure(Throwable throwable) {
115 LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable);
117 executorService.schedule(new Runnable() {
122 }, backoffDelayQty, backoffDelayUnit);
127 public void start() throws Exception {
128 checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting);
132 public void stop() throws Exception {
133 State previous = state.getAndSet(State.stopped);
134 if (previous != State.stopped) {
135 executorService.shutdownNow();
139 private void runCallback() {
141 callBackConsumer.consume(latestIndex.get(), responseCallback);
145 private boolean isRunning() {
146 return state.get() == State.started || state.get() == State.starting;
149 public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
150 return initLatch.await(timeout, unit);
153 public ImmutableMap<K, V> getMap() {
154 return lastResponse.get();
158 ImmutableMap<K, V> convertToMap(final ConsulResponse<List<V>> response) {
159 if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {
160 return ImmutableMap.of();
163 final ImmutableMap.Builder<K, V> builder = ImmutableMap.builder();
164 final Set<K> keySet = new HashSet<>();
165 for (final V v : response.getResponse()) {
166 final K key = keyConversion.apply(v);
168 if (!keySet.contains(key)) {
171 System.out.println(key.toString());
172 LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString());
177 return builder.build();
180 private void updateIndex(ConsulResponse<List<V>> consulResponse) {
181 if (consulResponse != null && consulResponse.getIndex() != null) {
182 this.latestIndex.set(consulResponse.getIndex());
186 protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {
188 return QueryOptions.BLANK;
190 return QueryOptions.blockSeconds(blockSeconds, index).build();
195 * passed in by creators to vary the content of the cached values
199 protected interface CallbackConsumer<V> {
200 void consume(BigInteger index, ConsulResponseCallback<List<V>> callback);
204 * Implementers can register a listener to receive
205 * a new map when it changes
209 public interface Listener<K, V> {
210 void notify(Map<K, V> newValues);
213 public boolean addListener(Listener<K, V> listener) {
214 boolean added = listeners.add(listener);
215 if (state.get() == State.started) {
216 listener.notify(lastResponse.get());
221 public boolean removeListener(Listener<K, V> listener) {
222 return listeners.remove(listener);
226 protected State getState() {