Divide the MSB source codes into two repos
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / onap / msb / apiroute / SyncDataManager.java
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java
new file mode 100644 (file)
index 0000000..865778e
--- /dev/null
@@ -0,0 +1,135 @@
+package org.onap.msb.apiroute;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.http.HttpEntity;
+import org.onap.msb.apiroute.wrapper.consulextend.Consul;
+import org.onap.msb.apiroute.wrapper.consulextend.expose.CheckServiceDataEmptyAndAutoStopWatchFilter;
+import org.onap.msb.apiroute.wrapper.consulextend.expose.CheckTagAndAutoStopWatchFilter;
+import org.onap.msb.apiroute.wrapper.consulextend.expose.ServiceModifyIndexFilter;
+import org.onap.msb.apiroute.wrapper.consulextend.expose.WatchCatalogServicesTask;
+import org.onap.msb.apiroute.wrapper.consulextend.expose.WatchServiceHealthTask;
+import org.onap.msb.apiroute.wrapper.consulextend.expose.WriteBufferHandler;
+import org.onap.msb.apiroute.wrapper.queue.ServiceConsumer;
+import org.onap.msb.apiroute.wrapper.queue.ServiceData;
+import org.onap.msb.apiroute.wrapper.queue.ServiceListConsumer;
+import org.onap.msb.apiroute.wrapper.util.RouteUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SyncDataManager {
+       private static Consul consul;
+       private static WatchCatalogServicesTask serviceListWatchTask;
+       private final static Map<String, WatchServiceHealthTask> serviceWatchTaskMap = new ConcurrentHashMap<String, WatchServiceHealthTask>();
+
+       private static final Logger LOGGER = LoggerFactory
+                       .getLogger(SyncDataManager.class);
+
+       private SyncDataManager() {
+       }
+
+       public static void initSyncTask(final String ip, final int port) {
+               consul = Consul.builder().withHostAndPort(ip, port).build();
+               startWatchServiceList();
+               startQueueConsumer();
+       }
+
+       public static void startWatchServiceList() {
+
+               LOGGER.info("===========start to WatchServiceList============");
+
+               // create service list watch task
+               serviceListWatchTask = new WatchCatalogServicesTask(
+                               consul.catalogClient(), RouteUtil.WATCH_SECOND);
+
+               // first,write data to serviceListQueue buffer.
+               // second,async thread will read data from serviceListQueue buffer.
+               serviceListWatchTask.addHandler(new WriteBufferHandler<HttpEntity>(
+                               ServiceData.DataType.service_list));
+
+               // start watch
+               serviceListWatchTask.startWatch();
+       }
+
+       public static void startQueueConsumer() {
+               LOGGER.info("===========start to QueueConsumer Thread============");
+
+               // start ServiceListConsumer
+               new Thread(new ServiceListConsumer(), "ServiceListConsumerThread")
+                               .start();
+
+               // start Service Consumer
+               int serviceQueneNum = RouteUtil.SERVICE_DATA_QUEUE_NUM;
+               for (int i = 0; i < serviceQueneNum; i++) {
+                       new Thread(new ServiceConsumer(i), "ServiceConsumerThread" + i)
+                                       .start();
+               }
+
+       }
+
+       public static void startWatchService(final String serviceName) {
+
+               LOGGER.info("===========start to Watch Service[" + serviceName
+                               + "]============");
+               // create service watch task
+               WatchServiceHealthTask serviceWatchTask = new WatchServiceHealthTask(
+                               consul.healthClient(), serviceName, RouteUtil.WATCH_SECOND);
+
+               // 1.service Data Empty filter
+               serviceWatchTask
+                               .addFilter(new CheckServiceDataEmptyAndAutoStopWatchFilter(
+                                               serviceName));
+
+               // 2.service change filter
+               serviceWatchTask.addFilter(new ServiceModifyIndexFilter());
+
+               // 3.apigateway tag filter:check tag and auto stop watch
+               serviceWatchTask.addFilter(new CheckTagAndAutoStopWatchFilter(
+                               serviceName));
+
+               // start watch
+               serviceWatchTask.startWatch();
+
+               // save
+               serviceWatchTaskMap.put(serviceName, serviceWatchTask);
+       }
+
+       public static void stopWatchServiceList() {
+               if (serviceListWatchTask != null) {
+                       serviceListWatchTask.removeAllFilter();
+                       serviceListWatchTask.removeAllHandler();
+                       serviceListWatchTask.stopWatch();
+               }
+       }
+
+       public static void stopWatchService(String serviceName) {
+               if (LOGGER.isDebugEnabled()) {
+                       LOGGER.debug("stop " + serviceName + " service watch!");
+               }
+
+               WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName);
+               if (watchTask != null) {
+                       watchTask.removeAllFilter();
+                       watchTask.removeAllHandler();
+                       watchTask.stopWatch();
+               }
+               serviceWatchTaskMap.remove(serviceName);
+       }
+
+       public static boolean resetIndex(String serviceName) {
+
+               WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName);
+
+               if (watchTask != null) {
+                       return watchTask.resetIndex();
+               }
+
+               if (LOGGER.isDebugEnabled()) {
+                       LOGGER.debug("reset modify index.did not find:" + serviceName);
+               }
+
+               return false;
+       }
+
+}