Initial code import
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / openo / msb / ConsulClientApp.java
1 /**\r
2 * Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)\r
3 *\r
4 * Licensed under the Apache License, Version 2.0 (the "License");\r
5 * you may not use this file except in compliance with the License.\r
6 * You may obtain a copy of the License at\r
7 *\r
8 * http://www.apache.org/licenses/LICENSE-2.0\r
9 *\r
10 * Unless required by applicable law or agreed to in writing, software\r
11 * distributed under the License is distributed on an "AS IS" BASIS,\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
13 * See the License for the specific language governing permissions and\r
14 * limitations under the License.\r
15 */\r
16 \r
17 package org.openo.msb;\r
18 \r
19 import java.net.MalformedURLException;\r
20 import java.net.URL;\r
21 import java.util.ArrayList;\r
22 import java.util.HashSet;\r
23 import java.util.List;\r
24 import java.util.ListIterator;\r
25 import java.util.Map;\r
26 import java.util.concurrent.atomic.AtomicReference;\r
27 \r
28 import org.apache.commons.lang3.StringUtils;\r
29 import org.openo.msb.api.MicroServiceFullInfo;\r
30 import org.openo.msb.api.MicroServiceInfo;\r
31 import org.openo.msb.api.Node;\r
32 import org.openo.msb.wrapper.MicroServiceWrapper;\r
33 import org.openo.msb.wrapper.consul.CatalogClient;\r
34 import org.openo.msb.wrapper.consul.Consul;\r
35 import org.openo.msb.wrapper.consul.HealthClient;\r
36 import org.openo.msb.wrapper.consul.cache.CatalogCache;\r
37 import org.openo.msb.wrapper.consul.cache.ConsulCache;\r
38 import org.openo.msb.wrapper.consul.cache.ConsulCache4Map;\r
39 import org.openo.msb.wrapper.consul.cache.HealthCache;\r
40 import org.openo.msb.wrapper.consul.cache.ServiceCache;\r
41 import org.openo.msb.wrapper.consul.model.catalog.CatalogService;\r
42 import org.openo.msb.wrapper.consul.model.catalog.ServiceInfo;\r
43 import org.openo.msb.wrapper.consul.model.health.Service;\r
44 import org.openo.msb.wrapper.consul.model.health.ServiceHealth;\r
45 import org.slf4j.Logger;\r
46 import org.slf4j.LoggerFactory;\r
47 \r
48 public class ConsulClientApp {\r
49 \r
50     private final Consul consul;\r
51     private final CatalogClient catalogClient;\r
52     private final HealthClient healthClient;\r
53     private AtomicReference<List<HealthCache>> cacheList = new AtomicReference<List<HealthCache>>(\r
54             new ArrayList<HealthCache>());\r
55 \r
56 \r
57     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulClientApp.class);\r
58 \r
59     public ConsulClientApp(String ip, int port) {\r
60         URL url = null;\r
61         try {\r
62             url = new URL("http", ip, port, "");\r
63         } catch (MalformedURLException e1) {\r
64             // TODO Auto-generated catch block\r
65             LOGGER.error("start  ConsulClientApp throw exception", e1);\r
66             throw new RuntimeException(e1);\r
67         }\r
68         this.consul = Consul.builder().withUrl(url).build(); // connect to Consul on localhost\r
69         this.catalogClient = consul.catalogClient();\r
70         this.healthClient = consul.healthClient();\r
71     }\r
72 \r
73     public Consul getConsul() {\r
74         return consul;\r
75     }\r
76 \r
77     public CatalogClient getCatalogClient() {\r
78         return catalogClient;\r
79     }\r
80 \r
81     private void stopNodeListen(String serviceName) {\r
82         try {\r
83 \r
84             ListIterator<HealthCache> cacheListLit = cacheList.get().listIterator();\r
85             while (cacheListLit.hasNext()) {\r
86                 HealthCache cache = (HealthCache) cacheListLit.next();\r
87                 if (cache.getServiceName().equals(serviceName)) {\r
88 \r
89                     cache.stop();\r
90                     cacheListLit.remove();\r
91                     LOGGER.info(cache.getServiceName() + " NodeListen stoped");\r
92                     break;\r
93                 }\r
94             }\r
95 \r
96         } catch (Exception e) {\r
97             // TODO Auto-generated catch block\r
98             LOGGER.error("stop Node:[" + serviceName + "] Listen throw exception", e);\r
99         }\r
100 \r
101 \r
102     }\r
103 \r
104     /**\r
105      * @Title startServiceListen\r
106      * @Description TODO(Open the consul registration services to monitor)\r
107      * @return void\r
108      */\r
109     public void startServiceListen() {\r
110         final ServiceCache serviceCache = ServiceCache.newCache(catalogClient, 30);\r
111         serviceCache.addListener(new ConsulCache4Map.Listener<String, Map<String, List<String>>>() {\r
112             @Override\r
113             public void notify(List<ServiceInfo> oldValues, List<ServiceInfo> newValues) {\r
114                 // do Something with updated server List\r
115                 LOGGER.info("--new service notify--");\r
116 \r
117                 List<ServiceInfo> deRegisterServiceList = getDiffrent(oldValues, newValues);\r
118 \r
119 \r
120                 for (ServiceInfo serviceInfo : deRegisterServiceList) {\r
121                     try {\r
122                        \r
123                             MicroServiceWrapper.getInstance().deleteMicroService(\r
124                                     serviceInfo.getServiceName(), serviceInfo.getVersion());\r
125                         \r
126 \r
127                         stopNodeListen(serviceInfo.getServiceName());\r
128                         LOGGER.info("Cancel MicroServiceInfo and stop node listen successs:"\r
129                                 + serviceInfo.getServiceName());\r
130                     } catch (Exception e) {\r
131                         LOGGER.error("Cancel MicroServiceInfo and stop node listen FAIL : ", e);\r
132 \r
133                     }\r
134 \r
135                 }\r
136 \r
137 \r
138                 List<ServiceInfo> registerServiceList = getDiffrent(newValues, oldValues);\r
139                 for (ServiceInfo serviceInfo : registerServiceList) {\r
140 \r
141                     // if (deRegisterServiceList.contains(serviceInfo)) continue;\r
142 \r
143 \r
144                     LOGGER.info(" new serviceName:" + serviceInfo.getServiceName() + "  version:"\r
145                             + serviceInfo.getVersion());\r
146                     // Open Node to monitor new registration service\r
147                     startHealthNodeListen(serviceInfo.getServiceName(), serviceInfo.getVersion());\r
148 \r
149                 }\r
150 \r
151 \r
152             }\r
153 \r
154         });\r
155 \r
156         try {\r
157             LOGGER.info("start...consul ... service..Listening.");\r
158             serviceCache.start();\r
159 \r
160         } catch (Exception e) {\r
161             // TODO Auto-generated catch block\r
162             LOGGER.error("start...service..Listen throw exception", e);\r
163         }\r
164     }\r
165 \r
166 \r
167     /**\r
168      * @Title startHealthNodeListen\r
169      * @Description TODO(Open a service node changes to monitor, only to return to health service)\r
170      * @param serviceName\r
171      * @return\r
172      * @return HealthCache\r
173      */\r
174     private HealthCache startHealthNodeListen(final String serviceName, final String version) {\r
175         final HealthCache healthCache = HealthCache.newCache(healthClient, serviceName, 30);\r
176         healthCache.addListener(new HealthCache.Listener<String, ServiceHealth>() {\r
177             @Override\r
178             public void notify(Map<String, ServiceHealth> newValues) {\r
179                 // do Something with updated server map\r
180                 LOGGER.info(serviceName + "--new node notify--");\r
181 \r
182                 if (newValues.isEmpty()) {\r
183                     LOGGER.info(serviceName + "--nodeList is Empty--");\r
184 \r
185                   \r
186                         MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version);\r
187                   \r
188                     // try {\r
189                     // healthCache.stop();\r
190                     // } catch (Exception e) {\r
191                     // LOGGER.equals(serviceName+"-- stop Node error:"+e.getMessage());\r
192                     // }\r
193 \r
194                 } else {\r
195 \r
196                     MicroServiceInfo microServiceInfo = new MicroServiceInfo();\r
197                     HashSet<Node> nodes = new HashSet<Node>();\r
198                     String url = "";\r
199                     String version = "", visualRange = "", protocol = "",lb_policy="";\r
200 \r
201                     for (Map.Entry<String, ServiceHealth> entry : newValues.entrySet()) {\r
202                         String nodeName = entry.getKey().toString();\r
203                         ServiceHealth value = (ServiceHealth) entry.getValue();\r
204 \r
205                         Node node = new Node();\r
206                         Service service = value.getService();\r
207                         node.setIp(service.getAddress());\r
208                         node.setPort(String.valueOf(service.getPort()));\r
209 \r
210 \r
211                         try {\r
212                             List<String> tagList = service.getTags();\r
213                             for (String tag : tagList) {\r
214                                 if (tag.startsWith("url")) {\r
215                                     if (tag.split(":").length == 2) {\r
216                                         url = tag.split(":")[1];\r
217                                     } else {\r
218                                         url = "";\r
219                                     }\r
220 \r
221 \r
222                                     continue;\r
223                                 }\r
224                                 if (tag.startsWith("version")) {\r
225                                     if (tag.split(":").length == 2) {\r
226                                         version = tag.split(":")[1];\r
227                                     } else {\r
228                                         version = "";\r
229                                     }\r
230                                     continue;\r
231                                 }\r
232                                 if (tag.startsWith("protocol")) {\r
233                                     protocol = tag.split(":")[1];\r
234                                     continue;\r
235                                 }\r
236                                 if (tag.startsWith("visualRange")) {\r
237                                     visualRange = tag.split(":")[1];\r
238                                     continue;\r
239                                 }\r
240                                 \r
241                                 if (tag.startsWith("lb_policy")) {\r
242                                     lb_policy = tag.split(":")[1];\r
243                                     continue;\r
244                                 }\r
245 \r
246                             }\r
247 \r
248 \r
249                         } catch (Exception e) {\r
250                             LOGGER.error(serviceName + " read tag  throw exception", e);\r
251                             System.out.println(serviceName + " read tag  throw exception");\r
252                         }\r
253 \r
254                         nodes.add(node);\r
255                     }\r
256 \r
257                     microServiceInfo.setNodes(nodes);\r
258                     microServiceInfo.setProtocol(protocol);\r
259                     microServiceInfo.setUrl(url);\r
260                     microServiceInfo.setServiceName(serviceName);\r
261                     microServiceInfo.setLb_policy(lb_policy);\r
262                     if (!visualRange.isEmpty()) {\r
263                         microServiceInfo.setVisualRange(visualRange);\r
264                     }\r
265                     microServiceInfo.setVersion(version);\r
266 \r
267                     try {\r
268                         MicroServiceFullInfo microServiceFullInfo =\r
269                                 MicroServiceWrapper.getInstance().saveMicroServiceInstance(\r
270                                         microServiceInfo, false, "", "");\r
271                         LOGGER.info("register MicroServiceInfo successs:"\r
272                                 + microServiceFullInfo.getServiceName());\r
273                     } catch (Exception e) {\r
274                         LOGGER.error("register MicroServiceInfo FAIL : " + serviceName, e);\r
275 \r
276                     }\r
277                 }\r
278             }\r
279         });\r
280         try {\r
281             LOGGER.info(serviceName + " Node Listen start");\r
282             cacheList.get().add(healthCache);\r
283             healthCache.start();\r
284 \r
285         } catch (Exception e) {\r
286             // TODO Auto-generated catch block\r
287             LOGGER.error(serviceName + " Node Listen start throw exception", e);\r
288         }\r
289 \r
290         return healthCache;\r
291     }\r
292 \r
293     /**\r
294      * @Title startNodeListen\r
295      * @Description TODO(Open a service node changes to monitor)\r
296      * @param serviceName\r
297      * @return\r
298      * @return CatalogCache\r
299      */\r
300     @Deprecated\r
301     private CatalogCache startNodeListen(final String serviceName) {\r
302         final CatalogCache catalogCache = CatalogCache.newCache(catalogClient, serviceName, 30);\r
303         catalogCache.addListener(new ConsulCache.Listener<String, CatalogService>() {\r
304             @Override\r
305             public void notify(Map<String, CatalogService> newValues) {\r
306                 // do Something with updated server map\r
307                 System.out.println(serviceName + "--new node notify--");\r
308                 LOGGER.info(serviceName + "--new node notify--");\r
309 \r
310                 if (newValues.isEmpty()) {\r
311                     System.out.println(serviceName + "-- nodeList is Empty--");\r
312                     LOGGER.info(serviceName + "--nodeList is Empty-stop service[" + serviceName\r
313                             + "] listen-");\r
314                     try {\r
315                         catalogCache.stop();\r
316                     } catch (Exception e) {\r
317                         LOGGER.equals(serviceName + "-- stop Node error:" + e.getMessage());\r
318                     }\r
319 \r
320                 } else {\r
321 \r
322                     MicroServiceInfo microServiceInfo = new MicroServiceInfo();\r
323                     HashSet<Node> nodes = new HashSet<Node>();\r
324                     String url = "";\r
325                     String version = "", visualRange = "", protocol = "";\r
326 \r
327                     for (Map.Entry<String, CatalogService> entry : newValues.entrySet()) {\r
328                         String nodeName = entry.getKey().toString();\r
329                         CatalogService value = (CatalogService) entry.getValue();\r
330 \r
331                         Node node = new Node();\r
332                         node.setIp(value.getServiceAddress());\r
333                         node.setPort(String.valueOf(value.getServicePort()));\r
334 \r
335 \r
336                         try {\r
337                             List<String> tagList = value.getServiceTags();\r
338                             for (String tag : tagList) {\r
339                                 if (tag.startsWith("url")) {\r
340                                     if (tag.split(":").length == 2) {\r
341                                         url = tag.split(":")[1];\r
342                                     } else {\r
343                                         url = "";\r
344                                     }\r
345 \r
346 \r
347                                     continue;\r
348                                 }\r
349                                 if (tag.startsWith("version")) {\r
350                                     if (tag.split(":").length == 2) {\r
351                                         version = tag.split(":")[1];\r
352                                     } else {\r
353                                         version = "";\r
354                                     }\r
355                                     continue;\r
356                                 }\r
357                                 if (tag.startsWith("protocol")) {\r
358                                     protocol = tag.split(":")[1];\r
359                                     continue;\r
360                                 }\r
361                                 if (tag.startsWith("visualRange")) {\r
362                                     visualRange = tag.split(":")[1];\r
363                                     continue;\r
364                                 }\r
365                                 if (tag.startsWith("ttl")) {\r
366                                     int ttl = Integer.parseInt(tag.split(":")[1]);\r
367                                     node.setTtl(ttl);\r
368                                     continue;\r
369                                 }\r
370                             }\r
371 \r
372 \r
373                         } catch (Exception e) {\r
374                             LOGGER.error(serviceName + " read tag  throw exception", e);\r
375                             System.out.println(serviceName + " read tag  throw exception");\r
376                         }\r
377 \r
378                         nodes.add(node);\r
379 \r
380 \r
381                         System.out.println(nodeName + ":" + value.getServiceAddress() + " "\r
382                                 + value.getServicePort() + " " + value.getServiceTags());\r
383                     }\r
384 \r
385                     microServiceInfo.setNodes(nodes);\r
386                     microServiceInfo.setProtocol(protocol);\r
387                     microServiceInfo.setUrl(url);\r
388                     microServiceInfo.setServiceName(serviceName);\r
389                     if (!visualRange.isEmpty()) {\r
390                         microServiceInfo.setVisualRange(visualRange);\r
391                     }\r
392                     microServiceInfo.setVersion(version);\r
393 \r
394                     try {\r
395                         MicroServiceFullInfo microServiceFullInfo =\r
396                                 MicroServiceWrapper.getInstance().saveMicroServiceInstance(\r
397                                         microServiceInfo, false, "", "");\r
398                         LOGGER.info("register MicroServiceInfo successs:" + microServiceFullInfo);\r
399                         System.out.println("register MicroServiceInfo successs:" + serviceName);\r
400                     } catch (Exception e) {\r
401                         LOGGER.error("register MicroServiceInfo FAIL : ", e);\r
402 \r
403                     }\r
404                 }\r
405             }\r
406         });\r
407         try {\r
408             System.out.println(serviceName + " Node Listen start");\r
409             LOGGER.info(serviceName + " Node Listen start");\r
410             catalogCache.start();\r
411 \r
412         } catch (Exception e) {\r
413             // TODO Auto-generated catch block\r
414             LOGGER.error(serviceName + " Node Listen start throw exception", e);\r
415         }\r
416 \r
417         return catalogCache;\r
418     }\r
419 \r
420 \r
421     /**\r
422      * @Title getDiffrent\r
423      * @Description TODO(Extract the list1 and list2 different data sets)\r
424      * @param list1\r
425      * @param list2\r
426      * @return\r
427      * @return List<String>\r
428      */\r
429     private List<ServiceInfo> getDiffrent(List<ServiceInfo> list1, List<ServiceInfo> list2) {\r
430 \r
431         List<ServiceInfo> diff = new ArrayList<ServiceInfo>();\r
432 \r
433 \r
434 \r
435         for (ServiceInfo serviceInfo : list1) {\r
436             if (!list2.contains(serviceInfo)) {\r
437                 diff.add(serviceInfo);\r
438             }\r
439         }\r
440 \r
441         return diff;\r
442     }\r
443 \r
444     public static void main(String[] args) {\r
445         ConsulClientApp consulTest = new ConsulClientApp("127.0.0.1", 10081);\r
446         consulTest.startServiceListen();\r
447 \r
448 \r
449     }\r
450 \r
451 \r
452 }\r