Fix java check style warning
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / onap / msb / apiroute / wrapper / queue / ServiceListConsumer.java
1 /*******************************************************************************
2  * Copyright 2016-2017 ZTE, Inc. and others.
3  * 
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  * 
7  * http://www.apache.org/licenses/LICENSE-2.0
8  * 
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  ******************************************************************************/
14 package org.onap.msb.apiroute.wrapper.queue;
15
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Set;
21
22 import org.apache.http.HttpEntity;
23 import org.onap.msb.apiroute.SyncDataManager;
24 import org.onap.msb.apiroute.wrapper.MicroServiceWrapper;
25 import org.onap.msb.apiroute.wrapper.util.CommonUtil;
26 import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import com.fasterxml.jackson.core.JsonFactory;
31 import com.fasterxml.jackson.core.JsonParser;
32 import com.fasterxml.jackson.core.JsonToken;
33
34
35 public class ServiceListConsumer implements Runnable {
36
37     private static final Logger LOGGER = LoggerFactory.getLogger(ServiceListConsumer.class);
38
39     private boolean isRunning = true;
40
41     private int index;
42
43
44     public ServiceListConsumer() {
45         this.index = 0;
46     }
47
48     public void run() {
49         LOGGER.info("run ServiceList Consumer Thread [" + index + "]");
50
51         while (isRunning) {
52             try {
53                 // 取最新一条记录
54                 ServiceData<HttpEntity> serviceData = QueueManager.getInstance().takeFromServiceListQueue(index);
55                 LOGGER.debug("ServiceList Consumer Thread [" + index
56                                 + "]  take out serviceData from Queue successfully");
57
58                 HttpEntity newValues = serviceData.getData();
59
60                 Set<String> newServiceNameList = filterServiceList(newValues);
61
62                 if (ServiceListCache.getLatestServiceNamelist().size() == 0) {
63                     boolean initSuccess = initServiceList(newServiceNameList);
64                     if (initSuccess) {
65                         ServiceListCache.setLatestServiceNamelist(newServiceNameList);
66                     }
67                 } else {
68                     updateServiceList(newServiceNameList);
69                     ServiceListCache.setLatestServiceNamelist(newServiceNameList);
70                 }
71
72
73             } catch (Exception e) {
74                 LOGGER.error("ServiceListConsumer throw  Exception: ", e);
75             }
76         }
77     }
78
79     private void startWatchService(String serviceName) {
80         // start to Watch service nodes
81
82         SyncDataManager.startWatchService(serviceName);
83     }
84
85     private void updateServiceList(Set<String> newServiceNameList) {
86         Set<String> registerServiceNameList =
87                         CommonUtil.getDiffrent(ServiceListCache.getLatestServiceNamelist(), newServiceNameList);
88
89         if (registerServiceNameList.size() > 0) {
90             LOGGER.info("***need to start Watch Service num from consul :" + registerServiceNameList.size());
91
92             for (String serviceName : registerServiceNameList) {
93                 startWatchService(serviceName);
94             }
95         }
96     }
97
98     private boolean initServiceList(Set<String> newServiceNameList) {
99         LOGGER.info("***start to initialize  service List when System startup ***");
100
101         Set<String> dbServiceNameList = MicroServiceWrapper.getInstance().getAllMicroServiceKey();
102
103         if (dbServiceNameList == null) {
104             LOGGER.error("init ServiceList from redis fail ");
105             return false;
106         }
107
108
109         // 对比删除redis脏数据
110         Set<String> delServiceNameList = CommonUtil.getDiffrent(newServiceNameList, dbServiceNameList);
111
112         LOGGER.info("***need to delete Service num from redis :" + delServiceNameList.size());
113         for (String serviceName : delServiceNameList) {
114             try {
115                 MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName);
116                 LOGGER.info("delete MicroService success from initialize:[serviceName]" + serviceName);
117
118             } catch (Exception e) {
119                 LOGGER.error("initialize serviceList :Delete MicroServiceInfo serviceName:" + serviceName + " FAIL : ",
120                                 e);
121             }
122         }
123
124         // 启动同步开启监听全部服务列表
125         LOGGER.info("***need to start Watch Service num from initialize :" + newServiceNameList.size());
126
127         for (String serviceName : newServiceNameList) {
128             startWatchService(serviceName);
129         }
130
131         return true;
132
133     }
134
135     /*
136      * private ImmutableSet<String> filterServiceList( final Map<String, List<String>> serviceList)
137      * { if (serviceList == null || serviceList.isEmpty()) { return ImmutableSet.of(); }
138      * 
139      * final ImmutableSet.Builder<String> builder = ImmutableSet.builder();
140      * 
141      * for (Map.Entry<String, List<String>> entry : serviceList.entrySet()) {
142      * 
143      * String key = entry.getKey(); if (key != null && !"consul".equals(key)) {
144      * 
145      * List<String> value = entry.getValue(); if
146      * (ServiceFilter.getInstance().isFilterService(value)) { builder.add(key); } } }
147      * 
148      * LOGGER.info("consul all service num:" + serviceList.size());
149      * LOGGER.info("consul filter service num:" + builder.build().size());
150      * 
151      * return builder.build(); }
152      */
153     private Set<String> filterServiceList(final HttpEntity serviceList) {
154
155         if (serviceList == null || serviceList.getContentLength() == 0) {
156             return new HashSet<String>();
157         }
158
159         final Set<String> builder = new HashSet<String>();
160
161         JsonFactory f = new JsonFactory();
162         JsonParser jp = null;
163         List<String> tagList = null;
164         int inputServiceNum = 0;
165         try {
166             jp = f.createParser(serviceList.getContent());
167             jp.nextToken();
168             while (jp.nextToken() != JsonToken.END_OBJECT) {
169                 String serviceName = jp.getCurrentName();
170                 inputServiceNum++;
171                 jp.nextToken();
172                 tagList = new ArrayList<>();
173                 while (jp.nextToken() != JsonToken.END_ARRAY) {
174                     tagList.add(jp.getText());
175                 }
176
177                 if (serviceName != null && !"consul".equals(serviceName)) {
178                     if (ServiceFilter.getInstance().isFilterService(tagList)) {
179                         builder.add(serviceName);
180                     }
181                 }
182             }
183         } catch (IOException e) {
184             LOGGER.warn("parse service list error", e);
185             return new HashSet<String>();
186         } finally {
187             try {
188                 jp.close();
189             } catch (IOException e) {
190                 LOGGER.warn("parse service list error", e);
191                 return new HashSet<String>();
192             }
193         }
194
195         int latestServiceNum = ServiceListCache.getLatestServiceNamelist().size();
196         // if(latestServiceNum!=builder.size()){
197         LOGGER.info("[consul] all service num:" + inputServiceNum + ", filter service num: new——" + builder.size()
198                         + "  old——" + latestServiceNum);
199         // }
200
201         return builder;
202     }
203
204 }