Divide the MSB source codes into two repos
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / onap / msb / apiroute / wrapper / queue / ServiceListConsumer.java
1 package org.onap.msb.apiroute.wrapper.queue;
2
3 import java.io.IOException;
4 import java.util.ArrayList;
5 import java.util.HashSet;
6 import java.util.List;
7 import java.util.Set;
8
9 import org.apache.http.HttpEntity;
10 import org.onap.msb.apiroute.SyncDataManager;
11 import org.onap.msb.apiroute.wrapper.MicroServiceWrapper;
12 import org.onap.msb.apiroute.wrapper.util.CommonUtil;
13 import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 import com.fasterxml.jackson.core.JsonFactory;
18 import com.fasterxml.jackson.core.JsonParser;
19 import com.fasterxml.jackson.core.JsonToken;
20
21
22 public class ServiceListConsumer implements Runnable {
23
24         private static final Logger LOGGER = LoggerFactory
25                         .getLogger(ServiceListConsumer.class);
26
27         private boolean isRunning = true;
28
29         private int index;
30
31
32         public ServiceListConsumer() {
33                 this.index = 0;
34         }
35
36         public void run() {
37                 LOGGER.info("run ServiceList Consumer Thread [" + index + "]");
38
39                 while (isRunning) {
40                         try {
41                                 // 取最新一条记录
42                                 ServiceData<HttpEntity> serviceData = QueueManager
43                                                 .getInstance().takeFromServiceListQueue(index);
44                                  LOGGER.debug("ServiceList Consumer Thread [" + index +
45                                  "]  take out serviceData from Queue successfully");
46
47                                 HttpEntity newValues = serviceData.getData();
48
49                                 Set<String> newServiceNameList = filterServiceList(newValues);
50
51                                 if (ServiceListCache.getLatestServiceNamelist().size() == 0) {
52                                         boolean initSuccess=initServiceList(newServiceNameList);
53                                         if(initSuccess){
54                                           ServiceListCache.setLatestServiceNamelist(newServiceNameList);
55                                         }
56                                 } else {
57                                         updateServiceList(newServiceNameList);
58                                         ServiceListCache.setLatestServiceNamelist(newServiceNameList);
59                                 }
60
61                                 
62                         } catch (Exception e) {
63                                 LOGGER.error(
64                                                 "ServiceListConsumer throw  Exception: ", e);
65                         }
66                 }
67         }
68
69         private void startWatchService(String serviceName) {
70                 // start to Watch service nodes
71
72                 SyncDataManager.startWatchService(serviceName);
73         }
74
75         private void updateServiceList(Set<String> newServiceNameList) {
76                 Set<String> registerServiceNameList = CommonUtil.getDiffrent(
77                     ServiceListCache.getLatestServiceNamelist(), newServiceNameList);
78
79                 if (registerServiceNameList.size() > 0) {
80                         LOGGER.info("***need to start Watch Service num from consul :"
81                                         + registerServiceNameList.size());
82
83                         for (String serviceName : registerServiceNameList) {
84                                 startWatchService(serviceName);
85                         }
86                 }
87         }
88
89         private boolean initServiceList(Set<String> newServiceNameList) {
90                 LOGGER.info("***start to initialize  service List when System startup ***");
91
92                 Set<String> dbServiceNameList = MicroServiceWrapper
93                                 .getInstance().getAllMicroServiceKey();
94                 
95                 if(dbServiceNameList==null){             
96                   LOGGER.error("init ServiceList from redis fail ");
97                   return false;
98                 }
99                 
100                 
101                 // 对比删除redis脏数据
102                 Set<String> delServiceNameList = CommonUtil.getDiffrent(
103                                 newServiceNameList, dbServiceNameList);
104     
105                 LOGGER.info("***need to delete Service num from redis :"
106                                 + delServiceNameList.size());
107                 for (String serviceName : delServiceNameList) {
108                         try {
109                                 MicroServiceWrapper.getInstance()
110                                                 .deleteMicroService4AllVersion(serviceName);
111                                 LOGGER.info("delete MicroService success from initialize:[serviceName]"
112                                                 + serviceName);
113     
114                         } catch (Exception e) {
115                                 LOGGER.error(
116                                                 "initialize serviceList :Delete MicroServiceInfo serviceName:"
117                                                                 + serviceName + " FAIL : ", e);
118                         }
119                 }
120     
121                 // 启动同步开启监听全部服务列表
122                 LOGGER.info("***need to start Watch Service num from initialize :"
123                                 + newServiceNameList.size());
124     
125                 for (String serviceName : newServiceNameList) {
126                         startWatchService(serviceName);
127                 }
128                 
129                 return true;
130
131         }
132
133         /*private ImmutableSet<String> filterServiceList(
134                         final Map<String, List<String>> serviceList) {
135                 if (serviceList == null || serviceList.isEmpty()) {
136                         return ImmutableSet.of();
137                 }
138
139                 final ImmutableSet.Builder<String> builder = ImmutableSet.builder();
140
141                 for (Map.Entry<String, List<String>> entry : serviceList.entrySet()) {
142
143                         String key = entry.getKey();
144                         if (key != null && !"consul".equals(key)) {
145
146                                 List<String> value = entry.getValue();
147                                 if (ServiceFilter.getInstance().isFilterService(value)) {
148                                         builder.add(key);
149                                 }
150                         }
151                 }
152
153                 LOGGER.info("consul all service num:" + serviceList.size());
154                 LOGGER.info("consul filter service num:" + builder.build().size());
155
156                 return builder.build();
157         }
158 */
159         private Set<String> filterServiceList(final HttpEntity serviceList) {
160
161                 if (serviceList == null || serviceList.getContentLength() == 0) {
162                         return new HashSet<String>();
163                 }
164
165                 final Set<String> builder = new HashSet<String>();
166
167                 JsonFactory f = new JsonFactory();
168                 JsonParser jp = null;
169                 List<String> tagList = null;
170                 int inputServiceNum = 0;
171                 try {
172                         jp = f.createParser(serviceList.getContent());
173                         jp.nextToken();
174                         while (jp.nextToken() != JsonToken.END_OBJECT) {
175                                 String serviceName = jp.getCurrentName();
176                                 inputServiceNum++;
177                                 jp.nextToken();
178                                 tagList = new ArrayList<>();
179                                 while (jp.nextToken() != JsonToken.END_ARRAY) {
180                                         tagList.add(jp.getText());
181                                 }
182                                 
183                                 if (serviceName != null && !"consul".equals(serviceName)) {
184                                         if (ServiceFilter.getInstance().isFilterService(tagList)) {
185                                                 builder.add(serviceName);
186                                         }
187                                 }
188                         }
189                 } catch (IOException e) {
190                         LOGGER.warn("parse service list error",e);
191                         return new HashSet<String>();
192                 } finally {
193                         try {
194                                 jp.close();
195                         } catch (IOException e) {
196                                 LOGGER.warn("parse service list error",e);
197                                 return new HashSet<String>();
198                         }
199                 }
200                 
201                 int latestServiceNum=ServiceListCache.getLatestServiceNamelist().size();
202 //              if(latestServiceNum!=builder.size()){
203                   LOGGER.info("[consul] all service num:" + inputServiceNum+ ", filter service num: new——" + builder.size()+"  old——"+latestServiceNum);
204 //              }
205
206                 return builder;
207         }
208
209 }