Initial code import
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / openo / msb / wrapper / consul / cache / ConsulCache4Map.java
diff --git a/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache4Map.java b/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache4Map.java
new file mode 100644 (file)
index 0000000..84fd8ec
--- /dev/null
@@ -0,0 +1,258 @@
+/**\r
+* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)\r
+*\r
+* Licensed under the Apache License, Version 2.0 (the "License");\r
+* you may not use this file except in compliance with the License.\r
+* You may obtain a copy of the License at\r
+*\r
+* http://www.apache.org/licenses/LICENSE-2.0\r
+*\r
+* Unless required by applicable law or agreed to in writing, software\r
+* distributed under the License is distributed on an "AS IS" BASIS,\r
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+* See the License for the specific language governing permissions and\r
+* limitations under the License.\r
+*/\r
+\r
+package org.openo.msb.wrapper.consul.cache;\r
+\r
+\r
+import static com.google.common.base.Preconditions.checkState;\r
+\r
+import java.math.BigInteger;\r
+import java.util.HashSet;\r
+import java.util.Iterator;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.concurrent.CopyOnWriteArrayList;\r
+import java.util.concurrent.CountDownLatch;\r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.ScheduledExecutorService;\r
+import java.util.concurrent.TimeUnit;\r
+import java.util.concurrent.atomic.AtomicReference;\r
+\r
+import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;\r
+import org.openo.msb.wrapper.consul.model.ConsulResponse;\r
+import org.openo.msb.wrapper.consul.model.catalog.CatalogService;\r
+import org.openo.msb.wrapper.consul.model.catalog.ServiceInfo;\r
+import org.openo.msb.wrapper.consul.option.QueryOptions;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import com.google.common.annotations.VisibleForTesting;\r
+import com.google.common.collect.ImmutableList;\r
+\r
+/**\r
+ * A cache structure that can provide an up-to-date read-only\r
+ * map backed by consul data\r
+ *\r
+ * @param <V>\r
+ */\r
+public class ConsulCache4Map<K, V> {\r
+\r
+    enum State {latent, starting, started, stopped }\r
+\r
+    private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache4Map.class);\r
+\r
+    private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);\r
+    private final AtomicReference<ImmutableList<ServiceInfo>> lastResponse = new AtomicReference<ImmutableList<ServiceInfo>>(ImmutableList.<ServiceInfo>of());\r
+    private final AtomicReference<State> state = new AtomicReference<State>(State.latent);\r
+    private final CountDownLatch initLatch = new CountDownLatch(1);\r
+    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();\r
+    private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();\r
+\r
+    private final CallbackConsumer<V> callBackConsumer;\r
+    private final ConsulResponseCallback<Map<String,List<String>>> responseCallback;\r
+\r
+    ConsulCache4Map(CallbackConsumer<V> callbackConsumer) {\r
+        this( callbackConsumer, 10, TimeUnit.SECONDS);\r
+    }\r
+\r
+    ConsulCache4Map(\r
+            CallbackConsumer<V> callbackConsumer,\r
+            final long backoffDelayQty,\r
+            final TimeUnit backoffDelayUnit) {\r
+\r
+        this.callBackConsumer = callbackConsumer;\r
+\r
+        this.responseCallback = new ConsulResponseCallback<Map<String,List<String>>>() {\r
+            @Override\r
+            public void onComplete(ConsulResponse<Map<String,List<String>>> consulResponse) {\r
+\r
+                if (!isRunning()) {\r
+                    return;\r
+                }\r
+                updateIndex(consulResponse);\r
+                ImmutableList<ServiceInfo> full = convertToList(consulResponse);\r
+                List<ServiceInfo> oldList=lastResponse.get();\r
+                boolean changed = !full.equals(lastResponse.get());\r
+//                LOGGER.info("service changed:"+changed+"----"+full);\r
+                if (changed) {\r
+                    // changes\r
+                    lastResponse.set(full);\r
+                }\r
+\r
+                if (changed) {\r
+                    for (Listener<K, V> l : listeners) {\r
+                        l.notify(oldList,full);\r
+                    }\r
+                }\r
+\r
+                if (state.compareAndSet(State.starting, State.started)) {\r
+                    initLatch.countDown();\r
+                }\r
+                runCallback();\r
+            }\r
+\r
+            @Override\r
+            public void onFailure(Throwable throwable) {\r
+\r
+                if (!isRunning()) {\r
+                    return;\r
+                }\r
+                LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable);\r
+\r
+                executorService.schedule(new Runnable() {\r
+                    @Override\r
+                    public void run() {\r
+                        runCallback();\r
+                    }\r
+                }, backoffDelayQty, backoffDelayUnit);\r
+            }\r
+        };\r
+    }\r
+\r
+    public void start() throws Exception {\r
+        checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting);\r
+        runCallback();\r
+    }\r
+\r
+    public void stop() throws Exception {\r
+        State previous = state.getAndSet(State.stopped);\r
+        if (previous != State.stopped) {\r
+            executorService.shutdownNow();\r
+        }\r
+    }\r
+\r
+    private void runCallback() {\r
+        if (isRunning()) {\r
+            callBackConsumer.consume(latestIndex.get(), responseCallback);\r
+        }\r
+    }\r
+\r
+    private boolean isRunning() {\r
+        return state.get() == State.started || state.get() == State.starting;\r
+    }\r
+\r
+    public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {\r
+        return initLatch.await(timeout, unit);\r
+    }\r
+\r
+    public ImmutableList<ServiceInfo> getMap() {\r
+        return lastResponse.get();\r
+    }\r
+\r
+    @VisibleForTesting\r
+    ImmutableList<ServiceInfo> convertToList(final ConsulResponse<Map<String,List<String>>> response) {\r
+        if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {\r
+            return ImmutableList.of();\r
+        }\r
+\r
+        final ImmutableList.Builder<ServiceInfo> builder = ImmutableList.builder();\r
+        final Set<String> keySet = new HashSet<>();\r
+        \r
+        for(Map.Entry<String,List<String>> entry : response.getResponse().entrySet()) {\r
\r
+            String key = entry.getKey();  \r
+           \r
+            if (key != null && !"consul".equals(key)) {\r
+                if (!keySet.contains(key)) {\r
+                    ServiceInfo serviceInfo=new ServiceInfo();\r
+                    serviceInfo.setServiceName(key);\r
+                    \r
+                    List<String> value=entry.getValue();\r
+                    for(String tag:value){\r
+                       \r
+                        if(tag.startsWith("version")){\r
+                             String version; \r
+                            if(tag.split(":").length==2)\r
+                            {\r
+                            version = tag.split(":")[1];\r
+                            }\r
+                            else{\r
+                                version=""; \r
+                            }\r
+                            \r
+                            serviceInfo.setVersion(version);\r
+                            break;\r
+                        }\r
+                    }\r
+                    \r
+                    builder.add(serviceInfo);\r
+                } else {\r
+                    System.out.println(key.toString());\r
+                    LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString());\r
+                }\r
+            }\r
+            keySet.add(key);\r
+         \r
+        }  \r
+        \r
+        \r
+        return builder.build();\r
+    }\r
+\r
+    private void updateIndex(ConsulResponse<Map<String,List<String>>> consulResponse) {\r
+        if (consulResponse != null && consulResponse.getIndex() != null) {\r
+            this.latestIndex.set(consulResponse.getIndex());\r
+        }\r
+    }\r
+\r
+    protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {\r
+        if (index == null) {\r
+            return QueryOptions.BLANK;\r
+        } else {\r
+            return QueryOptions.blockSeconds(blockSeconds, index).build();\r
+        }\r
+    }\r
+\r
+    /**\r
+     * passed in by creators to vary the content of the cached values\r
+     *\r
+     * @param <V>\r
+     */\r
+    protected interface CallbackConsumer<V> {\r
+        void consume(BigInteger index, ConsulResponseCallback<Map<String,List<String>>> callback);\r
+    }\r
+\r
+    /**\r
+     * Implementers can register a listener to receive\r
+     * a new map when it changes\r
+     *\r
+     * @param <V>\r
+     */\r
+    public interface Listener<K, V> {\r
+        void notify(List<ServiceInfo> oldValues,List<ServiceInfo> newValues);\r
+    }\r
+\r
+    public boolean addListener(Listener<K, V> listener) {\r
+        boolean added = listeners.add(listener);\r
+        if (state.get() == State.started) {\r
+            listener.notify(lastResponse.get(),lastResponse.get());\r
+        }\r
+        return added;\r
+    }\r
+\r
+    public boolean removeListener(Listener<K, V> listener) {\r
+        return listeners.remove(listener);\r
+    }\r
+\r
+    @VisibleForTesting\r
+    protected State getState() {\r
+        return state.get();\r
+    }\r
+    \r
\r
+    \r
+}\r