/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright 2019 China Mobile
 *=================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.datalake.feeder.service;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;

import org.apache.commons.collections.CollectionUtils;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.repository.KafkaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

44 * Service to check topic changes in Kafka and topic setting updates in DB
50 public class TopicConfigPollingService implements Runnable {
52 private final Logger log = LoggerFactory.getLogger(this.getClass());
55 ApplicationConfiguration config;
58 private ApplicationContext context;
61 private KafkaRepository kafkaRepository;
63 //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
64 private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
65 //private Map<String, TopicConfig> effectiveTopicConfigMap;
67 //monitor Kafka topic list changes
68 private Map<String, Set<String>> activeTopicMap;
70 private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//topic name:version
71 private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();//topic name:version
72 private Map<String, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
74 private boolean active = false;
79 log.info("init(), calling poll()...");
80 activeTopicMap = poll();
81 } catch (Exception ex) {
82 log.error("error connection to HDFS.", ex);
86 public boolean isActiveTopicsChanged(Kafka kafka) {//update=true means sync local version
87 String kafkaId = kafka.getId();
88 int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version
89 int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0);
91 boolean changed = currentActiveTopicsVersion > localActiveTopicsVersion;
92 log.debug("kafkaId={} isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", kafkaId, changed, currentActiveTopicsVersion, localActiveTopicsVersion);
94 activeTopicsVersionLocal.get().put(kafkaId, currentActiveTopicsVersion);
100 //get a list of topic names to monitor
101 public Collection<String> getActiveTopics(Kafka kafka) {
102 return activeTopicMap.get(kafka.getId());
105 //get the EffectiveTopics given kafka and topic name
106 public Collection<EffectiveTopic> getEffectiveTopic(Kafka kafka, String topicStr) {
107 Map<String, List<EffectiveTopic>> effectiveTopicMapKafka= effectiveTopicMap.get(kafka.getId());
108 return effectiveTopicMapKafka.get(topicStr);
114 log.info("TopicConfigPollingService started.");
117 try { //sleep first since we already pool in init()
118 Thread.sleep(config.getCheckTopicInterval());
122 } catch (InterruptedException e) {
123 log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
124 Thread.currentThread().interrupt();
128 Map<String, Set<String>> newTopicsMap = poll();
130 for(Map.Entry<String, Set<String>> entry:newTopicsMap.entrySet()) {
131 String kafkaId = entry.getKey();
132 Set<String> newTopics = entry.getValue();
134 Set<String> activeTopics = activeTopicMap.get(kafkaId);
136 if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) {
137 log.info("activeTopics list is updated, old={}", activeTopics);
138 log.info("activeTopics list is updated, new={}", newTopics);
140 activeTopicMap.put(kafkaId, newTopics);
141 currentActiveTopicsVersionMap.put(kafkaId, currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1)+1);
143 log.debug("activeTopics list is not updated.");
146 } catch (IOException e) {
147 log.error("dmaapService.getActiveTopics()", e);
151 log.info("exit since active is set to false");
154 public void shutdown() {
158 private Map<String, Set<String>> poll() throws IOException {
159 Map<String, Set<String>> ret = new HashMap<>();
160 Iterable<Kafka> kafkas = kafkaRepository.findAll();
161 for (Kafka kafka : kafkas) {
162 if (kafka.isEnabled()) {
163 Set<String> topics = poll(kafka);
164 ret.put(kafka.getId(), topics);
170 private Set<String> poll(Kafka kafka) throws IOException {
171 log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
173 DmaapService dmaapService = dmaapServiceMap.get(kafka.getId());
174 if(dmaapService==null) {
175 dmaapService = context.getBean(DmaapService.class, kafka);
176 dmaapServiceMap.put(kafka.getId(), dmaapService);
179 Map<String, List<EffectiveTopic>> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic();
180 effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics);
182 Set<String> ret = activeEffectiveTopics.keySet();