--- /dev/null
+/**\r
+* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)\r
+*\r
+* Licensed under the Apache License, Version 2.0 (the "License");\r
+* you may not use this file except in compliance with the License.\r
+* You may obtain a copy of the License at\r
+*\r
+* http://www.apache.org/licenses/LICENSE-2.0\r
+*\r
+* Unless required by applicable law or agreed to in writing, software\r
+* distributed under the License is distributed on an "AS IS" BASIS,\r
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+* See the License for the specific language governing permissions and\r
+* limitations under the License.\r
+*/\r
+\r
+package org.openo.msb;\r
+\r
+import java.net.MalformedURLException;\r
+import java.net.URL;\r
+import java.util.ArrayList;\r
+import java.util.HashSet;\r
+import java.util.List;\r
+import java.util.ListIterator;\r
+import java.util.Map;\r
+import java.util.concurrent.atomic.AtomicReference;\r
+\r
+import org.apache.commons.lang3.StringUtils;\r
+import org.openo.msb.api.MicroServiceFullInfo;\r
+import org.openo.msb.api.MicroServiceInfo;\r
+import org.openo.msb.api.Node;\r
+import org.openo.msb.wrapper.MicroServiceWrapper;\r
+import org.openo.msb.wrapper.consul.CatalogClient;\r
+import org.openo.msb.wrapper.consul.Consul;\r
+import org.openo.msb.wrapper.consul.HealthClient;\r
+import org.openo.msb.wrapper.consul.cache.CatalogCache;\r
+import org.openo.msb.wrapper.consul.cache.ConsulCache;\r
+import org.openo.msb.wrapper.consul.cache.ConsulCache4Map;\r
+import org.openo.msb.wrapper.consul.cache.HealthCache;\r
+import org.openo.msb.wrapper.consul.cache.ServiceCache;\r
+import org.openo.msb.wrapper.consul.model.catalog.CatalogService;\r
+import org.openo.msb.wrapper.consul.model.catalog.ServiceInfo;\r
+import org.openo.msb.wrapper.consul.model.health.Service;\r
+import org.openo.msb.wrapper.consul.model.health.ServiceHealth;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+public class ConsulClientApp {\r
+\r
+ private final Consul consul;\r
+ private final CatalogClient catalogClient;\r
+ private final HealthClient healthClient;\r
+ private AtomicReference<List<HealthCache>> cacheList = new AtomicReference<List<HealthCache>>(\r
+ new ArrayList<HealthCache>());\r
+\r
+\r
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsulClientApp.class);\r
+\r
+ public ConsulClientApp(String ip, int port) {\r
+ URL url = null;\r
+ try {\r
+ url = new URL("http", ip, port, "");\r
+ } catch (MalformedURLException e1) {\r
+ // TODO Auto-generated catch block\r
+ LOGGER.error("start ConsulClientApp throw exception", e1);\r
+ throw new RuntimeException(e1);\r
+ }\r
+ this.consul = Consul.builder().withUrl(url).build(); // connect to Consul on localhost\r
+ this.catalogClient = consul.catalogClient();\r
+ this.healthClient = consul.healthClient();\r
+ }\r
+\r
+ public Consul getConsul() {\r
+ return consul;\r
+ }\r
+\r
+ public CatalogClient getCatalogClient() {\r
+ return catalogClient;\r
+ }\r
+\r
+ private void stopNodeListen(String serviceName) {\r
+ try {\r
+\r
+ ListIterator<HealthCache> cacheListLit = cacheList.get().listIterator();\r
+ while (cacheListLit.hasNext()) {\r
+ HealthCache cache = (HealthCache) cacheListLit.next();\r
+ if (cache.getServiceName().equals(serviceName)) {\r
+\r
+ cache.stop();\r
+ cacheListLit.remove();\r
+ LOGGER.info(cache.getServiceName() + " NodeListen stoped");\r
+ break;\r
+ }\r
+ }\r
+\r
+ } catch (Exception e) {\r
+ // TODO Auto-generated catch block\r
+ LOGGER.error("stop Node:[" + serviceName + "] Listen throw exception", e);\r
+ }\r
+\r
+\r
+ }\r
+\r
+ /**\r
+ * @Title startServiceListen\r
+ * @Description TODO(Open the consul registration services to monitor)\r
+ * @return void\r
+ */\r
+ public void startServiceListen() {\r
+ final ServiceCache serviceCache = ServiceCache.newCache(catalogClient, 30);\r
+ serviceCache.addListener(new ConsulCache4Map.Listener<String, Map<String, List<String>>>() {\r
+ @Override\r
+ public void notify(List<ServiceInfo> oldValues, List<ServiceInfo> newValues) {\r
+ // do Something with updated server List\r
+ LOGGER.info("--new service notify--");\r
+\r
+ List<ServiceInfo> deRegisterServiceList = getDiffrent(oldValues, newValues);\r
+\r
+\r
+ for (ServiceInfo serviceInfo : deRegisterServiceList) {\r
+ try {\r
+ \r
+ MicroServiceWrapper.getInstance().deleteMicroService(\r
+ serviceInfo.getServiceName(), serviceInfo.getVersion());\r
+ \r
+\r
+ stopNodeListen(serviceInfo.getServiceName());\r
+ LOGGER.info("Cancel MicroServiceInfo and stop node listen successs:"\r
+ + serviceInfo.getServiceName());\r
+ } catch (Exception e) {\r
+ LOGGER.error("Cancel MicroServiceInfo and stop node listen FAIL : ", e);\r
+\r
+ }\r
+\r
+ }\r
+\r
+\r
+ List<ServiceInfo> registerServiceList = getDiffrent(newValues, oldValues);\r
+ for (ServiceInfo serviceInfo : registerServiceList) {\r
+\r
+ // if (deRegisterServiceList.contains(serviceInfo)) continue;\r
+\r
+\r
+ LOGGER.info(" new serviceName:" + serviceInfo.getServiceName() + " version:"\r
+ + serviceInfo.getVersion());\r
+ // Open Node to monitor new registration service\r
+ startHealthNodeListen(serviceInfo.getServiceName(), serviceInfo.getVersion());\r
+\r
+ }\r
+\r
+\r
+ }\r
+\r
+ });\r
+\r
+ try {\r
+ LOGGER.info("start...consul ... service..Listening.");\r
+ serviceCache.start();\r
+\r
+ } catch (Exception e) {\r
+ // TODO Auto-generated catch block\r
+ LOGGER.error("start...service..Listen throw exception", e);\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * @Title startHealthNodeListen\r
+ * @Description TODO(Open a service node changes to monitor, only to return to health service)\r
+ * @param serviceName\r
+ * @return\r
+ * @return HealthCache\r
+ */\r
+ private HealthCache startHealthNodeListen(final String serviceName, final String version) {\r
+ final HealthCache healthCache = HealthCache.newCache(healthClient, serviceName, 30);\r
+ healthCache.addListener(new HealthCache.Listener<String, ServiceHealth>() {\r
+ @Override\r
+ public void notify(Map<String, ServiceHealth> newValues) {\r
+ // do Something with updated server map\r
+ LOGGER.info(serviceName + "--new node notify--");\r
+\r
+ if (newValues.isEmpty()) {\r
+ LOGGER.info(serviceName + "--nodeList is Empty--");\r
+\r
+ \r
+ MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version);\r
+ \r
+ // try {\r
+ // healthCache.stop();\r
+ // } catch (Exception e) {\r
+ // LOGGER.equals(serviceName+"-- stop Node error:"+e.getMessage());\r
+ // }\r
+\r
+ } else {\r
+\r
+ MicroServiceInfo microServiceInfo = new MicroServiceInfo();\r
+ HashSet<Node> nodes = new HashSet<Node>();\r
+ String url = "";\r
+ String version = "", visualRange = "", protocol = "",lb_policy="";\r
+\r
+ for (Map.Entry<String, ServiceHealth> entry : newValues.entrySet()) {\r
+ String nodeName = entry.getKey().toString();\r
+ ServiceHealth value = (ServiceHealth) entry.getValue();\r
+\r
+ Node node = new Node();\r
+ Service service = value.getService();\r
+ node.setIp(service.getAddress());\r
+ node.setPort(String.valueOf(service.getPort()));\r
+\r
+\r
+ try {\r
+ List<String> tagList = service.getTags();\r
+ for (String tag : tagList) {\r
+ if (tag.startsWith("url")) {\r
+ if (tag.split(":").length == 2) {\r
+ url = tag.split(":")[1];\r
+ } else {\r
+ url = "";\r
+ }\r
+\r
+\r
+ continue;\r
+ }\r
+ if (tag.startsWith("version")) {\r
+ if (tag.split(":").length == 2) {\r
+ version = tag.split(":")[1];\r
+ } else {\r
+ version = "";\r
+ }\r
+ continue;\r
+ }\r
+ if (tag.startsWith("protocol")) {\r
+ protocol = tag.split(":")[1];\r
+ continue;\r
+ }\r
+ if (tag.startsWith("visualRange")) {\r
+ visualRange = tag.split(":")[1];\r
+ continue;\r
+ }\r
+ \r
+ if (tag.startsWith("lb_policy")) {\r
+ lb_policy = tag.split(":")[1];\r
+ continue;\r
+ }\r
+\r
+ }\r
+\r
+\r
+ } catch (Exception e) {\r
+ LOGGER.error(serviceName + " read tag throw exception", e);\r
+ System.out.println(serviceName + " read tag throw exception");\r
+ }\r
+\r
+ nodes.add(node);\r
+ }\r
+\r
+ microServiceInfo.setNodes(nodes);\r
+ microServiceInfo.setProtocol(protocol);\r
+ microServiceInfo.setUrl(url);\r
+ microServiceInfo.setServiceName(serviceName);\r
+ microServiceInfo.setLb_policy(lb_policy);\r
+ if (!visualRange.isEmpty()) {\r
+ microServiceInfo.setVisualRange(visualRange);\r
+ }\r
+ microServiceInfo.setVersion(version);\r
+\r
+ try {\r
+ MicroServiceFullInfo microServiceFullInfo =\r
+ MicroServiceWrapper.getInstance().saveMicroServiceInstance(\r
+ microServiceInfo, false, "", "");\r
+ LOGGER.info("register MicroServiceInfo successs:"\r
+ + microServiceFullInfo.getServiceName());\r
+ } catch (Exception e) {\r
+ LOGGER.error("register MicroServiceInfo FAIL : " + serviceName, e);\r
+\r
+ }\r
+ }\r
+ }\r
+ });\r
+ try {\r
+ LOGGER.info(serviceName + " Node Listen start");\r
+ cacheList.get().add(healthCache);\r
+ healthCache.start();\r
+\r
+ } catch (Exception e) {\r
+ // TODO Auto-generated catch block\r
+ LOGGER.error(serviceName + " Node Listen start throw exception", e);\r
+ }\r
+\r
+ return healthCache;\r
+ }\r
+\r
+ /**\r
+ * @Title startNodeListen\r
+ * @Description TODO(Open a service node changes to monitor)\r
+ * @param serviceName\r
+ * @return\r
+ * @return CatalogCache\r
+ */\r
+ @Deprecated\r
+ private CatalogCache startNodeListen(final String serviceName) {\r
+ final CatalogCache catalogCache = CatalogCache.newCache(catalogClient, serviceName, 30);\r
+ catalogCache.addListener(new ConsulCache.Listener<String, CatalogService>() {\r
+ @Override\r
+ public void notify(Map<String, CatalogService> newValues) {\r
+ // do Something with updated server map\r
+ System.out.println(serviceName + "--new node notify--");\r
+ LOGGER.info(serviceName + "--new node notify--");\r
+\r
+ if (newValues.isEmpty()) {\r
+ System.out.println(serviceName + "-- nodeList is Empty--");\r
+ LOGGER.info(serviceName + "--nodeList is Empty-stop service[" + serviceName\r
+ + "] listen-");\r
+ try {\r
+ catalogCache.stop();\r
+ } catch (Exception e) {\r
+ LOGGER.equals(serviceName + "-- stop Node error:" + e.getMessage());\r
+ }\r
+\r
+ } else {\r
+\r
+ MicroServiceInfo microServiceInfo = new MicroServiceInfo();\r
+ HashSet<Node> nodes = new HashSet<Node>();\r
+ String url = "";\r
+ String version = "", visualRange = "", protocol = "";\r
+\r
+ for (Map.Entry<String, CatalogService> entry : newValues.entrySet()) {\r
+ String nodeName = entry.getKey().toString();\r
+ CatalogService value = (CatalogService) entry.getValue();\r
+\r
+ Node node = new Node();\r
+ node.setIp(value.getServiceAddress());\r
+ node.setPort(String.valueOf(value.getServicePort()));\r
+\r
+\r
+ try {\r
+ List<String> tagList = value.getServiceTags();\r
+ for (String tag : tagList) {\r
+ if (tag.startsWith("url")) {\r
+ if (tag.split(":").length == 2) {\r
+ url = tag.split(":")[1];\r
+ } else {\r
+ url = "";\r
+ }\r
+\r
+\r
+ continue;\r
+ }\r
+ if (tag.startsWith("version")) {\r
+ if (tag.split(":").length == 2) {\r
+ version = tag.split(":")[1];\r
+ } else {\r
+ version = "";\r
+ }\r
+ continue;\r
+ }\r
+ if (tag.startsWith("protocol")) {\r
+ protocol = tag.split(":")[1];\r
+ continue;\r
+ }\r
+ if (tag.startsWith("visualRange")) {\r
+ visualRange = tag.split(":")[1];\r
+ continue;\r
+ }\r
+ if (tag.startsWith("ttl")) {\r
+ int ttl = Integer.parseInt(tag.split(":")[1]);\r
+ node.setTtl(ttl);\r
+ continue;\r
+ }\r
+ }\r
+\r
+\r
+ } catch (Exception e) {\r
+ LOGGER.error(serviceName + " read tag throw exception", e);\r
+ System.out.println(serviceName + " read tag throw exception");\r
+ }\r
+\r
+ nodes.add(node);\r
+\r
+\r
+ System.out.println(nodeName + ":" + value.getServiceAddress() + " "\r
+ + value.getServicePort() + " " + value.getServiceTags());\r
+ }\r
+\r
+ microServiceInfo.setNodes(nodes);\r
+ microServiceInfo.setProtocol(protocol);\r
+ microServiceInfo.setUrl(url);\r
+ microServiceInfo.setServiceName(serviceName);\r
+ if (!visualRange.isEmpty()) {\r
+ microServiceInfo.setVisualRange(visualRange);\r
+ }\r
+ microServiceInfo.setVersion(version);\r
+\r
+ try {\r
+ MicroServiceFullInfo microServiceFullInfo =\r
+ MicroServiceWrapper.getInstance().saveMicroServiceInstance(\r
+ microServiceInfo, false, "", "");\r
+ LOGGER.info("register MicroServiceInfo successs:" + microServiceFullInfo);\r
+ System.out.println("register MicroServiceInfo successs:" + serviceName);\r
+ } catch (Exception e) {\r
+ LOGGER.error("register MicroServiceInfo FAIL : ", e);\r
+\r
+ }\r
+ }\r
+ }\r
+ });\r
+ try {\r
+ System.out.println(serviceName + " Node Listen start");\r
+ LOGGER.info(serviceName + " Node Listen start");\r
+ catalogCache.start();\r
+\r
+ } catch (Exception e) {\r
+ // TODO Auto-generated catch block\r
+ LOGGER.error(serviceName + " Node Listen start throw exception", e);\r
+ }\r
+\r
+ return catalogCache;\r
+ }\r
+\r
+\r
+ /**\r
+ * @Title getDiffrent\r
+ * @Description TODO(Extract the list1 and list2 different data sets)\r
+ * @param list1\r
+ * @param list2\r
+ * @return\r
+ * @return List<String>\r
+ */\r
+ private List<ServiceInfo> getDiffrent(List<ServiceInfo> list1, List<ServiceInfo> list2) {\r
+\r
+ List<ServiceInfo> diff = new ArrayList<ServiceInfo>();\r
+\r
+\r
+\r
+ for (ServiceInfo serviceInfo : list1) {\r
+ if (!list2.contains(serviceInfo)) {\r
+ diff.add(serviceInfo);\r
+ }\r
+ }\r
+\r
+ return diff;\r
+ }\r
+\r
+ public static void main(String[] args) {\r
+ ConsulClientApp consulTest = new ConsulClientApp("127.0.0.1", 10081);\r
+ consulTest.startServiceListen();\r
+\r
+\r
+ }\r
+\r
+\r
+}\r