2 * Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
17 package org.openo.msb;
\r
19 import java.net.MalformedURLException;
\r
20 import java.net.URL;
\r
21 import java.util.ArrayList;
\r
22 import java.util.HashSet;
\r
23 import java.util.List;
\r
24 import java.util.ListIterator;
\r
25 import java.util.Map;
\r
26 import java.util.concurrent.atomic.AtomicReference;
\r
28 import org.apache.commons.lang3.StringUtils;
\r
29 import org.openo.msb.api.MicroServiceFullInfo;
\r
30 import org.openo.msb.api.MicroServiceInfo;
\r
31 import org.openo.msb.api.Node;
\r
32 import org.openo.msb.wrapper.MicroServiceWrapper;
\r
33 import org.openo.msb.wrapper.consul.CatalogClient;
\r
34 import org.openo.msb.wrapper.consul.Consul;
\r
35 import org.openo.msb.wrapper.consul.HealthClient;
\r
36 import org.openo.msb.wrapper.consul.cache.CatalogCache;
\r
37 import org.openo.msb.wrapper.consul.cache.ConsulCache;
\r
38 import org.openo.msb.wrapper.consul.cache.ConsulCache4Map;
\r
39 import org.openo.msb.wrapper.consul.cache.HealthCache;
\r
40 import org.openo.msb.wrapper.consul.cache.ServiceCache;
\r
41 import org.openo.msb.wrapper.consul.model.catalog.CatalogService;
\r
42 import org.openo.msb.wrapper.consul.model.catalog.ServiceInfo;
\r
43 import org.openo.msb.wrapper.consul.model.health.Service;
\r
44 import org.openo.msb.wrapper.consul.model.health.ServiceHealth;
\r
45 import org.slf4j.Logger;
\r
46 import org.slf4j.LoggerFactory;
\r
48 public class ConsulClientApp {
\r
50 private final Consul consul;
\r
51 private final CatalogClient catalogClient;
\r
52 private final HealthClient healthClient;
\r
53 private AtomicReference<List<HealthCache>> cacheList = new AtomicReference<List<HealthCache>>(
\r
54 new ArrayList<HealthCache>());
\r
57 private static final Logger LOGGER = LoggerFactory.getLogger(ConsulClientApp.class);
\r
59 public ConsulClientApp(String ip, int port) {
\r
62 url = new URL("http", ip, port, "");
\r
63 } catch (MalformedURLException e1) {
\r
64 // TODO Auto-generated catch block
\r
65 LOGGER.error("start ConsulClientApp throw exception", e1);
\r
66 throw new RuntimeException(e1);
\r
68 this.consul = Consul.builder().withUrl(url).build(); // connect to Consul on localhost
\r
69 this.catalogClient = consul.catalogClient();
\r
70 this.healthClient = consul.healthClient();
\r
73 public Consul getConsul() {
\r
77 public CatalogClient getCatalogClient() {
\r
78 return catalogClient;
\r
81 private void stopNodeListen(String serviceName) {
\r
84 ListIterator<HealthCache> cacheListLit = cacheList.get().listIterator();
\r
85 while (cacheListLit.hasNext()) {
\r
86 HealthCache cache = (HealthCache) cacheListLit.next();
\r
87 if (cache.getServiceName().equals(serviceName)) {
\r
90 cacheListLit.remove();
\r
91 LOGGER.info(cache.getServiceName() + " NodeListen stoped");
\r
96 } catch (Exception e) {
\r
97 // TODO Auto-generated catch block
\r
98 LOGGER.error("stop Node:[" + serviceName + "] Listen throw exception", e);
\r
105 * @Title startServiceListen
\r
106 * @Description TODO(Open the consul registration services to monitor)
\r
109 public void startServiceListen() {
\r
110 final ServiceCache serviceCache = ServiceCache.newCache(catalogClient, 30);
\r
111 serviceCache.addListener(new ConsulCache4Map.Listener<String, Map<String, List<String>>>() {
\r
113 public void notify(List<ServiceInfo> oldValues, List<ServiceInfo> newValues) {
\r
114 // do Something with updated server List
\r
115 LOGGER.info("--new service notify--");
\r
117 List<ServiceInfo> deRegisterServiceList = getDiffrent(oldValues, newValues);
\r
120 for (ServiceInfo serviceInfo : deRegisterServiceList) {
\r
123 MicroServiceWrapper.getInstance().deleteMicroService(
\r
124 serviceInfo.getServiceName(), serviceInfo.getVersion());
\r
127 stopNodeListen(serviceInfo.getServiceName());
\r
128 LOGGER.info("Cancel MicroServiceInfo and stop node listen successs:"
\r
129 + serviceInfo.getServiceName());
\r
130 } catch (Exception e) {
\r
131 LOGGER.error("Cancel MicroServiceInfo and stop node listen FAIL : ", e);
\r
138 List<ServiceInfo> registerServiceList = getDiffrent(newValues, oldValues);
\r
139 for (ServiceInfo serviceInfo : registerServiceList) {
\r
141 // if (deRegisterServiceList.contains(serviceInfo)) continue;
\r
144 LOGGER.info(" new serviceName:" + serviceInfo.getServiceName() + " version:"
\r
145 + serviceInfo.getVersion());
\r
146 // Open Node to monitor new registration service
\r
147 startHealthNodeListen(serviceInfo.getServiceName(), serviceInfo.getVersion());
\r
157 LOGGER.info("start...consul ... service..Listening.");
\r
158 serviceCache.start();
\r
160 } catch (Exception e) {
\r
161 // TODO Auto-generated catch block
\r
162 LOGGER.error("start...service..Listen throw exception", e);
\r
168 * @Title startHealthNodeListen
\r
169 * @Description TODO(Open a service node changes to monitor, only to return to health service)
\r
170 * @param serviceName
\r
172 * @return HealthCache
\r
174 private HealthCache startHealthNodeListen(final String serviceName, final String version) {
\r
175 final HealthCache healthCache = HealthCache.newCache(healthClient, serviceName, 30);
\r
176 healthCache.addListener(new HealthCache.Listener<String, ServiceHealth>() {
\r
178 public void notify(Map<String, ServiceHealth> newValues) {
\r
179 // do Something with updated server map
\r
180 LOGGER.info(serviceName + "--new node notify--");
\r
182 if (newValues.isEmpty()) {
\r
183 LOGGER.info(serviceName + "--nodeList is Empty--");
\r
186 MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version);
\r
189 // healthCache.stop();
\r
190 // } catch (Exception e) {
\r
191 // LOGGER.equals(serviceName+"-- stop Node error:"+e.getMessage());
\r
196 MicroServiceInfo microServiceInfo = new MicroServiceInfo();
\r
197 HashSet<Node> nodes = new HashSet<Node>();
\r
199 String version = "", visualRange = "", protocol = "",lb_policy="";
\r
201 for (Map.Entry<String, ServiceHealth> entry : newValues.entrySet()) {
\r
202 String nodeName = entry.getKey().toString();
\r
203 ServiceHealth value = (ServiceHealth) entry.getValue();
\r
205 Node node = new Node();
\r
206 Service service = value.getService();
\r
207 node.setIp(service.getAddress());
\r
208 node.setPort(String.valueOf(service.getPort()));
\r
212 List<String> tagList = service.getTags();
\r
213 for (String tag : tagList) {
\r
214 if (tag.startsWith("url")) {
\r
215 if (tag.split(":").length == 2) {
\r
216 url = tag.split(":")[1];
\r
224 if (tag.startsWith("version")) {
\r
225 if (tag.split(":").length == 2) {
\r
226 version = tag.split(":")[1];
\r
232 if (tag.startsWith("protocol")) {
\r
233 protocol = tag.split(":")[1];
\r
236 if (tag.startsWith("visualRange")) {
\r
237 visualRange = tag.split(":")[1];
\r
241 if (tag.startsWith("lb_policy")) {
\r
242 lb_policy = tag.split(":")[1];
\r
249 } catch (Exception e) {
\r
250 LOGGER.error(serviceName + " read tag throw exception", e);
\r
251 System.out.println(serviceName + " read tag throw exception");
\r
257 microServiceInfo.setNodes(nodes);
\r
258 microServiceInfo.setProtocol(protocol);
\r
259 microServiceInfo.setUrl(url);
\r
260 microServiceInfo.setServiceName(serviceName);
\r
261 microServiceInfo.setLb_policy(lb_policy);
\r
262 if (!visualRange.isEmpty()) {
\r
263 microServiceInfo.setVisualRange(visualRange);
\r
265 microServiceInfo.setVersion(version);
\r
268 MicroServiceFullInfo microServiceFullInfo =
\r
269 MicroServiceWrapper.getInstance().saveMicroServiceInstance(
\r
270 microServiceInfo, false, "", "");
\r
271 LOGGER.info("register MicroServiceInfo successs:"
\r
272 + microServiceFullInfo.getServiceName());
\r
273 } catch (Exception e) {
\r
274 LOGGER.error("register MicroServiceInfo FAIL : " + serviceName, e);
\r
281 LOGGER.info(serviceName + " Node Listen start");
\r
282 cacheList.get().add(healthCache);
\r
283 healthCache.start();
\r
285 } catch (Exception e) {
\r
286 // TODO Auto-generated catch block
\r
287 LOGGER.error(serviceName + " Node Listen start throw exception", e);
\r
290 return healthCache;
\r
294 * @Title startNodeListen
\r
295 * @Description TODO(Open a service node changes to monitor)
\r
296 * @param serviceName
\r
298 * @return CatalogCache
\r
301 private CatalogCache startNodeListen(final String serviceName) {
\r
302 final CatalogCache catalogCache = CatalogCache.newCache(catalogClient, serviceName, 30);
\r
303 catalogCache.addListener(new ConsulCache.Listener<String, CatalogService>() {
\r
305 public void notify(Map<String, CatalogService> newValues) {
\r
306 // do Something with updated server map
\r
307 System.out.println(serviceName + "--new node notify--");
\r
308 LOGGER.info(serviceName + "--new node notify--");
\r
310 if (newValues.isEmpty()) {
\r
311 System.out.println(serviceName + "-- nodeList is Empty--");
\r
312 LOGGER.info(serviceName + "--nodeList is Empty-stop service[" + serviceName
\r
315 catalogCache.stop();
\r
316 } catch (Exception e) {
\r
317 LOGGER.equals(serviceName + "-- stop Node error:" + e.getMessage());
\r
322 MicroServiceInfo microServiceInfo = new MicroServiceInfo();
\r
323 HashSet<Node> nodes = new HashSet<Node>();
\r
325 String version = "", visualRange = "", protocol = "";
\r
327 for (Map.Entry<String, CatalogService> entry : newValues.entrySet()) {
\r
328 String nodeName = entry.getKey().toString();
\r
329 CatalogService value = (CatalogService) entry.getValue();
\r
331 Node node = new Node();
\r
332 node.setIp(value.getServiceAddress());
\r
333 node.setPort(String.valueOf(value.getServicePort()));
\r
337 List<String> tagList = value.getServiceTags();
\r
338 for (String tag : tagList) {
\r
339 if (tag.startsWith("url")) {
\r
340 if (tag.split(":").length == 2) {
\r
341 url = tag.split(":")[1];
\r
349 if (tag.startsWith("version")) {
\r
350 if (tag.split(":").length == 2) {
\r
351 version = tag.split(":")[1];
\r
357 if (tag.startsWith("protocol")) {
\r
358 protocol = tag.split(":")[1];
\r
361 if (tag.startsWith("visualRange")) {
\r
362 visualRange = tag.split(":")[1];
\r
365 if (tag.startsWith("ttl")) {
\r
366 int ttl = Integer.parseInt(tag.split(":")[1]);
\r
373 } catch (Exception e) {
\r
374 LOGGER.error(serviceName + " read tag throw exception", e);
\r
375 System.out.println(serviceName + " read tag throw exception");
\r
381 System.out.println(nodeName + ":" + value.getServiceAddress() + " "
\r
382 + value.getServicePort() + " " + value.getServiceTags());
\r
385 microServiceInfo.setNodes(nodes);
\r
386 microServiceInfo.setProtocol(protocol);
\r
387 microServiceInfo.setUrl(url);
\r
388 microServiceInfo.setServiceName(serviceName);
\r
389 if (!visualRange.isEmpty()) {
\r
390 microServiceInfo.setVisualRange(visualRange);
\r
392 microServiceInfo.setVersion(version);
\r
395 MicroServiceFullInfo microServiceFullInfo =
\r
396 MicroServiceWrapper.getInstance().saveMicroServiceInstance(
\r
397 microServiceInfo, false, "", "");
\r
398 LOGGER.info("register MicroServiceInfo successs:" + microServiceFullInfo);
\r
399 System.out.println("register MicroServiceInfo successs:" + serviceName);
\r
400 } catch (Exception e) {
\r
401 LOGGER.error("register MicroServiceInfo FAIL : ", e);
\r
408 System.out.println(serviceName + " Node Listen start");
\r
409 LOGGER.info(serviceName + " Node Listen start");
\r
410 catalogCache.start();
\r
412 } catch (Exception e) {
\r
413 // TODO Auto-generated catch block
\r
414 LOGGER.error(serviceName + " Node Listen start throw exception", e);
\r
417 return catalogCache;
\r
422 * @Title getDiffrent
\r
423 * @Description TODO(Extract the list1 and list2 different data sets)
\r
427 * @return List<String>
\r
429 private List<ServiceInfo> getDiffrent(List<ServiceInfo> list1, List<ServiceInfo> list2) {
\r
431 List<ServiceInfo> diff = new ArrayList<ServiceInfo>();
\r
435 for (ServiceInfo serviceInfo : list1) {
\r
436 if (!list2.contains(serviceInfo)) {
\r
437 diff.add(serviceInfo);
\r
444 public static void main(String[] args) {
\r
445 ConsulClientApp consulTest = new ConsulClientApp("127.0.0.1", 10081);
\r
446 consulTest.startServiceListen();
\r