Divide the MSB source codes into two repos
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / onap / msb / apiroute / wrapper / queue / ServiceConsumer.java
1 package org.onap.msb.apiroute.wrapper.queue;
2
3 import java.util.HashMap;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.Set;
7
8 import org.onap.msb.apiroute.SyncDataManager;
9 import org.onap.msb.apiroute.api.MicroServiceFullInfo;
10 import org.onap.msb.apiroute.health.RedisHealthCheck;
11 import org.onap.msb.apiroute.wrapper.MicroServiceWrapper;
12 import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth;
13 import org.onap.msb.apiroute.wrapper.util.CommonUtil;
14 import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18
19 public class ServiceConsumer implements Runnable {
20
21   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class);
22
23   private boolean isRunning = true;
24
25   private int index;
26   
27   
28   private static final int retryCount=3;
29
30   //缓存服务信息:key:服务名 和对应的版本列表Set<String>
31   private final Map<String, Set<String>> lastVersionResponse = new HashMap<String, Set<String>>();
32
33   public ServiceConsumer(final int index) {
34     this.index = index;
35   }
36
37
38   public void run() {
39
40     LOGGER.info("run Service Consumer Thread [" + index + "]");
41
42     while (isRunning) {
43       try {
44         ServiceData<List<ServiceHealth>> serviceData;
45
46         serviceData = QueueManager.getInstance().takeFromServiceQueue(index);
47
48         // LOGGER.info("Service Consumer Thread [" + index +
49         // "]  take out serviceData from Queue successfully");
50
51         if (serviceData.getOperate() == ServiceData.Operate.delete) {
52           // 删除服务
53           deleteMicroService(serviceData);
54         } else {
55           // 更新服务
56           updateMicroService(serviceData);
57         }
58       } catch (InterruptedException e) {
59         LOGGER.error("ServiceConsumer throw  InterruptedException: ", e);
60         Thread.currentThread().interrupt();
61       }
62
63     }
64   }
65
66
67
68   private void deleteMicroService(ServiceData<List<ServiceHealth>> serviceData) {
69     String serviceName = null;
70     try {
71       if (serviceData.getData() == null || serviceData.getData().size() == 0) {
72         throw new Exception("sysn deleteMicroService is wrong:serviceData is empty");
73       }
74
75       serviceName = serviceData.getData().get(0).getService().getService();
76 //      LOGGER.info("Service Consumer [" + index + "] start to delete MicroService:[serviceName] "
77 //          + serviceName);
78
79       //ServiceListCache.removeService(serviceName);
80       MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName);
81
82     } catch (Exception e) {
83       LOGGER.error("delete MicroServiceInfo 4AllVersion fail from consul:[serviceName]" + serviceName, e);
84       //删除失败,重试三次
85       for(int i=0;i<retryCount;i++){
86        
87         try {
88           Thread.sleep(1000);
89         } catch (InterruptedException ex) {
90           LOGGER.error("delete MicroServiceInfo 4AllVersion  Thread.sleep throw except:" + ex.getMessage());
91         }
92         if(reDeleteMicroService(serviceName)){
93           LOGGER.info((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo success [serviceName]" + serviceName);
94           break;  
95         }
96         else{
97           LOGGER.error((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo  still fail [serviceName]" + serviceName); 
98         }
99       }
100     }
101   }
102   
103   private boolean reDeleteMicroService(String serviceName){
104     try {
105       MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName);
106       return true;
107     } catch (Exception e) {
108       return false;
109     }
110   }
111
112   private void updateMicroService(ServiceData<List<ServiceHealth>> serviceData) {
113
114     if (serviceData.getData() == null || serviceData.getData().size() == 0) {
115       LOGGER.warn("sysn updateMicroService is wrong:serviceData is empty ");
116       return;
117     }
118
119     String serviceName = "";
120
121     try {
122
123       serviceName = serviceData.getData().get(0).getService().getService();
124       List<ServiceHealth> serviceNodeList = serviceData.getData();
125
126
127       Map<String, MicroServiceFullInfo> microServiceInfo4version =
128           ServiceFilter.getInstance().transMicroServiceInfoFromConsul(serviceNodeList);
129
130       // 删除数据库中已不存在的版本号服务信息
131       Set<String> newAllVersion = microServiceInfo4version.keySet();
132
133       if (lastVersionResponse.containsKey(serviceName)) {
134         Set<String> dbAllVersionSet = lastVersionResponse.get(serviceName);
135         // Set<String> dbAllVersionSet=MicroServiceWrapper.getInstance().getAllVersion(serviceName);
136         Set<String> delVersionList = CommonUtil.getDiffrent(newAllVersion, dbAllVersionSet);
137
138         if (delVersionList.size() > 0) {
139
140           LOGGER.info("MicroService version is change from consul:[serviceName]" + serviceName
141               + "[version]" + delVersionList);
142
143
144           for (String version : delVersionList) {
145             MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version);
146           }
147          
148         }
149       }
150    
151         lastVersionResponse.put(serviceName, newAllVersion);
152
153       for (Map.Entry<String, MicroServiceFullInfo> entry : microServiceInfo4version.entrySet()) {
154         MicroServiceFullInfo new_microServiceFullInfo = entry.getValue();
155         MicroServiceWrapper.getInstance().saveServiceAndnoticeRoute(new_microServiceFullInfo);
156        
157       }
158
159
160     } catch (Exception e) {
161       LOGGER.error("update MicroServiceInfo fail from consul:[serviceName]" + serviceName);
162       //更新失败,重置任务服务的modifyIndex,等待重新更新
163       RedisHealthCheck.writeCheckFlag = true;
164       SyncDataManager.resetIndex(serviceName);
165     }
166   }
167 }