1 package org.onap.msb.apiroute.wrapper.queue;
3 import java.util.HashMap;
8 import org.onap.msb.apiroute.SyncDataManager;
9 import org.onap.msb.apiroute.api.MicroServiceFullInfo;
10 import org.onap.msb.apiroute.health.RedisHealthCheck;
11 import org.onap.msb.apiroute.wrapper.MicroServiceWrapper;
12 import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth;
13 import org.onap.msb.apiroute.wrapper.util.CommonUtil;
14 import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
19 public class ServiceConsumer implements Runnable {
// Class-wide SLF4J logger for this consumer.
21 private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class);
// Run flag, initialised to true.
// NOTE(review): presumably the condition of the consume loop; the loop itself is not visible
// in this excerpt -- confirm where it is read and whether it is ever cleared.
23 private boolean isRunning = true;
// Upper bound of the retry loop in deleteMicroService.
28 private static final int retryCount=3;
30 // Cache of service info: key is the service name, value is the set of its versions (Set<String>).
// Read and then overwritten by updateMicroService on every update it processes.
31 private final Map<String, Set<String>> lastVersionResponse = new HashMap<String, Set<String>>();
// Constructor taking the index of this consumer.
// NOTE(review): the constructor body and the header of the method that follows are not visible
// in this excerpt. The statements below read `index`, so it is presumably stored in a field,
// and they presumably form the body of run() -- confirm against the full file.
33 public ServiceConsumer(final int index) {
40 LOGGER.info("run Service Consumer Thread [" + index + "]");
44 ServiceData<List<ServiceHealth>> serviceData;
// Take the next item from the service queue selected by this consumer's index.
46 serviceData = QueueManager.getInstance().takeFromServiceQueue(index);
48 // LOGGER.info("Service Consumer Thread [" + index +
49 // "] take out serviceData from Queue successfully");
// Dispatch on the operation carried by the queue item: a delete operation goes to
// deleteMicroService; updateMicroService is presumably the else branch (the `else` line is
// not visible here -- confirm).
51 if (serviceData.getOperate() == ServiceData.Operate.delete) {
53 deleteMicroService(serviceData);
56 updateMicroService(serviceData);
58 } catch (InterruptedException e) {
59 LOGGER.error("ServiceConsumer throw InterruptedException: ", e);
// Re-assert the interrupt status on the current thread after logging the exception.
60 Thread.currentThread().interrupt();
// Deletes all versions of the service named by the first node in serviceData, and retries up
// to retryCount times when the delete fails.
// NOTE(review): several lines of this method (the `try`, the sleep before each retry, the loop
// exit) are not visible in this excerpt; the comments below describe only what is shown.
68 private void deleteMicroService(ServiceData<List<ServiceHealth>> serviceData) {
// Stays null until the name is read from the data, so the failure log below can print "null".
69 String serviceName = null;
// Empty or missing data is reported by throwing.
// NOTE(review): this throws a raw java.lang.Exception; it is presumably caught by the
// catch (Exception e) further down and therefore also enters the retry loop -- confirm.
71 if (serviceData.getData() == null || serviceData.getData().size() == 0) {
72 throw new Exception("sysn deleteMicroService is wrong:serviceData is empty");
// The service name is taken from the first node only.
75 serviceName = serviceData.getData().get(0).getService().getService();
76 // LOGGER.info("Service Consumer [" + index + "] start to delete MicroService:[serviceName] "
79 //ServiceListCache.removeService(serviceName);
80 MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName);
82 } catch (Exception e) {
83 LOGGER.error("delete MicroServiceInfo 4AllVersion fail from consul:[serviceName]" + serviceName, e);
// Retry loop: at most retryCount attempts, each delegated to reDeleteMicroService.
85 for(int i=0;i<retryCount;i++){
// NOTE(review): only ex.getMessage() is logged here (no stack trace); whether the interrupt
// status is restored is not visible in this excerpt -- confirm.
89 } catch (InterruptedException ex) {
90 LOGGER.error("delete MicroServiceInfo 4AllVersion Thread.sleep throw except:" + ex.getMessage());
// A true result from reDeleteMicroService is logged as a successful retry.
92 if(reDeleteMicroService(serviceName)){
93 LOGGER.info((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo success [serviceName]" + serviceName);
97 LOGGER.error((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo still fail [serviceName]" + serviceName);
// One retry attempt: calls deleteMicroService4AllVersion for serviceName again.
// NOTE(review): the return statements are not visible in this excerpt; presumably returns true
// when the call completes and false when an Exception is caught -- confirm.
103 private boolean reDeleteMicroService(String serviceName){
105 MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName);
107 } catch (Exception e) {
// Applies an update for one service: converts the Consul node list into per-version service
// info, deletes versions selected by comparison with the cached version set, refreshes the
// cache, and saves every current version.
// NOTE(review): some lines of this method (the `try`, the early exit after the warning,
// closing braces) are not visible in this excerpt.
112 private void updateMicroService(ServiceData<List<ServiceHealth>> serviceData) {
// Empty or missing data is only logged as a warning here.
114 if (serviceData.getData() == null || serviceData.getData().size() == 0) {
115 LOGGER.warn("sysn updateMicroService is wrong:serviceData is empty ");
119 String serviceName = "";
// The service name is taken from the first node only.
123 serviceName = serviceData.getData().get(0).getService().getService();
124 List<ServiceHealth> serviceNodeList = serviceData.getData();
// Map of service info produced by ServiceFilter from the node list; its keys are used as the
// version set below.
127 Map<String, MicroServiceFullInfo> microServiceInfo4version =
128 ServiceFilter.getInstance().transMicroServiceInfoFromConsul(serviceNodeList);
130 // Remove from the database the service info for versions that no longer exist.
131 Set<String> newAllVersion = microServiceInfo4version.keySet();
// Only services already present in the cache are compared against their previous version set.
133 if (lastVersionResponse.containsKey(serviceName)) {
134 Set<String> dbAllVersionSet = lastVersionResponse.get(serviceName);
135 // Set<String> dbAllVersionSet=MicroServiceWrapper.getInstance().getAllVersion(serviceName);
// NOTE(review): presumably getDiffrent returns the versions in dbAllVersionSet that are
// missing from newAllVersion; its semantics are not visible here -- confirm.
136 Set<String> delVersionList = CommonUtil.getDiffrent(newAllVersion, dbAllVersionSet);
138 if (delVersionList.size() > 0) {
140 LOGGER.info("MicroService version is change from consul:[serviceName]" + serviceName
141 + "[version]" + delVersionList);
// Delete each selected version individually.
144 for (String version : delVersionList) {
145 MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version);
// Record the current version set for this service. Note that the stored value is the keySet()
// of microServiceInfo4version, not a copy.
151 lastVersionResponse.put(serviceName, newAllVersion);
// Save every current version through MicroServiceWrapper.saveServiceAndnoticeRoute.
153 for (Map.Entry<String, MicroServiceFullInfo> entry : microServiceInfo4version.entrySet()) {
154 MicroServiceFullInfo new_microServiceFullInfo = entry.getValue();
155 MicroServiceWrapper.getInstance().saveServiceAndnoticeRoute(new_microServiceFullInfo);
160 } catch (Exception e) {
// NOTE(review): the caught exception `e` is not passed to the logger, so the cause and stack
// trace of the failure are lost.
161 LOGGER.error("update MicroServiceInfo fail from consul:[serviceName]" + serviceName);
162 // Update failed: reset the modifyIndex of the service and wait for it to be updated again.
// NOTE(review): writeCheckFlag presumably requests a Redis write check -- confirm in RedisHealthCheck.
163 RedisHealthCheck.writeCheckFlag = true;
164 SyncDataManager.resetIndex(serviceName);