Divide the MSB source codes into two repos
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / onap / msb / apiroute / wrapper / queue / ServiceConsumer.java
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java
new file mode 100644 (file)
index 0000000..29d705f
--- /dev/null
@@ -0,0 +1,167 @@
+package org.onap.msb.apiroute.wrapper.queue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.onap.msb.apiroute.SyncDataManager;
+import org.onap.msb.apiroute.api.MicroServiceFullInfo;
+import org.onap.msb.apiroute.health.RedisHealthCheck;
+import org.onap.msb.apiroute.wrapper.MicroServiceWrapper;
+import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth;
+import org.onap.msb.apiroute.wrapper.util.CommonUtil;
+import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ServiceConsumer implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class);
+
+  private boolean isRunning = true;
+
+  private int index;
+  
+  
+  private static final int retryCount=3;
+
+  //缓存服务信息:key:服务名 和对应的版本列表Set<String>
+  private final Map<String, Set<String>> lastVersionResponse = new HashMap<String, Set<String>>();
+
+  public ServiceConsumer(final int index) {
+    this.index = index;
+  }
+
+
+  public void run() {
+
+    LOGGER.info("run Service Consumer Thread [" + index + "]");
+
+    while (isRunning) {
+      try {
+        ServiceData<List<ServiceHealth>> serviceData;
+
+        serviceData = QueueManager.getInstance().takeFromServiceQueue(index);
+
+        // LOGGER.info("Service Consumer Thread [" + index +
+        // "]  take out serviceData from Queue successfully");
+
+        if (serviceData.getOperate() == ServiceData.Operate.delete) {
+          // 删除服务
+          deleteMicroService(serviceData);
+        } else {
+          // 更新服务
+          updateMicroService(serviceData);
+        }
+      } catch (InterruptedException e) {
+        LOGGER.error("ServiceConsumer throw  InterruptedException: ", e);
+        Thread.currentThread().interrupt();
+      }
+
+    }
+  }
+
+
+
+  private void deleteMicroService(ServiceData<List<ServiceHealth>> serviceData) {
+    String serviceName = null;
+    try {
+      if (serviceData.getData() == null || serviceData.getData().size() == 0) {
+        throw new Exception("sysn deleteMicroService is wrong:serviceData is empty");
+      }
+
+      serviceName = serviceData.getData().get(0).getService().getService();
+//      LOGGER.info("Service Consumer [" + index + "] start to delete MicroService:[serviceName] "
+//          + serviceName);
+
+      //ServiceListCache.removeService(serviceName);
+      MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName);
+
+    } catch (Exception e) {
+      LOGGER.error("delete MicroServiceInfo 4AllVersion fail from consul:[serviceName]" + serviceName, e);
+      //删除失败,重试三次
+      for(int i=0;i<retryCount;i++){
+       
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          LOGGER.error("delete MicroServiceInfo 4AllVersion  Thread.sleep throw except:" + ex.getMessage());
+        }
+        if(reDeleteMicroService(serviceName)){
+          LOGGER.info((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo success [serviceName]" + serviceName);
+          break;  
+        }
+        else{
+          LOGGER.error((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo  still fail [serviceName]" + serviceName); 
+        }
+      }
+    }
+  }
+  
+  private boolean reDeleteMicroService(String serviceName){
+    try {
+      MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName);
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  private void updateMicroService(ServiceData<List<ServiceHealth>> serviceData) {
+
+    if (serviceData.getData() == null || serviceData.getData().size() == 0) {
+      LOGGER.warn("sysn updateMicroService is wrong:serviceData is empty ");
+      return;
+    }
+
+    String serviceName = "";
+
+    try {
+
+      serviceName = serviceData.getData().get(0).getService().getService();
+      List<ServiceHealth> serviceNodeList = serviceData.getData();
+
+
+      Map<String, MicroServiceFullInfo> microServiceInfo4version =
+          ServiceFilter.getInstance().transMicroServiceInfoFromConsul(serviceNodeList);
+
+      // 删除数据库中已不存在的版本号服务信息
+      Set<String> newAllVersion = microServiceInfo4version.keySet();
+
+      if (lastVersionResponse.containsKey(serviceName)) {
+        Set<String> dbAllVersionSet = lastVersionResponse.get(serviceName);
+        // Set<String> dbAllVersionSet=MicroServiceWrapper.getInstance().getAllVersion(serviceName);
+        Set<String> delVersionList = CommonUtil.getDiffrent(newAllVersion, dbAllVersionSet);
+
+        if (delVersionList.size() > 0) {
+
+          LOGGER.info("MicroService version is change from consul:[serviceName]" + serviceName
+              + "[version]" + delVersionList);
+
+
+          for (String version : delVersionList) {
+            MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version);
+          }
+         
+        }
+      }
+   
+        lastVersionResponse.put(serviceName, newAllVersion);
+
+      for (Map.Entry<String, MicroServiceFullInfo> entry : microServiceInfo4version.entrySet()) {
+        MicroServiceFullInfo new_microServiceFullInfo = entry.getValue();
+        MicroServiceWrapper.getInstance().saveServiceAndnoticeRoute(new_microServiceFullInfo);
+       
+      }
+
+
+    } catch (Exception e) {
+      LOGGER.error("update MicroServiceInfo fail from consul:[serviceName]" + serviceName);
+      //更新失败,重置任务服务的modifyIndex,等待重新更新
+      RedisHealthCheck.writeCheckFlag = true;
+      SyncDataManager.resetIndex(serviceName);
+    }
+  }
+}