4986e548bff2b662228e2662d38273d8f5d5b602
[msb/apigateway.git] / msb-core / apiroute / apiroute-service / src / main / java / org / openo / msb / ConsulClientApp.java
1 /**
2  * Copyright 2016 2015-2016 ZTE, Inc. and others. All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.openo.msb;
17
18 import java.net.MalformedURLException;
19 import java.net.URL;
20 import java.util.ArrayList;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.ListIterator;
24 import java.util.Map;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.apache.commons.lang3.StringUtils;
28 import org.openo.msb.api.MicroServiceFullInfo;
29 import org.openo.msb.api.MicroServiceInfo;
30 import org.openo.msb.api.Node;
31 import org.openo.msb.wrapper.MicroServiceWrapper;
32 import org.openo.msb.wrapper.consul.CatalogClient;
33 import org.openo.msb.wrapper.consul.Consul;
34 import org.openo.msb.wrapper.consul.HealthClient;
35 import org.openo.msb.wrapper.consul.cache.CatalogCache;
36 import org.openo.msb.wrapper.consul.cache.ConsulCache;
37 import org.openo.msb.wrapper.consul.cache.ConsulCache4Map;
38 import org.openo.msb.wrapper.consul.cache.HealthCache;
39 import org.openo.msb.wrapper.consul.cache.ServiceCache;
40 import org.openo.msb.wrapper.consul.model.catalog.CatalogService;
41 import org.openo.msb.wrapper.consul.model.catalog.ServiceInfo;
42 import org.openo.msb.wrapper.consul.model.health.Service;
43 import org.openo.msb.wrapper.consul.model.health.ServiceHealth;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 public class ConsulClientApp {
48
49     private final Consul consul;
50     private final CatalogClient catalogClient;
51     private final HealthClient healthClient;
52     private AtomicReference<List<HealthCache>> cacheList = new AtomicReference<List<HealthCache>>(
53             new ArrayList<HealthCache>());
54
55
56     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulClientApp.class);
57
58     public ConsulClientApp(String ip, int port) {
59         URL url = null;
60         try {
61             url = new URL("http", ip, port, "");
62         } catch (MalformedURLException e1) {
63             // TODO Auto-generated catch block
64             LOGGER.error("start  ConsulClientApp throw exception", e1);
65             throw new RuntimeException(e1);
66         }
67         this.consul = Consul.builder().withUrl(url).build(); // connect to Consul on localhost
68         this.catalogClient = consul.catalogClient();
69         this.healthClient = consul.healthClient();
70     }
71
72     public Consul getConsul() {
73         return consul;
74     }
75
76     public CatalogClient getCatalogClient() {
77         return catalogClient;
78     }
79
80     private void stopNodeListen(String serviceName) {
81         try {
82
83             ListIterator<HealthCache> cacheListLit = cacheList.get().listIterator();
84             while (cacheListLit.hasNext()) {
85                 HealthCache cache = (HealthCache) cacheListLit.next();
86                 if (cache.getServiceName().equals(serviceName)) {
87
88                     cache.stop();
89                     cacheListLit.remove();
90                     LOGGER.info(cache.getServiceName() + " NodeListen stoped");
91                     break;
92                 }
93             }
94
95         } catch (Exception e) {
96             // TODO Auto-generated catch block
97             LOGGER.error("stop Node:[" + serviceName + "] Listen throw exception", e);
98         }
99
100
101     }
102
103     /**
104      * @Title startServiceListen
105      * @Description TODO(Open the consul registration services to monitor)
106      * @return void
107      */
108     public void startServiceListen() {
109         final ServiceCache serviceCache = ServiceCache.newCache(catalogClient, 30);
110         serviceCache.addListener(new ConsulCache4Map.Listener<String, Map<String, List<String>>>() {
111             @Override
112             public void notify(List<ServiceInfo> oldValues, List<ServiceInfo> newValues) {
113                 // do Something with updated server List
114                 LOGGER.info("--new service notify--");
115
116                 List<ServiceInfo> deRegisterServiceList = getDiffrent(oldValues, newValues);
117
118
119                 for (ServiceInfo serviceInfo : deRegisterServiceList) {
120                     try {
121                        
122                             MicroServiceWrapper.getInstance().deleteMicroService(
123                                     serviceInfo.getServiceName(), serviceInfo.getVersion());
124                         
125
126                         stopNodeListen(serviceInfo.getServiceName());
127                         LOGGER.info("Cancel MicroServiceInfo and stop node listen successs:"
128                                 + serviceInfo.getServiceName());
129                     } catch (Exception e) {
130                         LOGGER.error("Cancel MicroServiceInfo and stop node listen FAIL : ", e);
131
132                     }
133
134                 }
135
136
137                 List<ServiceInfo> registerServiceList = getDiffrent(newValues, oldValues);
138                 for (ServiceInfo serviceInfo : registerServiceList) {
139
140                     // if (deRegisterServiceList.contains(serviceInfo)) continue;
141
142
143                     LOGGER.info(" new serviceName:" + serviceInfo.getServiceName() + "  version:"
144                             + serviceInfo.getVersion());
145                     // Open Node to monitor new registration service
146                     startHealthNodeListen(serviceInfo.getServiceName(), serviceInfo.getVersion());
147
148                 }
149
150
151             }
152
153         });
154
155         try {
156             LOGGER.info("start...consul ... service..Listening.");
157             serviceCache.start();
158
159         } catch (Exception e) {
160             // TODO Auto-generated catch block
161             LOGGER.error("start...service..Listen throw exception", e);
162         }
163     }
164
165
166     /**
167      * @Title startHealthNodeListen
168      * @Description TODO(Open a service node changes to monitor, only to return to health service)
169      * @param serviceName
170      * @return
171      * @return HealthCache
172      */
173     private HealthCache startHealthNodeListen(final String serviceName, final String version) {
174         final HealthCache healthCache = HealthCache.newCache(healthClient, serviceName, 30);
175         healthCache.addListener(new HealthCache.Listener<String, ServiceHealth>() {
176             @Override
177             public void notify(Map<String, ServiceHealth> newValues) {
178                 // do Something with updated server map
179                 LOGGER.info(serviceName + "--new node notify--");
180
181                 if (newValues.isEmpty()) {
182                     LOGGER.info(serviceName + "--nodeList is Empty--");
183
184                   
185                         MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version);
186                   
187                     // try {
188                     // healthCache.stop();
189                     // } catch (Exception e) {
190                     // LOGGER.equals(serviceName+"-- stop Node error:"+e.getMessage());
191                     // }
192
193                 } else {
194
195                     MicroServiceInfo microServiceInfo = new MicroServiceInfo();
196                     HashSet<Node> nodes = new HashSet<Node>();
197                     String url = "";
198                     String version = "", visualRange = "", protocol = "",lb_policy="";
199
200                     for (Map.Entry<String, ServiceHealth> entry : newValues.entrySet()) {
201                         String nodeName = entry.getKey().toString();
202                         ServiceHealth value = (ServiceHealth) entry.getValue();
203
204                         Node node = new Node();
205                         Service service = value.getService();
206                         node.setIp(service.getAddress());
207                         node.setPort(String.valueOf(service.getPort()));
208
209
210                         try {
211                             List<String> tagList = service.getTags();
212                             for (String tag : tagList) {
213                                 if (tag.startsWith("url")) {
214                                     if (tag.split(":").length == 2) {
215                                         url = tag.split(":")[1];
216                                     } else {
217                                         url = "";
218                                     }
219
220
221                                     continue;
222                                 }
223                                 if (tag.startsWith("version")) {
224                                     if (tag.split(":").length == 2) {
225                                         version = tag.split(":")[1];
226                                     } else {
227                                         version = "";
228                                     }
229                                     continue;
230                                 }
231                                 if (tag.startsWith("protocol")) {
232                                     protocol = tag.split(":")[1];
233                                     continue;
234                                 }
235                                 if (tag.startsWith("visualRange")) {
236                                     visualRange = tag.split(":")[1];
237                                     continue;
238                                 }
239                                 
240                                 if (tag.startsWith("lb_policy")) {
241                                     lb_policy = tag.split(":")[1];
242                                     continue;
243                                 }
244
245                             }
246
247
248                         } catch (Exception e) {
249                             LOGGER.error(serviceName + " read tag  throw exception", e);
250                             System.out.println(serviceName + " read tag  throw exception");
251                         }
252
253                         nodes.add(node);
254                     }
255
256                     microServiceInfo.setNodes(nodes);
257                     microServiceInfo.setProtocol(protocol);
258                     microServiceInfo.setUrl(url);
259                     microServiceInfo.setServiceName(serviceName);
260                     microServiceInfo.setLb_policy(lb_policy);
261                     if (!visualRange.isEmpty()) {
262                         microServiceInfo.setVisualRange(visualRange);
263                     }
264                     microServiceInfo.setVersion(version);
265
266                     try {
267                         MicroServiceFullInfo microServiceFullInfo =
268                                 MicroServiceWrapper.getInstance().saveMicroServiceInstance(
269                                         microServiceInfo, false, "", "");
270                         LOGGER.info("register MicroServiceInfo successs:"
271                                 + microServiceFullInfo.getServiceName());
272                     } catch (Exception e) {
273                         LOGGER.error("register MicroServiceInfo FAIL : " + serviceName, e);
274
275                     }
276                 }
277             }
278         });
279         try {
280             LOGGER.info(serviceName + " Node Listen start");
281             cacheList.get().add(healthCache);
282             healthCache.start();
283
284         } catch (Exception e) {
285             // TODO Auto-generated catch block
286             LOGGER.error(serviceName + " Node Listen start throw exception", e);
287         }
288
289         return healthCache;
290     }
291
292     /**
293      * @Title startNodeListen
294      * @Description TODO(Open a service node changes to monitor)
295      * @param serviceName
296      * @return
297      * @return CatalogCache
298      */
299     @Deprecated
300     private CatalogCache startNodeListen(final String serviceName) {
301         final CatalogCache catalogCache = CatalogCache.newCache(catalogClient, serviceName, 30);
302         catalogCache.addListener(new ConsulCache.Listener<String, CatalogService>() {
303             @Override
304             public void notify(Map<String, CatalogService> newValues) {
305                 // do Something with updated server map
306                 System.out.println(serviceName + "--new node notify--");
307                 LOGGER.info(serviceName + "--new node notify--");
308
309                 if (newValues.isEmpty()) {
310                     System.out.println(serviceName + "-- nodeList is Empty--");
311                     LOGGER.info(serviceName + "--nodeList is Empty-stop service[" + serviceName
312                             + "] listen-");
313                     try {
314                         catalogCache.stop();
315                     } catch (Exception e) {
316                         LOGGER.equals(serviceName + "-- stop Node error:" + e.getMessage());
317                     }
318
319                 } else {
320
321                     MicroServiceInfo microServiceInfo = new MicroServiceInfo();
322                     HashSet<Node> nodes = new HashSet<Node>();
323                     String url = "";
324                     String version = "", visualRange = "", protocol = "";
325
326                     for (Map.Entry<String, CatalogService> entry : newValues.entrySet()) {
327                         String nodeName = entry.getKey().toString();
328                         CatalogService value = (CatalogService) entry.getValue();
329
330                         Node node = new Node();
331                         node.setIp(value.getServiceAddress());
332                         node.setPort(String.valueOf(value.getServicePort()));
333
334
335                         try {
336                             List<String> tagList = value.getServiceTags();
337                             for (String tag : tagList) {
338                                 if (tag.startsWith("url")) {
339                                     if (tag.split(":").length == 2) {
340                                         url = tag.split(":")[1];
341                                     } else {
342                                         url = "";
343                                     }
344
345
346                                     continue;
347                                 }
348                                 if (tag.startsWith("version")) {
349                                     if (tag.split(":").length == 2) {
350                                         version = tag.split(":")[1];
351                                     } else {
352                                         version = "";
353                                     }
354                                     continue;
355                                 }
356                                 if (tag.startsWith("protocol")) {
357                                     protocol = tag.split(":")[1];
358                                     continue;
359                                 }
360                                 if (tag.startsWith("visualRange")) {
361                                     visualRange = tag.split(":")[1];
362                                     continue;
363                                 }
364                                 if (tag.startsWith("ttl")) {
365                                     int ttl = Integer.parseInt(tag.split(":")[1]);
366                                     node.setTtl(ttl);
367                                     continue;
368                                 }
369                             }
370
371
372                         } catch (Exception e) {
373                             LOGGER.error(serviceName + " read tag  throw exception", e);
374                             System.out.println(serviceName + " read tag  throw exception");
375                         }
376
377                         nodes.add(node);
378
379
380                         System.out.println(nodeName + ":" + value.getServiceAddress() + " "
381                                 + value.getServicePort() + " " + value.getServiceTags());
382                     }
383
384                     microServiceInfo.setNodes(nodes);
385                     microServiceInfo.setProtocol(protocol);
386                     microServiceInfo.setUrl(url);
387                     microServiceInfo.setServiceName(serviceName);
388                     if (!visualRange.isEmpty()) {
389                         microServiceInfo.setVisualRange(visualRange);
390                     }
391                     microServiceInfo.setVersion(version);
392
393                     try {
394                         MicroServiceFullInfo microServiceFullInfo =
395                                 MicroServiceWrapper.getInstance().saveMicroServiceInstance(
396                                         microServiceInfo, false, "", "");
397                         LOGGER.info("register MicroServiceInfo successs:" + microServiceFullInfo);
398                         System.out.println("register MicroServiceInfo successs:" + serviceName);
399                     } catch (Exception e) {
400                         LOGGER.error("register MicroServiceInfo FAIL : ", e);
401
402                     }
403                 }
404             }
405         });
406         try {
407             System.out.println(serviceName + " Node Listen start");
408             LOGGER.info(serviceName + " Node Listen start");
409             catalogCache.start();
410
411         } catch (Exception e) {
412             // TODO Auto-generated catch block
413             LOGGER.error(serviceName + " Node Listen start throw exception", e);
414         }
415
416         return catalogCache;
417     }
418
419
420     /**
421      * @Title getDiffrent
422      * @Description TODO(Extract the list1 and list2 different data sets)
423      * @param list1
424      * @param list2
425      * @return
426      * @return List<String>
427      */
428     private List<ServiceInfo> getDiffrent(List<ServiceInfo> list1, List<ServiceInfo> list2) {
429
430         List<ServiceInfo> diff = new ArrayList<ServiceInfo>();
431
432
433
434         for (ServiceInfo serviceInfo : list1) {
435             if (!list2.contains(serviceInfo)) {
436                 diff.add(serviceInfo);
437             }
438         }
439
440         return diff;
441     }
442
443     public static void main(String[] args) {
444         ConsulClientApp consulTest = new ConsulClientApp("127.0.0.1", 10081);
445         consulTest.startServiceListen();
446
447
448     }
449
450
451 }