Issue-id: OCS-9
[msb/apigateway.git] / msb-core / apiroute / apiroute-service / src / main / java / org / openo / msb / wrapper / consul / cache / ConsulCache4Map.java
1 /**
2  * Copyright 2016 2015-2016 ZTE, Inc. and others. All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.openo.msb.wrapper.consul.cache;
17
18
19 import static com.google.common.base.Preconditions.checkState;
20
21 import java.math.BigInteger;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
35 import org.openo.msb.wrapper.consul.model.ConsulResponse;
36 import org.openo.msb.wrapper.consul.model.catalog.CatalogService;
37 import org.openo.msb.wrapper.consul.model.catalog.ServiceInfo;
38 import org.openo.msb.wrapper.consul.option.QueryOptions;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import com.google.common.annotations.VisibleForTesting;
43 import com.google.common.collect.ImmutableList;
44
45 /**
46  * A cache structure that can provide an up-to-date read-only
47  * map backed by consul data
48  *
49  * @param <V>
50  */
51 public class ConsulCache4Map<K, V> {
52
53     enum State {latent, starting, started, stopped }
54
55     private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache4Map.class);
56
57     private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
58     private final AtomicReference<ImmutableList<ServiceInfo>> lastResponse = new AtomicReference<ImmutableList<ServiceInfo>>(ImmutableList.<ServiceInfo>of());
59     private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
60     private final CountDownLatch initLatch = new CountDownLatch(1);
61     private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
62     private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();
63
64     private final CallbackConsumer<V> callBackConsumer;
65     private final ConsulResponseCallback<Map<String,List<String>>> responseCallback;
66
67     ConsulCache4Map(CallbackConsumer<V> callbackConsumer) {
68         this( callbackConsumer, 10, TimeUnit.SECONDS);
69     }
70
71     ConsulCache4Map(
72             CallbackConsumer<V> callbackConsumer,
73             final long backoffDelayQty,
74             final TimeUnit backoffDelayUnit) {
75
76         this.callBackConsumer = callbackConsumer;
77
78         this.responseCallback = new ConsulResponseCallback<Map<String,List<String>>>() {
79             @Override
80             public void onComplete(ConsulResponse<Map<String,List<String>>> consulResponse) {
81
82                 if (!isRunning()) {
83                     return;
84                 }
85                 updateIndex(consulResponse);
86                 ImmutableList<ServiceInfo> full = convertToList(consulResponse);
87                 List<ServiceInfo> oldList=lastResponse.get();
88                 boolean changed = !full.equals(lastResponse.get());
89 //                LOGGER.info("service changed:"+changed+"----"+full);
90                 if (changed) {
91                     // changes
92                     lastResponse.set(full);
93                 }
94
95                 if (changed) {
96                     for (Listener<K, V> l : listeners) {
97                         l.notify(oldList,full);
98                     }
99                 }
100
101                 if (state.compareAndSet(State.starting, State.started)) {
102                     initLatch.countDown();
103                 }
104                 runCallback();
105             }
106
107             @Override
108             public void onFailure(Throwable throwable) {
109
110                 if (!isRunning()) {
111                     return;
112                 }
113                 LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable);
114
115                 executorService.schedule(new Runnable() {
116                     @Override
117                     public void run() {
118                         runCallback();
119                     }
120                 }, backoffDelayQty, backoffDelayUnit);
121             }
122         };
123     }
124
125     public void start() throws Exception {
126         checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting);
127         runCallback();
128     }
129
130     public void stop() throws Exception {
131         State previous = state.getAndSet(State.stopped);
132         if (previous != State.stopped) {
133             executorService.shutdownNow();
134         }
135     }
136
137     private void runCallback() {
138         if (isRunning()) {
139             callBackConsumer.consume(latestIndex.get(), responseCallback);
140         }
141     }
142
143     private boolean isRunning() {
144         return state.get() == State.started || state.get() == State.starting;
145     }
146
147     public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
148         return initLatch.await(timeout, unit);
149     }
150
151     public ImmutableList<ServiceInfo> getMap() {
152         return lastResponse.get();
153     }
154
155     @VisibleForTesting
156     ImmutableList<ServiceInfo> convertToList(final ConsulResponse<Map<String,List<String>>> response) {
157         if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {
158             return ImmutableList.of();
159         }
160
161         final ImmutableList.Builder<ServiceInfo> builder = ImmutableList.builder();
162         final Set<String> keySet = new HashSet<>();
163         
164         for(Map.Entry<String,List<String>> entry : response.getResponse().entrySet()) {
165  
166             String key = entry.getKey();  
167            
168             if (key != null && !"consul".equals(key)) {
169                 if (!keySet.contains(key)) {
170                     ServiceInfo serviceInfo=new ServiceInfo();
171                     serviceInfo.setServiceName(key);
172                     
173                     List<String> value=entry.getValue();
174                     for(String tag:value){
175                        
176                         if(tag.startsWith("version")){
177                              String version; 
178                             if(tag.split(":").length==2)
179                             {
180                             version = tag.split(":")[1];
181                             }
182                             else{
183                                 version=""; 
184                             }
185                             
186                             serviceInfo.setVersion(version);
187                             break;
188                         }
189                     }
190                     
191                     builder.add(serviceInfo);
192                 } else {
193                     System.out.println(key.toString());
194                     LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString());
195                 }
196             }
197             keySet.add(key);
198          
199         }  
200         
201         
202         return builder.build();
203     }
204
205     private void updateIndex(ConsulResponse<Map<String,List<String>>> consulResponse) {
206         if (consulResponse != null && consulResponse.getIndex() != null) {
207             this.latestIndex.set(consulResponse.getIndex());
208         }
209     }
210
211     protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {
212         if (index == null) {
213             return QueryOptions.BLANK;
214         } else {
215             return QueryOptions.blockSeconds(blockSeconds, index).build();
216         }
217     }
218
219     /**
220      * passed in by creators to vary the content of the cached values
221      *
222      * @param <V>
223      */
224     protected interface CallbackConsumer<V> {
225         void consume(BigInteger index, ConsulResponseCallback<Map<String,List<String>>> callback);
226     }
227
228     /**
229      * Implementers can register a listener to receive
230      * a new map when it changes
231      *
232      * @param <V>
233      */
234     public interface Listener<K, V> {
235         void notify(List<ServiceInfo> oldValues,List<ServiceInfo> newValues);
236     }
237
238     public boolean addListener(Listener<K, V> listener) {
239         boolean added = listeners.add(listener);
240         if (state.get() == State.started) {
241             listener.notify(lastResponse.get(),lastResponse.get());
242         }
243         return added;
244     }
245
246     public boolean removeListener(Listener<K, V> listener) {
247         return listeners.remove(listener);
248     }
249
250     @VisibleForTesting
251     protected State getState() {
252         return state.get();
253     }
254     
255  
256     
257 }