2 * Copyright 2016 2015-2016 ZTE, Inc. and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.openo.msb;
18 import java.net.MalformedURLException;
20 import java.util.ArrayList;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.ListIterator;
25 import java.util.concurrent.atomic.AtomicReference;
27 import org.apache.commons.lang3.StringUtils;
28 import org.openo.msb.api.MicroServiceFullInfo;
29 import org.openo.msb.api.MicroServiceInfo;
30 import org.openo.msb.api.Node;
31 import org.openo.msb.wrapper.MicroServiceWrapper;
32 import org.openo.msb.wrapper.consul.CatalogClient;
33 import org.openo.msb.wrapper.consul.Consul;
34 import org.openo.msb.wrapper.consul.HealthClient;
35 import org.openo.msb.wrapper.consul.cache.CatalogCache;
36 import org.openo.msb.wrapper.consul.cache.ConsulCache;
37 import org.openo.msb.wrapper.consul.cache.ConsulCache4Map;
38 import org.openo.msb.wrapper.consul.cache.HealthCache;
39 import org.openo.msb.wrapper.consul.cache.ServiceCache;
40 import org.openo.msb.wrapper.consul.model.catalog.CatalogService;
41 import org.openo.msb.wrapper.consul.model.catalog.ServiceInfo;
42 import org.openo.msb.wrapper.consul.model.health.Service;
43 import org.openo.msb.wrapper.consul.model.health.ServiceHealth;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 public class ConsulClientApp {
49 private final Consul consul;
50 private final CatalogClient catalogClient;
51 private final HealthClient healthClient;
52 private AtomicReference<List<HealthCache>> cacheList = new AtomicReference<List<HealthCache>>(
53 new ArrayList<HealthCache>());
56 private static final Logger LOGGER = LoggerFactory.getLogger(ConsulClientApp.class);
58 public ConsulClientApp(String ip, int port) {
61 url = new URL("http", ip, port, "");
62 } catch (MalformedURLException e1) {
63 // TODO Auto-generated catch block
64 LOGGER.error("start ConsulClientApp throw exception", e1);
65 throw new RuntimeException(e1);
67 this.consul = Consul.builder().withUrl(url).build(); // connect to Consul on localhost
68 this.catalogClient = consul.catalogClient();
69 this.healthClient = consul.healthClient();
72 public Consul getConsul() {
76 public CatalogClient getCatalogClient() {
80 private void stopNodeListen(String serviceName) {
83 ListIterator<HealthCache> cacheListLit = cacheList.get().listIterator();
84 while (cacheListLit.hasNext()) {
85 HealthCache cache = (HealthCache) cacheListLit.next();
86 if (cache.getServiceName().equals(serviceName)) {
89 cacheListLit.remove();
90 LOGGER.info(cache.getServiceName() + " NodeListen stoped");
95 } catch (Exception e) {
96 // TODO Auto-generated catch block
97 LOGGER.error("stop Node:[" + serviceName + "] Listen throw exception", e);
104 * @Title startServiceListen
105 * @Description TODO(Open the consul registration services to monitor)
108 public void startServiceListen() {
109 final ServiceCache serviceCache = ServiceCache.newCache(catalogClient, 30);
110 serviceCache.addListener(new ConsulCache4Map.Listener<String, Map<String, List<String>>>() {
112 public void notify(List<ServiceInfo> oldValues, List<ServiceInfo> newValues) {
113 // do Something with updated server List
114 LOGGER.info("--new service notify--");
116 List<ServiceInfo> deRegisterServiceList = getDiffrent(oldValues, newValues);
119 for (ServiceInfo serviceInfo : deRegisterServiceList) {
122 MicroServiceWrapper.getInstance().deleteMicroService(
123 serviceInfo.getServiceName(), serviceInfo.getVersion());
126 stopNodeListen(serviceInfo.getServiceName());
127 LOGGER.info("Cancel MicroServiceInfo and stop node listen successs:"
128 + serviceInfo.getServiceName());
129 } catch (Exception e) {
130 LOGGER.error("Cancel MicroServiceInfo and stop node listen FAIL : ", e);
137 List<ServiceInfo> registerServiceList = getDiffrent(newValues, oldValues);
138 for (ServiceInfo serviceInfo : registerServiceList) {
140 // if (deRegisterServiceList.contains(serviceInfo)) continue;
143 LOGGER.info(" new serviceName:" + serviceInfo.getServiceName() + " version:"
144 + serviceInfo.getVersion());
145 // Open Node to monitor new registration service
146 startHealthNodeListen(serviceInfo.getServiceName(), serviceInfo.getVersion());
156 LOGGER.info("start...consul ... service..Listening.");
157 serviceCache.start();
159 } catch (Exception e) {
160 // TODO Auto-generated catch block
161 LOGGER.error("start...service..Listen throw exception", e);
167 * @Title startHealthNodeListen
168 * @Description TODO(Open a service node changes to monitor, only to return to health service)
171 * @return HealthCache
173 private HealthCache startHealthNodeListen(final String serviceName, final String version) {
174 final HealthCache healthCache = HealthCache.newCache(healthClient, serviceName, 30);
175 healthCache.addListener(new HealthCache.Listener<String, ServiceHealth>() {
177 public void notify(Map<String, ServiceHealth> newValues) {
178 // do Something with updated server map
179 LOGGER.info(serviceName + "--new node notify--");
181 if (newValues.isEmpty()) {
182 LOGGER.info(serviceName + "--nodeList is Empty--");
185 MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version);
188 // healthCache.stop();
189 // } catch (Exception e) {
190 // LOGGER.equals(serviceName+"-- stop Node error:"+e.getMessage());
195 MicroServiceInfo microServiceInfo = new MicroServiceInfo();
196 HashSet<Node> nodes = new HashSet<Node>();
198 String version = "", visualRange = "", protocol = "",lb_policy="";
200 for (Map.Entry<String, ServiceHealth> entry : newValues.entrySet()) {
201 String nodeName = entry.getKey().toString();
202 ServiceHealth value = (ServiceHealth) entry.getValue();
204 Node node = new Node();
205 Service service = value.getService();
206 node.setIp(service.getAddress());
207 node.setPort(String.valueOf(service.getPort()));
211 List<String> tagList = service.getTags();
212 for (String tag : tagList) {
213 if (tag.startsWith("url")) {
214 if (tag.split(":").length == 2) {
215 url = tag.split(":")[1];
223 if (tag.startsWith("version")) {
224 if (tag.split(":").length == 2) {
225 version = tag.split(":")[1];
231 if (tag.startsWith("protocol")) {
232 protocol = tag.split(":")[1];
235 if (tag.startsWith("visualRange")) {
236 visualRange = tag.split(":")[1];
240 if (tag.startsWith("lb_policy")) {
241 lb_policy = tag.split(":")[1];
248 } catch (Exception e) {
249 LOGGER.error(serviceName + " read tag throw exception", e);
250 System.out.println(serviceName + " read tag throw exception");
256 microServiceInfo.setNodes(nodes);
257 microServiceInfo.setProtocol(protocol);
258 microServiceInfo.setUrl(url);
259 microServiceInfo.setServiceName(serviceName);
260 microServiceInfo.setLb_policy(lb_policy);
261 if (!visualRange.isEmpty()) {
262 microServiceInfo.setVisualRange(visualRange);
264 microServiceInfo.setVersion(version);
267 MicroServiceFullInfo microServiceFullInfo =
268 MicroServiceWrapper.getInstance().saveMicroServiceInstance(
269 microServiceInfo, false, "", "");
270 LOGGER.info("register MicroServiceInfo successs:"
271 + microServiceFullInfo.getServiceName());
272 } catch (Exception e) {
273 LOGGER.error("register MicroServiceInfo FAIL : " + serviceName, e);
280 LOGGER.info(serviceName + " Node Listen start");
281 cacheList.get().add(healthCache);
284 } catch (Exception e) {
285 // TODO Auto-generated catch block
286 LOGGER.error(serviceName + " Node Listen start throw exception", e);
293 * @Title startNodeListen
294 * @Description TODO(Open a service node changes to monitor)
297 * @return CatalogCache
300 private CatalogCache startNodeListen(final String serviceName) {
301 final CatalogCache catalogCache = CatalogCache.newCache(catalogClient, serviceName, 30);
302 catalogCache.addListener(new ConsulCache.Listener<String, CatalogService>() {
304 public void notify(Map<String, CatalogService> newValues) {
305 // do Something with updated server map
306 System.out.println(serviceName + "--new node notify--");
307 LOGGER.info(serviceName + "--new node notify--");
309 if (newValues.isEmpty()) {
310 System.out.println(serviceName + "-- nodeList is Empty--");
311 LOGGER.info(serviceName + "--nodeList is Empty-stop service[" + serviceName
315 } catch (Exception e) {
316 LOGGER.equals(serviceName + "-- stop Node error:" + e.getMessage());
321 MicroServiceInfo microServiceInfo = new MicroServiceInfo();
322 HashSet<Node> nodes = new HashSet<Node>();
324 String version = "", visualRange = "", protocol = "";
326 for (Map.Entry<String, CatalogService> entry : newValues.entrySet()) {
327 String nodeName = entry.getKey().toString();
328 CatalogService value = (CatalogService) entry.getValue();
330 Node node = new Node();
331 node.setIp(value.getServiceAddress());
332 node.setPort(String.valueOf(value.getServicePort()));
336 List<String> tagList = value.getServiceTags();
337 for (String tag : tagList) {
338 if (tag.startsWith("url")) {
339 if (tag.split(":").length == 2) {
340 url = tag.split(":")[1];
348 if (tag.startsWith("version")) {
349 if (tag.split(":").length == 2) {
350 version = tag.split(":")[1];
356 if (tag.startsWith("protocol")) {
357 protocol = tag.split(":")[1];
360 if (tag.startsWith("visualRange")) {
361 visualRange = tag.split(":")[1];
364 if (tag.startsWith("ttl")) {
365 int ttl = Integer.parseInt(tag.split(":")[1]);
372 } catch (Exception e) {
373 LOGGER.error(serviceName + " read tag throw exception", e);
374 System.out.println(serviceName + " read tag throw exception");
380 System.out.println(nodeName + ":" + value.getServiceAddress() + " "
381 + value.getServicePort() + " " + value.getServiceTags());
384 microServiceInfo.setNodes(nodes);
385 microServiceInfo.setProtocol(protocol);
386 microServiceInfo.setUrl(url);
387 microServiceInfo.setServiceName(serviceName);
388 if (!visualRange.isEmpty()) {
389 microServiceInfo.setVisualRange(visualRange);
391 microServiceInfo.setVersion(version);
394 MicroServiceFullInfo microServiceFullInfo =
395 MicroServiceWrapper.getInstance().saveMicroServiceInstance(
396 microServiceInfo, false, "", "");
397 LOGGER.info("register MicroServiceInfo successs:" + microServiceFullInfo);
398 System.out.println("register MicroServiceInfo successs:" + serviceName);
399 } catch (Exception e) {
400 LOGGER.error("register MicroServiceInfo FAIL : ", e);
407 System.out.println(serviceName + " Node Listen start");
408 LOGGER.info(serviceName + " Node Listen start");
409 catalogCache.start();
411 } catch (Exception e) {
412 // TODO Auto-generated catch block
413 LOGGER.error(serviceName + " Node Listen start throw exception", e);
422 * @Description TODO(Extract the list1 and list2 different data sets)
426 * @return List<String>
428 private List<ServiceInfo> getDiffrent(List<ServiceInfo> list1, List<ServiceInfo> list2) {
430 List<ServiceInfo> diff = new ArrayList<ServiceInfo>();
434 for (ServiceInfo serviceInfo : list1) {
435 if (!list2.contains(serviceInfo)) {
436 diff.add(serviceInfo);
443 public static void main(String[] args) {
444 ConsulClientApp consulTest = new ConsulClientApp("127.0.0.1", 10081);
445 consulTest.startServiceListen();