Divide the MSB source codes into two repos
[msb/apigateway.git] / apiroute / apiroute-service / src / main / java / org / onap / msb / apiroute / SyncDataManager.java
1 package org.onap.msb.apiroute;
2
3 import java.util.Map;
4 import java.util.concurrent.ConcurrentHashMap;
5
6 import org.apache.http.HttpEntity;
7 import org.onap.msb.apiroute.wrapper.consulextend.Consul;
8 import org.onap.msb.apiroute.wrapper.consulextend.expose.CheckServiceDataEmptyAndAutoStopWatchFilter;
9 import org.onap.msb.apiroute.wrapper.consulextend.expose.CheckTagAndAutoStopWatchFilter;
10 import org.onap.msb.apiroute.wrapper.consulextend.expose.ServiceModifyIndexFilter;
11 import org.onap.msb.apiroute.wrapper.consulextend.expose.WatchCatalogServicesTask;
12 import org.onap.msb.apiroute.wrapper.consulextend.expose.WatchServiceHealthTask;
13 import org.onap.msb.apiroute.wrapper.consulextend.expose.WriteBufferHandler;
14 import org.onap.msb.apiroute.wrapper.queue.ServiceConsumer;
15 import org.onap.msb.apiroute.wrapper.queue.ServiceData;
16 import org.onap.msb.apiroute.wrapper.queue.ServiceListConsumer;
17 import org.onap.msb.apiroute.wrapper.util.RouteUtil;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 public class SyncDataManager {
22         private static Consul consul;
23         private static WatchCatalogServicesTask serviceListWatchTask;
24         private final static Map<String, WatchServiceHealthTask> serviceWatchTaskMap = new ConcurrentHashMap<String, WatchServiceHealthTask>();
25
26         private static final Logger LOGGER = LoggerFactory
27                         .getLogger(SyncDataManager.class);
28
29         private SyncDataManager() {
30         }
31
32         public static void initSyncTask(final String ip, final int port) {
33                 consul = Consul.builder().withHostAndPort(ip, port).build();
34                 startWatchServiceList();
35                 startQueueConsumer();
36         }
37
38         public static void startWatchServiceList() {
39
40                 LOGGER.info("===========start to WatchServiceList============");
41
42                 // create service list watch task
43                 serviceListWatchTask = new WatchCatalogServicesTask(
44                                 consul.catalogClient(), RouteUtil.WATCH_SECOND);
45
46                 // first,write data to serviceListQueue buffer.
47                 // second,async thread will read data from serviceListQueue buffer.
48                 serviceListWatchTask.addHandler(new WriteBufferHandler<HttpEntity>(
49                                 ServiceData.DataType.service_list));
50
51                 // start watch
52                 serviceListWatchTask.startWatch();
53         }
54
55         public static void startQueueConsumer() {
56                 LOGGER.info("===========start to QueueConsumer Thread============");
57
58                 // start ServiceListConsumer
59                 new Thread(new ServiceListConsumer(), "ServiceListConsumerThread")
60                                 .start();
61
62                 // start Service Consumer
63                 int serviceQueneNum = RouteUtil.SERVICE_DATA_QUEUE_NUM;
64                 for (int i = 0; i < serviceQueneNum; i++) {
65                         new Thread(new ServiceConsumer(i), "ServiceConsumerThread" + i)
66                                         .start();
67                 }
68
69         }
70
71         public static void startWatchService(final String serviceName) {
72
73                 LOGGER.info("===========start to Watch Service[" + serviceName
74                                 + "]============");
75                 // create service watch task
76                 WatchServiceHealthTask serviceWatchTask = new WatchServiceHealthTask(
77                                 consul.healthClient(), serviceName, RouteUtil.WATCH_SECOND);
78
79                 // 1.service Data Empty filter
80                 serviceWatchTask
81                                 .addFilter(new CheckServiceDataEmptyAndAutoStopWatchFilter(
82                                                 serviceName));
83
84                 // 2.service change filter
85                 serviceWatchTask.addFilter(new ServiceModifyIndexFilter());
86
87                 // 3.apigateway tag filter:check tag and auto stop watch
88                 serviceWatchTask.addFilter(new CheckTagAndAutoStopWatchFilter(
89                                 serviceName));
90
91                 // start watch
92                 serviceWatchTask.startWatch();
93
94                 // save
95                 serviceWatchTaskMap.put(serviceName, serviceWatchTask);
96         }
97
98         public static void stopWatchServiceList() {
99                 if (serviceListWatchTask != null) {
100                         serviceListWatchTask.removeAllFilter();
101                         serviceListWatchTask.removeAllHandler();
102                         serviceListWatchTask.stopWatch();
103                 }
104         }
105
106         public static void stopWatchService(String serviceName) {
107                 if (LOGGER.isDebugEnabled()) {
108                         LOGGER.debug("stop " + serviceName + " service watch!");
109                 }
110
111                 WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName);
112                 if (watchTask != null) {
113                         watchTask.removeAllFilter();
114                         watchTask.removeAllHandler();
115                         watchTask.stopWatch();
116                 }
117                 serviceWatchTaskMap.remove(serviceName);
118         }
119
120         public static boolean resetIndex(String serviceName) {
121
122                 WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName);
123
124                 if (watchTask != null) {
125                         return watchTask.resetIndex();
126                 }
127
128                 if (LOGGER.isDebugEnabled()) {
129                         LOGGER.debug("reset modify index.did not find:" + serviceName);
130                 }
131
132                 return false;
133         }
134
135 }