1 package org.onap.msb.apiroute;
4 import java.util.concurrent.ConcurrentHashMap;
6 import org.apache.http.HttpEntity;
7 import org.onap.msb.apiroute.wrapper.consulextend.Consul;
8 import org.onap.msb.apiroute.wrapper.consulextend.expose.CheckServiceDataEmptyAndAutoStopWatchFilter;
9 import org.onap.msb.apiroute.wrapper.consulextend.expose.CheckTagAndAutoStopWatchFilter;
10 import org.onap.msb.apiroute.wrapper.consulextend.expose.ServiceModifyIndexFilter;
11 import org.onap.msb.apiroute.wrapper.consulextend.expose.WatchCatalogServicesTask;
12 import org.onap.msb.apiroute.wrapper.consulextend.expose.WatchServiceHealthTask;
13 import org.onap.msb.apiroute.wrapper.consulextend.expose.WriteBufferHandler;
14 import org.onap.msb.apiroute.wrapper.queue.ServiceConsumer;
15 import org.onap.msb.apiroute.wrapper.queue.ServiceData;
16 import org.onap.msb.apiroute.wrapper.queue.ServiceListConsumer;
17 import org.onap.msb.apiroute.wrapper.util.RouteUtil;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
21 public class SyncDataManager {
22 private static Consul consul;
23 private static WatchCatalogServicesTask serviceListWatchTask;
24 private final static Map<String, WatchServiceHealthTask> serviceWatchTaskMap = new ConcurrentHashMap<String, WatchServiceHealthTask>();
26 private static final Logger LOGGER = LoggerFactory
27 .getLogger(SyncDataManager.class);
29 private SyncDataManager() {
32 public static void initSyncTask(final String ip, final int port) {
33 consul = Consul.builder().withHostAndPort(ip, port).build();
34 startWatchServiceList();
38 public static void startWatchServiceList() {
40 LOGGER.info("===========start to WatchServiceList============");
42 // create service list watch task
43 serviceListWatchTask = new WatchCatalogServicesTask(
44 consul.catalogClient(), RouteUtil.WATCH_SECOND);
46 // first,write data to serviceListQueue buffer.
47 // second,async thread will read data from serviceListQueue buffer.
48 serviceListWatchTask.addHandler(new WriteBufferHandler<HttpEntity>(
49 ServiceData.DataType.service_list));
52 serviceListWatchTask.startWatch();
55 public static void startQueueConsumer() {
56 LOGGER.info("===========start to QueueConsumer Thread============");
58 // start ServiceListConsumer
59 new Thread(new ServiceListConsumer(), "ServiceListConsumerThread")
62 // start Service Consumer
63 int serviceQueneNum = RouteUtil.SERVICE_DATA_QUEUE_NUM;
64 for (int i = 0; i < serviceQueneNum; i++) {
65 new Thread(new ServiceConsumer(i), "ServiceConsumerThread" + i)
71 public static void startWatchService(final String serviceName) {
73 LOGGER.info("===========start to Watch Service[" + serviceName
75 // create service watch task
76 WatchServiceHealthTask serviceWatchTask = new WatchServiceHealthTask(
77 consul.healthClient(), serviceName, RouteUtil.WATCH_SECOND);
79 // 1.service Data Empty filter
81 .addFilter(new CheckServiceDataEmptyAndAutoStopWatchFilter(
84 // 2.service change filter
85 serviceWatchTask.addFilter(new ServiceModifyIndexFilter());
87 // 3.apigateway tag filter:check tag and auto stop watch
88 serviceWatchTask.addFilter(new CheckTagAndAutoStopWatchFilter(
92 serviceWatchTask.startWatch();
95 serviceWatchTaskMap.put(serviceName, serviceWatchTask);
98 public static void stopWatchServiceList() {
99 if (serviceListWatchTask != null) {
100 serviceListWatchTask.removeAllFilter();
101 serviceListWatchTask.removeAllHandler();
102 serviceListWatchTask.stopWatch();
106 public static void stopWatchService(String serviceName) {
107 if (LOGGER.isDebugEnabled()) {
108 LOGGER.debug("stop " + serviceName + " service watch!");
111 WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName);
112 if (watchTask != null) {
113 watchTask.removeAllFilter();
114 watchTask.removeAllHandler();
115 watchTask.stopWatch();
117 serviceWatchTaskMap.remove(serviceName);
120 public static boolean resetIndex(String serviceName) {
122 WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName);
124 if (watchTask != null) {
125 return watchTask.resetIndex();
128 if (LOGGER.isDebugEnabled()) {
129 LOGGER.debug("reset modify index.did not find:" + serviceName);