modify copyright year
[msb/discovery.git] / sdclient / discovery-service / src / main / java / org / onap / msb / sdclient / wrapper / ConsulClientApp.java
1 /**
2  * Copyright 2016-2017 ZTE, Inc. and others.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.msb.sdclient.wrapper;
17
18 import java.net.MalformedURLException;
19 import java.net.URL;
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26
27 import org.onap.msb.sdclient.core.KeyVaulePair;
28 import org.onap.msb.sdclient.core.MicroServiceFullInfo;
29 import org.onap.msb.sdclient.core.NodeInfo;
30 import org.onap.msb.sdclient.wrapper.consul.Consul;
31 import org.onap.msb.sdclient.wrapper.consul.HealthClient;
32 import org.onap.msb.sdclient.wrapper.consul.cache.HealthCache;
33 import org.onap.msb.sdclient.wrapper.consul.model.health.Service;
34 import org.onap.msb.sdclient.wrapper.consul.model.health.ServiceHealth;
35 import org.onap.msb.sdclient.wrapper.util.DiscoverUtil;
36 import org.onap.msb.sdclient.wrapper.util.JacksonJsonUtil;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 public class ConsulClientApp {
41
42     private final Consul consul;
43     private final HealthClient healthClient;
44 //    private AtomicReference<List<HealthCache>> cacheList = new AtomicReference<List<HealthCache>>(
45 //            new ArrayList<HealthCache>());
46
47
48     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulClientApp.class);
49
50     public ConsulClientApp(String ip, int port) {
51         URL url = null;
52         try {
53             url = new URL("http", ip, port, "");
54         } catch (MalformedURLException e1) {
55             // TODO Auto-generated catch block
56             LOGGER.error("start  ConsulClientApp throw exception", e1);
57             throw new RuntimeException(e1);
58         }
59         this.consul = Consul.builder().withUrl(url).build(); // connect to Consul on localhost
60         this.healthClient = consul.healthClient();
61     }
62
63     public Consul getConsul() {
64         return consul;
65     }
66
67
68
69    
70
71     /**
72      * @Title startHealthNodeListen
73      * @Description TODO(开启某个服务的node变化监听,只返回健康状态服务)
74      * @param serviceName
75      * @return
76      * @return HealthCache
77      */
78     public HealthCache startHealthNodeListen(final String serviceName) {
79         final HealthCache healthCache = HealthCache.newCache(healthClient, serviceName, 30);
80         healthCache.addListener(new HealthCache.Listener<String, ServiceHealth>() {
81             @Override
82             public void notify(Map<String, ServiceHealth> newValues) {
83                 // do Something with updated server map
84                 LOGGER.info(serviceName + "--new node notify--");
85                 
86                              
87                 if (newValues.isEmpty()) {
88                     LOGGER.warn(serviceName + "--nodeList is Empty--");
89                     PublishAddressWrapper.publishApigateWayList.remove(serviceName);
90                    
91                       try {
92                         healthCache.stop();
93                         LOGGER.info(serviceName + " Node Listen stopped");
94                       } catch (Exception e) {
95                         LOGGER.error(serviceName + " Node Listen stop throw exception", e);
96                       }
97                     
98                     return;
99                 } 
100                 //服务发现变化
101                 List<MicroServiceFullInfo> nodeAddressList=new ArrayList<MicroServiceFullInfo>(); 
102                 for (Map.Entry<String, ServiceHealth> entry : newValues.entrySet()) {
103
104                     MicroServiceFullInfo microServiceInfo = new MicroServiceFullInfo();
105                     
106                     ServiceHealth value = (ServiceHealth) entry.getValue();
107                     Service service = value.getService();
108                     
109                     NodeInfo node = new NodeInfo();
110                     node.setIp(service.getAddress());
111                     node.setPort(String.valueOf(service.getPort()));
112                     Set<NodeInfo> nodes = new HashSet<NodeInfo>();
113                     nodes.add(node);
114                     microServiceInfo.setNodes(nodes);
115
116
117                     microServiceInfo.setServiceName(serviceName);
118                     
119                     try {
120                       List<String> tagList = service.getTags();
121
122
123                       for (String tag : tagList) {
124
125                         if (tag.startsWith("\"ns\"")) {
126                           String ms_ns_json = tag.split("\"ns\":")[1];
127                           Map<String, String> nsMap =
128                               (Map<String, String>) JacksonJsonUtil.jsonToBean(ms_ns_json, Map.class);
129
130                           if (nsMap.get("namespace") != null) {
131                             microServiceInfo.setNamespace(nsMap.get("namespace"));
132                           }
133
134                           continue;
135                         }
136
137                         if (tag.startsWith("\"labels\"")) {
138                           String ms_labels_json = "{"+tag.split("\"labels\":\\{")[1];
139                           Map<String, String> labelMap  =
140                               (Map<String, String>) JacksonJsonUtil.jsonToBean(ms_labels_json, Map.class);
141
142                           List<String> nodeLabels = new ArrayList<String>();
143                           for (Map.Entry<String, String> labelEntry : labelMap.entrySet()) {
144                             if ("visualRange".equals(labelEntry.getKey())) {
145                               microServiceInfo.setVisualRange(labelEntry.getValue());
146                             } else if ("network_plane_type".equals(labelEntry.getKey())) {
147                               microServiceInfo.setNetwork_plane_type( labelEntry.getValue());
148                             } else {
149                               nodeLabels.add(labelEntry.getKey() + ":" + labelEntry.getValue());
150                             }
151
152                           }
153                           
154                           microServiceInfo.setLabels(nodeLabels);
155                           continue;
156                         }
157                         
158                         if (tag.startsWith("\"metadata\"")) {
159                           String ms_metadata_json = "{"+tag.split("\"metadata\":\\{")[1];
160                           Map<String, String> metadataMap =
161                               (Map<String, String>) JacksonJsonUtil.jsonToBean(ms_metadata_json, Map.class);
162
163                           List<KeyVaulePair> ms_metadata = new ArrayList<KeyVaulePair>();
164                        
165
166                           for (Map.Entry<String, String> metadataEntry : metadataMap.entrySet()) {
167                             KeyVaulePair keyVaulePair = new KeyVaulePair();
168                             keyVaulePair.setKey(metadataEntry.getKey());
169                             keyVaulePair.setValue(metadataEntry.getValue());
170                             ms_metadata.add(keyVaulePair);
171                           }
172                           microServiceInfo.setMetadata(ms_metadata);
173                           continue;
174                         }
175                     
176                       }
177
178
179                     } catch (Exception e) {
180                       LOGGER.error(serviceName + " read tag  throw exception", e);
181                     }
182                    
183                     nodeAddressList.add(microServiceInfo);
184                 }
185                 
186                 PublishAddressWrapper.publishApigateWayList.put(serviceName, nodeAddressList);
187              
188             }
189         });
190         try {
191             LOGGER.info(serviceName + " Node Listen start");
192 //            cacheList.get().add(healthCache);
193             healthCache.start();
194
195         } catch (Exception e) {
196             // TODO Auto-generated catch block
197             LOGGER.error(serviceName + " Node Listen start throw exception", e);
198         }
199
200         return healthCache;
201     }
202
203   
204     
205     public static void main(String[] args) {
206         ConsulClientApp consulTest = new ConsulClientApp("127.0.0.1", 8500);
207         // 监听服务变化
208         consulTest.startHealthNodeListen("apigateway");
209
210
211     }
212
213
214 }