Initial code import
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / openo / msb / wrapper / consul / cache / ConsulCache4Map.java
1 /**\r
2 * Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)\r
3 *\r
4 * Licensed under the Apache License, Version 2.0 (the "License");\r
5 * you may not use this file except in compliance with the License.\r
6 * You may obtain a copy of the License at\r
7 *\r
8 * http://www.apache.org/licenses/LICENSE-2.0\r
9 *\r
10 * Unless required by applicable law or agreed to in writing, software\r
11 * distributed under the License is distributed on an "AS IS" BASIS,\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
13 * See the License for the specific language governing permissions and\r
14 * limitations under the License.\r
15 */\r
16 \r
17 package org.openo.msb.wrapper.consul.cache;\r
18 \r
19 \r
20 import static com.google.common.base.Preconditions.checkState;\r
21 \r
22 import java.math.BigInteger;\r
23 import java.util.HashSet;\r
24 import java.util.Iterator;\r
25 import java.util.List;\r
26 import java.util.Map;\r
27 import java.util.Set;\r
28 import java.util.concurrent.CopyOnWriteArrayList;\r
29 import java.util.concurrent.CountDownLatch;\r
30 import java.util.concurrent.Executors;\r
31 import java.util.concurrent.ScheduledExecutorService;\r
32 import java.util.concurrent.TimeUnit;\r
33 import java.util.concurrent.atomic.AtomicReference;\r
34 \r
35 import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;\r
36 import org.openo.msb.wrapper.consul.model.ConsulResponse;\r
37 import org.openo.msb.wrapper.consul.model.catalog.CatalogService;\r
38 import org.openo.msb.wrapper.consul.model.catalog.ServiceInfo;\r
39 import org.openo.msb.wrapper.consul.option.QueryOptions;\r
40 import org.slf4j.Logger;\r
41 import org.slf4j.LoggerFactory;\r
42 \r
43 import com.google.common.annotations.VisibleForTesting;\r
44 import com.google.common.collect.ImmutableList;\r
45 \r
46 /**\r
47  * A cache structure that can provide an up-to-date read-only\r
48  * map backed by consul data\r
49  *\r
50  * @param <V>\r
51  */\r
52 public class ConsulCache4Map<K, V> {\r
53 \r
54     enum State {latent, starting, started, stopped }\r
55 \r
56     private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache4Map.class);\r
57 \r
58     private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);\r
59     private final AtomicReference<ImmutableList<ServiceInfo>> lastResponse = new AtomicReference<ImmutableList<ServiceInfo>>(ImmutableList.<ServiceInfo>of());\r
60     private final AtomicReference<State> state = new AtomicReference<State>(State.latent);\r
61     private final CountDownLatch initLatch = new CountDownLatch(1);\r
62     private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();\r
63     private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();\r
64 \r
65     private final CallbackConsumer<V> callBackConsumer;\r
66     private final ConsulResponseCallback<Map<String,List<String>>> responseCallback;\r
67 \r
68     ConsulCache4Map(CallbackConsumer<V> callbackConsumer) {\r
69         this( callbackConsumer, 10, TimeUnit.SECONDS);\r
70     }\r
71 \r
72     ConsulCache4Map(\r
73             CallbackConsumer<V> callbackConsumer,\r
74             final long backoffDelayQty,\r
75             final TimeUnit backoffDelayUnit) {\r
76 \r
77         this.callBackConsumer = callbackConsumer;\r
78 \r
79         this.responseCallback = new ConsulResponseCallback<Map<String,List<String>>>() {\r
80             @Override\r
81             public void onComplete(ConsulResponse<Map<String,List<String>>> consulResponse) {\r
82 \r
83                 if (!isRunning()) {\r
84                     return;\r
85                 }\r
86                 updateIndex(consulResponse);\r
87                 ImmutableList<ServiceInfo> full = convertToList(consulResponse);\r
88                 List<ServiceInfo> oldList=lastResponse.get();\r
89                 boolean changed = !full.equals(lastResponse.get());\r
90 //                LOGGER.info("service changed:"+changed+"----"+full);\r
91                 if (changed) {\r
92                     // changes\r
93                     lastResponse.set(full);\r
94                 }\r
95 \r
96                 if (changed) {\r
97                     for (Listener<K, V> l : listeners) {\r
98                         l.notify(oldList,full);\r
99                     }\r
100                 }\r
101 \r
102                 if (state.compareAndSet(State.starting, State.started)) {\r
103                     initLatch.countDown();\r
104                 }\r
105                 runCallback();\r
106             }\r
107 \r
108             @Override\r
109             public void onFailure(Throwable throwable) {\r
110 \r
111                 if (!isRunning()) {\r
112                     return;\r
113                 }\r
114                 LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable);\r
115 \r
116                 executorService.schedule(new Runnable() {\r
117                     @Override\r
118                     public void run() {\r
119                         runCallback();\r
120                     }\r
121                 }, backoffDelayQty, backoffDelayUnit);\r
122             }\r
123         };\r
124     }\r
125 \r
126     public void start() throws Exception {\r
127         checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting);\r
128         runCallback();\r
129     }\r
130 \r
131     public void stop() throws Exception {\r
132         State previous = state.getAndSet(State.stopped);\r
133         if (previous != State.stopped) {\r
134             executorService.shutdownNow();\r
135         }\r
136     }\r
137 \r
138     private void runCallback() {\r
139         if (isRunning()) {\r
140             callBackConsumer.consume(latestIndex.get(), responseCallback);\r
141         }\r
142     }\r
143 \r
144     private boolean isRunning() {\r
145         return state.get() == State.started || state.get() == State.starting;\r
146     }\r
147 \r
148     public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {\r
149         return initLatch.await(timeout, unit);\r
150     }\r
151 \r
152     public ImmutableList<ServiceInfo> getMap() {\r
153         return lastResponse.get();\r
154     }\r
155 \r
156     @VisibleForTesting\r
157     ImmutableList<ServiceInfo> convertToList(final ConsulResponse<Map<String,List<String>>> response) {\r
158         if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {\r
159             return ImmutableList.of();\r
160         }\r
161 \r
162         final ImmutableList.Builder<ServiceInfo> builder = ImmutableList.builder();\r
163         final Set<String> keySet = new HashSet<>();\r
164         \r
165         for(Map.Entry<String,List<String>> entry : response.getResponse().entrySet()) {\r
166  \r
167             String key = entry.getKey();  \r
168            \r
169             if (key != null && !"consul".equals(key)) {\r
170                 if (!keySet.contains(key)) {\r
171                     ServiceInfo serviceInfo=new ServiceInfo();\r
172                     serviceInfo.setServiceName(key);\r
173                     \r
174                     List<String> value=entry.getValue();\r
175                     for(String tag:value){\r
176                        \r
177                         if(tag.startsWith("version")){\r
178                              String version; \r
179                             if(tag.split(":").length==2)\r
180                             {\r
181                             version = tag.split(":")[1];\r
182                             }\r
183                             else{\r
184                                 version=""; \r
185                             }\r
186                             \r
187                             serviceInfo.setVersion(version);\r
188                             break;\r
189                         }\r
190                     }\r
191                     \r
192                     builder.add(serviceInfo);\r
193                 } else {\r
194                     System.out.println(key.toString());\r
195                     LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString());\r
196                 }\r
197             }\r
198             keySet.add(key);\r
199          \r
200         }  \r
201         \r
202         \r
203         return builder.build();\r
204     }\r
205 \r
206     private void updateIndex(ConsulResponse<Map<String,List<String>>> consulResponse) {\r
207         if (consulResponse != null && consulResponse.getIndex() != null) {\r
208             this.latestIndex.set(consulResponse.getIndex());\r
209         }\r
210     }\r
211 \r
212     protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {\r
213         if (index == null) {\r
214             return QueryOptions.BLANK;\r
215         } else {\r
216             return QueryOptions.blockSeconds(blockSeconds, index).build();\r
217         }\r
218     }\r
219 \r
220     /**\r
221      * passed in by creators to vary the content of the cached values\r
222      *\r
223      * @param <V>\r
224      */\r
225     protected interface CallbackConsumer<V> {\r
226         void consume(BigInteger index, ConsulResponseCallback<Map<String,List<String>>> callback);\r
227     }\r
228 \r
229     /**\r
230      * Implementers can register a listener to receive\r
231      * a new map when it changes\r
232      *\r
233      * @param <V>\r
234      */\r
235     public interface Listener<K, V> {\r
236         void notify(List<ServiceInfo> oldValues,List<ServiceInfo> newValues);\r
237     }\r
238 \r
239     public boolean addListener(Listener<K, V> listener) {\r
240         boolean added = listeners.add(listener);\r
241         if (state.get() == State.started) {\r
242             listener.notify(lastResponse.get(),lastResponse.get());\r
243         }\r
244         return added;\r
245     }\r
246 \r
247     public boolean removeListener(Listener<K, V> listener) {\r
248         return listeners.remove(listener);\r
249     }\r
250 \r
251     @VisibleForTesting\r
252     protected State getState() {\r
253         return state.get();\r
254     }\r
255     \r
256  \r
257     \r
258 }\r