1 package org.onap.msb.apiroute.wrapper.queue;
3 import java.io.IOException;
4 import java.util.ArrayList;
5 import java.util.HashSet;
9 import org.apache.http.HttpEntity;
10 import org.onap.msb.apiroute.SyncDataManager;
11 import org.onap.msb.apiroute.wrapper.MicroServiceWrapper;
12 import org.onap.msb.apiroute.wrapper.util.CommonUtil;
13 import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
17 import com.fasterxml.jackson.core.JsonFactory;
18 import com.fasterxml.jackson.core.JsonParser;
19 import com.fasterxml.jackson.core.JsonToken;
// Worker that takes service-list payloads (HttpEntity) from QueueManager,
// filters them down to a set of service names and reconciles that set with
// ServiceListCache. Implements Runnable so the work can run on its own thread.
22 public class ServiceListConsumer implements Runnable {
// Class-wide SLF4J logger.
24 private static final Logger LOGGER = LoggerFactory
25 .getLogger(ServiceListConsumer.class);
// NOTE(review): presumably the flag that keeps the consumer loop alive. No read
// or write of it is visible nearby, and it is neither volatile nor final --
// confirm how (and from which thread) it is meant to be cleared.
27 private boolean isRunning = true;
// No-argument constructor.
// NOTE(review): `index` is read by the logging and queue calls of this class;
// presumably it is initialised here -- confirm.
32 public ServiceListConsumer() {
// Consumer body: announce the thread, take one service-list payload from the
// queue, reduce it to the set of accepted service names, then either
// initialise or update the watched services and refresh ServiceListCache.
37 LOGGER.info("run ServiceList Consumer Thread [" + index + "]");
// Fetch the next payload for this consumer's queue slot (`index`).
// NOTE(review): presumably a blocking take -- confirm against QueueManager.
42 ServiceData<HttpEntity> serviceData = QueueManager
43 .getInstance().takeFromServiceListQueue(index);
44 LOGGER.debug("ServiceList Consumer Thread [" + index +
45 "] take out serviceData from Queue successfully");
// Raw HTTP entity carrying the service list.
47 HttpEntity newValues = serviceData.getData();
// Parse and filter the entity into the set of service names to track.
49 Set<String> newServiceNameList = filterServiceList(newValues);
// An empty cache is treated as "first run": go through initialisation.
51 if (ServiceListCache.getLatestServiceNamelist().size() == 0) {
52 boolean initSuccess=initServiceList(newServiceNameList);
// NOTE(review): initSuccess is captured just above; presumably this cache
// update is meant to happen only when initialisation succeeded -- confirm.
54 ServiceListCache.setLatestServiceNamelist(newServiceNameList);
// NOTE(review): presumably the non-empty-cache path: start watches for newly
// seen services, then remember the new list -- confirm the branch structure.
57 updateServiceList(newServiceNameList);
58 ServiceListCache.setLatestServiceNamelist(newServiceNameList);
// Any exception raised while handling a payload is caught and logged here.
62 } catch (Exception e) {
64 "ServiceListConsumer throw Exception: ", e);
// Thin wrapper that asks SyncDataManager to start watching the given service.
// serviceName: name of the service whose nodes should be watched.
69 private void startWatchService(String serviceName) {
70 // start to Watch service nodes
72 SyncDataManager.startWatchService(serviceName);
// Compares the cached service-name list with the freshly received one and
// starts a watch for every name reported by the comparison.
// newServiceNameList: service names extracted from the latest payload.
75 private void updateServiceList(Set<String> newServiceNameList) {
// NOTE(review): presumably getDiffrent(a, b) yields the names present in b
// but not in a, i.e. services that appeared since the last payload -- confirm
// against CommonUtil.
76 Set<String> registerServiceNameList = CommonUtil.getDiffrent(
77 ServiceListCache.getLatestServiceNamelist(), newServiceNameList);
// Only log and iterate when the comparison produced at least one name.
79 if (registerServiceNameList.size() > 0) {
80 LOGGER.info("***need to start Watch Service num from consul :"
81 + registerServiceNameList.size());
// Start one watch per reported service.
83 for (String serviceName : registerServiceNameList) {
84 startWatchService(serviceName);
// Start-up reconciliation: loads the service keys already stored via
// MicroServiceWrapper, deletes the stored services reported by the comparison
// with the new list, then starts a watch for every service in the new list.
// newServiceNameList: service names extracted from the first payload.
// Returns a boolean success flag.
// NOTE(review): presumably false when the stored keys cannot be loaded and
// true otherwise -- confirm.
89 private boolean initServiceList(Set<String> newServiceNameList) {
90 LOGGER.info("***start to initialize service List when System startup ***");
// Keys of all micro-services currently persisted (the log text below refers
// to the store as redis).
92 Set<String> dbServiceNameList = MicroServiceWrapper
93 .getInstance().getAllMicroServiceKey();
// A null result is treated as a failure to read the store.
95 if(dbServiceNameList==null){
96 LOGGER.error("init ServiceList from redis fail ");
// NOTE(review): presumably getDiffrent(a, b) yields the names present in b
// but not in a, i.e. stored services that no longer appear in the new list
// -- confirm against CommonUtil.
102 Set<String> delServiceNameList = CommonUtil.getDiffrent(
103 newServiceNameList, dbServiceNameList);
105 LOGGER.info("***need to delete Service num from redis :"
106 + delServiceNameList.size());
// Remove every version of each stale service from the store.
107 for (String serviceName : delServiceNameList) {
109 MicroServiceWrapper.getInstance()
110 .deleteMicroService4AllVersion(serviceName);
111 LOGGER.info("delete MicroService success from initialize:[serviceName]"
// A failed delete is logged together with the service name.
// NOTE(review): presumably the handler sits inside the loop so one failure
// does not stop the remaining deletes -- confirm.
114 } catch (Exception e) {
116 "initialize serviceList :Delete MicroServiceInfo serviceName:"
117 + serviceName + " FAIL : ", e);
122 LOGGER.info("***need to start Watch Service num from initialize :"
123 + newServiceNameList.size());
// Watch every service from the new list, not only the newly added ones.
125 for (String serviceName : newServiceNameList) {
126 startWatchService(serviceName);
133 /*private ImmutableSet<String> filterServiceList(
134 final Map<String, List<String>> serviceList) {
135 if (serviceList == null || serviceList.isEmpty()) {
136 return ImmutableSet.of();
139 final ImmutableSet.Builder<String> builder = ImmutableSet.builder();
141 for (Map.Entry<String, List<String>> entry : serviceList.entrySet()) {
143 String key = entry.getKey();
144 if (key != null && !"consul".equals(key)) {
146 List<String> value = entry.getValue();
147 if (ServiceFilter.getInstance().isFilterService(value)) {
153 LOGGER.info("consul all service num:" + serviceList.size());
154 LOGGER.info("consul filter service num:" + builder.build().size());
156 return builder.build();
// Stream-parses the service-list entity with Jackson and collects the names
// of the services whose tag list is accepted by ServiceFilter. The entry
// named "consul" is always skipped.
// NOTE(review): the parsing loops presumably expect a JSON object mapping
// each service name to an array of tag strings -- confirm against the
// producer of this entity.
// serviceList: HTTP entity holding the JSON service list; may be null.
// Returns an empty set for a null/zero-length entity or on an IOException.
159 private Set<String> filterServiceList(final HttpEntity serviceList) {
// NOTE(review): only a content length of exactly 0 is treated as empty; an
// entity reporting an unknown (negative) length continues to parsing -- confirm
// that is intended.
161 if (serviceList == null || serviceList.getContentLength() == 0) {
162 return new HashSet<String>();
// Accumulates the accepted service names.
165 final Set<String> builder = new HashSet<String>();
167 JsonFactory f = new JsonFactory();
168 JsonParser jp = null;
169 List<String> tagList = null;
// Counter reported in the summary log as the number of services in the input.
170 int inputServiceNum = 0;
// Parse directly from the entity's content stream.
172 jp = f.createParser(serviceList.getContent());
// Walk the entries until the closing END_OBJECT token.
// NOTE(review): this loop (and the array loop below) stops only on the
// expected end token; JsonParser.nextToken() returns null at end of input, so
// a truncated payload may never satisfy the exit condition -- confirm a guard
// exists.
174 while (jp.nextToken() != JsonToken.END_OBJECT) {
175 String serviceName = jp.getCurrentName();
// Collect the text of every token up to END_ARRAY as this service's tags.
178 tagList = new ArrayList<>();
179 while (jp.nextToken() != JsonToken.END_ARRAY) {
180 tagList.add(jp.getText());
// Ignore unnamed entries and the "consul" entry itself.
183 if (serviceName != null && !"consul".equals(serviceName)) {
// Keep the service only when ServiceFilter accepts its tags.
184 if (ServiceFilter.getInstance().isFilterService(tagList)) {
185 builder.add(serviceName);
// A parse/IO failure is logged and an empty set is returned.
189 } catch (IOException e) {
190 LOGGER.warn("parse service list error",e);
191 return new HashSet<String>();
// Second IOException handler with the same treatment.
// NOTE(review): presumably guards closing the parser; if `jp` is closed
// unconditionally, note that it is still null when createParser throws
// -- confirm.
195 } catch (IOException e) {
196 LOGGER.warn("parse service list error",e);
197 return new HashSet<String>();
// Size of the previously cached list, used only for the summary log below.
201 int latestServiceNum=ServiceListCache.getLatestServiceNamelist().size();
202 // if(latestServiceNum!=builder.size()){
203 LOGGER.info("[consul] all service num:" + inputServiceNum+ ", filter service num: new——" + builder.size()+" old——"+latestServiceNum);