617a4e50b06e8f14379e4cafa85eee437f90b253
[msb/apigateway.git] /
1 /*******************************************************************************
2  * Copyright 2016-2017 ZTE, Inc. and others.
3  * 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * 
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  * 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  ******************************************************************************/
16 package org.onap.msb.apiroute.wrapper.queue;
17
18 import java.io.IOException;
19 import java.util.ArrayList;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Set;
23
24 import org.apache.http.HttpEntity;
25 import org.onap.msb.apiroute.SyncDataManager;
26 import org.onap.msb.apiroute.wrapper.MicroServiceWrapper;
27 import org.onap.msb.apiroute.wrapper.util.CommonUtil;
28 import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import com.fasterxml.jackson.core.JsonFactory;
33 import com.fasterxml.jackson.core.JsonParser;
34 import com.fasterxml.jackson.core.JsonToken;
35
36
37 public class ServiceListConsumer implements Runnable {
38
39         private static final Logger LOGGER = LoggerFactory
40                         .getLogger(ServiceListConsumer.class);
41
42         private boolean isRunning = true;
43
44         private int index;
45
46
47         public ServiceListConsumer() {
48                 this.index = 0;
49         }
50
51         public void run() {
52                 LOGGER.info("run ServiceList Consumer Thread [" + index + "]");
53
54                 while (isRunning) {
55                         try {
56                                 // 取最新一条记录
57                                 ServiceData<HttpEntity> serviceData = QueueManager
58                                                 .getInstance().takeFromServiceListQueue(index);
59                                  LOGGER.debug("ServiceList Consumer Thread [" + index +
60                                  "]  take out serviceData from Queue successfully");
61
62                                 HttpEntity newValues = serviceData.getData();
63
64                                 Set<String> newServiceNameList = filterServiceList(newValues);
65
66                                 if (ServiceListCache.getLatestServiceNamelist().size() == 0) {
67                                         boolean initSuccess=initServiceList(newServiceNameList);
68                                         if(initSuccess){
69                                           ServiceListCache.setLatestServiceNamelist(newServiceNameList);
70                                         }
71                                 } else {
72                                         updateServiceList(newServiceNameList);
73                                         ServiceListCache.setLatestServiceNamelist(newServiceNameList);
74                                 }
75
76                                 
77                         } catch (Exception e) {
78                                 LOGGER.error(
79                                                 "ServiceListConsumer throw  Exception: ", e);
80                         }
81                 }
82         }
83
84         private void startWatchService(String serviceName) {
85                 // start to Watch service nodes
86
87                 SyncDataManager.startWatchService(serviceName);
88         }
89
90         private void updateServiceList(Set<String> newServiceNameList) {
91                 Set<String> registerServiceNameList = CommonUtil.getDiffrent(
92                     ServiceListCache.getLatestServiceNamelist(), newServiceNameList);
93
94                 if (registerServiceNameList.size() > 0) {
95                         LOGGER.info("***need to start Watch Service num from consul :"
96                                         + registerServiceNameList.size());
97
98                         for (String serviceName : registerServiceNameList) {
99                                 startWatchService(serviceName);
100                         }
101                 }
102         }
103
104         private boolean initServiceList(Set<String> newServiceNameList) {
105                 LOGGER.info("***start to initialize  service List when System startup ***");
106
107                 Set<String> dbServiceNameList = MicroServiceWrapper
108                                 .getInstance().getAllMicroServiceKey();
109                 
110                 if(dbServiceNameList==null){             
111                   LOGGER.error("init ServiceList from redis fail ");
112                   return false;
113                 }
114                 
115                 
116                 // 对比删除redis脏数据
117                 Set<String> delServiceNameList = CommonUtil.getDiffrent(
118                                 newServiceNameList, dbServiceNameList);
119     
120                 LOGGER.info("***need to delete Service num from redis :"
121                                 + delServiceNameList.size());
122                 for (String serviceName : delServiceNameList) {
123                         try {
124                                 MicroServiceWrapper.getInstance()
125                                                 .deleteMicroService4AllVersion(serviceName);
126                                 LOGGER.info("delete MicroService success from initialize:[serviceName]"
127                                                 + serviceName);
128     
129                         } catch (Exception e) {
130                                 LOGGER.error(
131                                                 "initialize serviceList :Delete MicroServiceInfo serviceName:"
132                                                                 + serviceName + " FAIL : ", e);
133                         }
134                 }
135     
136                 // 启动同步开启监听全部服务列表
137                 LOGGER.info("***need to start Watch Service num from initialize :"
138                                 + newServiceNameList.size());
139     
140                 for (String serviceName : newServiceNameList) {
141                         startWatchService(serviceName);
142                 }
143                 
144                 return true;
145
146         }
147
148         /*private ImmutableSet<String> filterServiceList(
149                         final Map<String, List<String>> serviceList) {
150                 if (serviceList == null || serviceList.isEmpty()) {
151                         return ImmutableSet.of();
152                 }
153
154                 final ImmutableSet.Builder<String> builder = ImmutableSet.builder();
155
156                 for (Map.Entry<String, List<String>> entry : serviceList.entrySet()) {
157
158                         String key = entry.getKey();
159                         if (key != null && !"consul".equals(key)) {
160
161                                 List<String> value = entry.getValue();
162                                 if (ServiceFilter.getInstance().isFilterService(value)) {
163                                         builder.add(key);
164                                 }
165                         }
166                 }
167
168                 LOGGER.info("consul all service num:" + serviceList.size());
169                 LOGGER.info("consul filter service num:" + builder.build().size());
170
171                 return builder.build();
172         }
173 */
174         private Set<String> filterServiceList(final HttpEntity serviceList) {
175
176                 if (serviceList == null || serviceList.getContentLength() == 0) {
177                         return new HashSet<String>();
178                 }
179
180                 final Set<String> builder = new HashSet<String>();
181
182                 JsonFactory f = new JsonFactory();
183                 JsonParser jp = null;
184                 List<String> tagList = null;
185                 int inputServiceNum = 0;
186                 try {
187                         jp = f.createParser(serviceList.getContent());
188                         jp.nextToken();
189                         while (jp.nextToken() != JsonToken.END_OBJECT) {
190                                 String serviceName = jp.getCurrentName();
191                                 inputServiceNum++;
192                                 jp.nextToken();
193                                 tagList = new ArrayList<>();
194                                 while (jp.nextToken() != JsonToken.END_ARRAY) {
195                                         tagList.add(jp.getText());
196                                 }
197                                 
198                                 if (serviceName != null && !"consul".equals(serviceName)) {
199                                         if (ServiceFilter.getInstance().isFilterService(tagList)) {
200                                                 builder.add(serviceName);
201                                         }
202                                 }
203                         }
204                 } catch (IOException e) {
205                         LOGGER.warn("parse service list error",e);
206                         return new HashSet<String>();
207                 } finally {
208                         try {
209                                 jp.close();
210                         } catch (IOException e) {
211                                 LOGGER.warn("parse service list error",e);
212                                 return new HashSet<String>();
213                         }
214                 }
215                 
216                 int latestServiceNum=ServiceListCache.getLatestServiceNamelist().size();
217 //              if(latestServiceNum!=builder.size()){
218                   LOGGER.info("[consul] all service num:" + inputServiceNum+ ", filter service num: new——" + builder.size()+"  old——"+latestServiceNum);
219 //              }
220
221                 return builder;
222         }
223
224 }