2 * Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
17 package org.openo.msb.wrapper.consul.cache;
\r
20 import static com.google.common.base.Preconditions.checkState;
\r
22 import java.math.BigInteger;
\r
23 import java.util.HashSet;
\r
24 import java.util.Iterator;
\r
25 import java.util.List;
\r
26 import java.util.Map;
\r
27 import java.util.Set;
\r
28 import java.util.concurrent.CopyOnWriteArrayList;
\r
29 import java.util.concurrent.CountDownLatch;
\r
30 import java.util.concurrent.Executors;
\r
31 import java.util.concurrent.ScheduledExecutorService;
\r
32 import java.util.concurrent.TimeUnit;
\r
33 import java.util.concurrent.atomic.AtomicReference;
\r
35 import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
\r
36 import org.openo.msb.wrapper.consul.model.ConsulResponse;
\r
37 import org.openo.msb.wrapper.consul.model.catalog.CatalogService;
\r
38 import org.openo.msb.wrapper.consul.model.catalog.ServiceInfo;
\r
39 import org.openo.msb.wrapper.consul.option.QueryOptions;
\r
40 import org.slf4j.Logger;
\r
41 import org.slf4j.LoggerFactory;
\r
43 import com.google.common.annotations.VisibleForTesting;
\r
44 import com.google.common.collect.ImmutableList;
\r
47 * A cache structure that can provide an up-to-date read-only
\r
48 * map backed by consul data
\r
52 public class ConsulCache4Map<K, V> {
\r
54 enum State {latent, starting, started, stopped }
\r
56 private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache4Map.class);
\r
58 private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
\r
59 private final AtomicReference<ImmutableList<ServiceInfo>> lastResponse = new AtomicReference<ImmutableList<ServiceInfo>>(ImmutableList.<ServiceInfo>of());
\r
60 private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
\r
61 private final CountDownLatch initLatch = new CountDownLatch(1);
\r
62 private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
\r
63 private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();
\r
65 private final CallbackConsumer<V> callBackConsumer;
\r
66 private final ConsulResponseCallback<Map<String,List<String>>> responseCallback;
\r
68 ConsulCache4Map(CallbackConsumer<V> callbackConsumer) {
\r
69 this( callbackConsumer, 10, TimeUnit.SECONDS);
\r
73 CallbackConsumer<V> callbackConsumer,
\r
74 final long backoffDelayQty,
\r
75 final TimeUnit backoffDelayUnit) {
\r
77 this.callBackConsumer = callbackConsumer;
\r
79 this.responseCallback = new ConsulResponseCallback<Map<String,List<String>>>() {
\r
81 public void onComplete(ConsulResponse<Map<String,List<String>>> consulResponse) {
\r
86 updateIndex(consulResponse);
\r
87 ImmutableList<ServiceInfo> full = convertToList(consulResponse);
\r
88 List<ServiceInfo> oldList=lastResponse.get();
\r
89 boolean changed = !full.equals(lastResponse.get());
\r
90 // LOGGER.info("service changed:"+changed+"----"+full);
\r
93 lastResponse.set(full);
\r
97 for (Listener<K, V> l : listeners) {
\r
98 l.notify(oldList,full);
\r
102 if (state.compareAndSet(State.starting, State.started)) {
\r
103 initLatch.countDown();
\r
109 public void onFailure(Throwable throwable) {
\r
111 if (!isRunning()) {
\r
114 LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable);
\r
116 executorService.schedule(new Runnable() {
\r
118 public void run() {
\r
121 }, backoffDelayQty, backoffDelayUnit);
\r
126 public void start() throws Exception {
\r
127 checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting);
\r
131 public void stop() throws Exception {
\r
132 State previous = state.getAndSet(State.stopped);
\r
133 if (previous != State.stopped) {
\r
134 executorService.shutdownNow();
\r
138 private void runCallback() {
\r
140 callBackConsumer.consume(latestIndex.get(), responseCallback);
\r
144 private boolean isRunning() {
\r
145 return state.get() == State.started || state.get() == State.starting;
\r
148 public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
\r
149 return initLatch.await(timeout, unit);
\r
152 public ImmutableList<ServiceInfo> getMap() {
\r
153 return lastResponse.get();
\r
157 ImmutableList<ServiceInfo> convertToList(final ConsulResponse<Map<String,List<String>>> response) {
\r
158 if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {
\r
159 return ImmutableList.of();
\r
162 final ImmutableList.Builder<ServiceInfo> builder = ImmutableList.builder();
\r
163 final Set<String> keySet = new HashSet<>();
\r
165 for(Map.Entry<String,List<String>> entry : response.getResponse().entrySet()) {
\r
167 String key = entry.getKey();
\r
169 if (key != null && !"consul".equals(key)) {
\r
170 if (!keySet.contains(key)) {
\r
171 ServiceInfo serviceInfo=new ServiceInfo();
\r
172 serviceInfo.setServiceName(key);
\r
174 List<String> value=entry.getValue();
\r
175 for(String tag:value){
\r
177 if(tag.startsWith("version")){
\r
179 if(tag.split(":").length==2)
\r
181 version = tag.split(":")[1];
\r
187 serviceInfo.setVersion(version);
\r
192 builder.add(serviceInfo);
\r
194 System.out.println(key.toString());
\r
195 LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString());
\r
203 return builder.build();
\r
206 private void updateIndex(ConsulResponse<Map<String,List<String>>> consulResponse) {
\r
207 if (consulResponse != null && consulResponse.getIndex() != null) {
\r
208 this.latestIndex.set(consulResponse.getIndex());
\r
212 protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {
\r
213 if (index == null) {
\r
214 return QueryOptions.BLANK;
\r
216 return QueryOptions.blockSeconds(blockSeconds, index).build();
\r
221 * passed in by creators to vary the content of the cached values
\r
225 protected interface CallbackConsumer<V> {
\r
226 void consume(BigInteger index, ConsulResponseCallback<Map<String,List<String>>> callback);
\r
230 * Implementers can register a listener to receive
\r
231 * a new map when it changes
\r
235 public interface Listener<K, V> {
\r
236 void notify(List<ServiceInfo> oldValues,List<ServiceInfo> newValues);
\r
239 public boolean addListener(Listener<K, V> listener) {
\r
240 boolean added = listeners.add(listener);
\r
241 if (state.get() == State.started) {
\r
242 listener.notify(lastResponse.get(),lastResponse.get());
\r
247 public boolean removeListener(Listener<K, V> listener) {
\r
248 return listeners.remove(listener);
\r
252 protected State getState() {
\r
253 return state.get();
\r