a02cd6a25e5cffc150d34c0957ae3ef74bf79662
[dcaegen2/services.git] /
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DATALAKE
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20
21 package org.onap.datalake.feeder.service;
22
23 import java.io.IOException;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29
30 import javax.annotation.PostConstruct;
31
32 import org.apache.commons.collections.CollectionUtils;
33 import org.onap.datalake.feeder.config.ApplicationConfiguration;
34 import org.onap.datalake.feeder.domain.EffectiveTopic;
35 import org.onap.datalake.feeder.domain.Kafka;
36 import org.onap.datalake.feeder.repository.KafkaRepository;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.springframework.beans.factory.annotation.Autowired;
40 import org.springframework.context.ApplicationContext;
41 import org.springframework.stereotype.Service;
42
43 /**
44  * Service to check topic changes in Kafka and topic setting updates in DB
45  * 
46  * @author Guobiao Mo
47  *
48  */
49 @Service
50 public class TopicConfigPollingService implements Runnable {
51
52         private final Logger log = LoggerFactory.getLogger(this.getClass());
53
54         @Autowired
55         ApplicationConfiguration config;
56
57         @Autowired
58         private ApplicationContext context;
59
60         @Autowired
61         private KafkaRepository kafkaRepository;
62         
63         //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
64         private Map<Integer, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
65         //private Map<String, TopicConfig> effectiveTopicConfigMap;
66
67         //monitor Kafka topic list changes, key is kafka id, value is active Topics
68         private Map<Integer, Set<String>> activeTopicMap;
69         
70         private ThreadLocal<Map<Integer, Integer>> activeTopicsVersionLocal =   ThreadLocal.withInitial(HashMap::new);//kafkaId:version - local 'old' version
71         private Map<Integer, Integer> currentActiveTopicsVersionMap = new HashMap<>();//kafkaId:version - current/latest version
72         private Map<Integer, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
73
74         private boolean active = false;
75
76         @PostConstruct
77         private void init() {
78                 try {
79                         log.info("init(), calling poll()...");
80                         activeTopicMap = poll();
81                 } catch (Exception ex) {
82                         log.error("error connection to HDFS.", ex);
83                 }
84         }
85
86         public boolean isActiveTopicsChanged(Kafka kafka) {//update=true means sync local version
87                 int kafkaId = kafka.getId();
88                 int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version
89                 int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0);
90                 
91                 boolean changed = currentActiveTopicsVersion > localActiveTopicsVersion;
92                 log.debug("kafkaId={} isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", kafkaId, changed, currentActiveTopicsVersion, localActiveTopicsVersion);
93                 if (changed) {
94                         activeTopicsVersionLocal.get().put(kafkaId, currentActiveTopicsVersion);
95                 }
96
97                 return changed;
98         }
99
100         //get a list of topic names to monitor
101         public Collection<String> getActiveTopics(Kafka kafka) {
102                 return activeTopicMap.get(kafka.getId());
103         }
104
105         //get the EffectiveTopics given kafka and topic name
106         public Collection<EffectiveTopic> getEffectiveTopic(Kafka kafka, String topicStr) {
107                 Map<String, List<EffectiveTopic>> effectiveTopicMapKafka= effectiveTopicMap.get(kafka.getId());  
108                 return effectiveTopicMapKafka.get(topicStr);
109         }
110
111         @Override
112         public void run() {
113                 active = true;
114                 log.info("TopicConfigPollingService started.");
115
116                 while (active) {
117                         try { //sleep first since we already pool in init()
118                                 Thread.sleep(config.getCheckTopicInterval());
119                                 if(!active) {
120                                         break;
121                                 }
122                         } catch (InterruptedException e) {
123                                 log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
124                                 Thread.currentThread().interrupt();
125                         }
126
127                         try {
128                                 Map<Integer, Set<String>> newTopicsMap = poll();
129                                 
130                                 for(Map.Entry<Integer, Set<String>> entry:newTopicsMap.entrySet()) {
131                                         Integer kafkaId = entry.getKey();
132                                         Set<String>  newTopics = entry.getValue();
133                                         
134                                         Set<String> activeTopics = activeTopicMap.get(kafkaId);
135
136                                         if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) {
137                                                 log.info("activeTopics list is updated, old={}", activeTopics);
138                                                 log.info("activeTopics list is updated, new={}", newTopics);
139
140                                                 activeTopicMap.put(kafkaId, newTopics);
141                                                 currentActiveTopicsVersionMap.put(kafkaId, currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1)+1);
142                                         } else {
143                                                 log.debug("activeTopics list is not updated.");
144                                         }
145                                 }
146                         } catch (IOException e) {
147                                 log.error("dmaapService.getActiveTopics()", e);
148                         }
149                 }
150
151                 log.info("exit since active is set to false");
152         }
153
154         public void shutdown() {
155                 active = false;
156         }
157
158         private Map<Integer, Set<String>>  poll() throws IOException {
159                 Map<Integer, Set<String>> ret = new HashMap<>();
160                 Iterable<Kafka> kafkas = kafkaRepository.findAll();
161                 for (Kafka kafka : kafkas) {
162                         if (kafka.isEnabled()) {
163                                 Set<String> topics = poll(kafka);
164                                 ret.put(kafka.getId(), topics);
165                         }
166                 }
167                 return ret;
168         }
169
170         private Set<String> poll(Kafka kafka) throws IOException {
171                 log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
172
173                 DmaapService dmaapService =  dmaapServiceMap.get(kafka.getId());
174                 if(dmaapService==null) {
175                         dmaapService = context.getBean(DmaapService.class, kafka);
176                         dmaapServiceMap.put(kafka.getId(), dmaapService);
177                 }
178                                 
179                 Map<String, List<EffectiveTopic>> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic();
180                 effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics);
181
182                 Set<String> ret = activeEffectiveTopics.keySet(); 
183
184                 return ret;
185         }
186
187 }