2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.datalake.feeder.service;
23 import java.io.IOException;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
30 import javax.annotation.PostConstruct;
32 import org.apache.commons.collections.CollectionUtils;
33 import org.onap.datalake.feeder.config.ApplicationConfiguration;
34 import org.onap.datalake.feeder.domain.EffectiveTopic;
35 import org.onap.datalake.feeder.domain.Kafka;
36 import org.onap.datalake.feeder.repository.KafkaRepository;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.springframework.beans.factory.annotation.Autowired;
40 import org.springframework.context.ApplicationContext;
41 import org.springframework.stereotype.Service;
44 * Service to check topic changes in Kafka and topic setting updates in DB
50 public class TopicConfigPollingService implements Runnable {
52 private final Logger log = LoggerFactory.getLogger(this.getClass());
55 ApplicationConfiguration config;
58 private ApplicationContext context;
61 private KafkaRepository kafkaRepository;
63 //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
64 private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();;
65 //private Map<String, TopicConfig> effectiveTopicConfigMap;
67 //monitor Kafka topic list changes
68 private Map<String, Set<String>> activeTopicMap;
70 private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = new ThreadLocal<>();
71 private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();
73 private boolean active = false;
78 log.info("init(), calling poll()...");
79 activeTopicMap = poll();
80 } catch (Exception ex) {
81 log.error("error connection to HDFS.", ex);
85 public boolean isActiveTopicsChanged(Kafka kafka) {//update=true means sync local version
86 String kafkaId = kafka.getId();
87 int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version
88 int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0);
90 boolean changed = currentActiveTopicsVersion > localActiveTopicsVersion;
91 log.debug("kafkaId={} isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", kafkaId, changed, currentActiveTopicsVersion, localActiveTopicsVersion);
93 activeTopicsVersionLocal.get().put(kafkaId, currentActiveTopicsVersion);
99 //get a list of topic names to monitor
100 public Collection<String> getActiveTopics(Kafka kafka) {
101 return activeTopicMap.get(kafka.getId());
104 //get the EffectiveTopics given kafka and topic name
105 public Collection<EffectiveTopic> getEffectiveTopic(Kafka kafka, String topicStr) {
106 Map<String, List<EffectiveTopic>> effectiveTopicMapKafka= effectiveTopicMap.get(kafka.getId());
107 return effectiveTopicMapKafka.get(topicStr);
113 log.info("TopicConfigPollingService started.");
116 try { //sleep first since we already pool in init()
117 Thread.sleep(config.getCheckTopicInterval());
121 } catch (InterruptedException e) {
122 log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
123 Thread.currentThread().interrupt();
127 Map<String, Set<String>> newTopicsMap = poll();
129 for(Map.Entry<String, Set<String>> entry:newTopicsMap.entrySet()) {
130 String kafkaId = entry.getKey();
131 Set<String> newTopics = entry.getValue();
133 Set<String> activeTopics = activeTopicMap.get(kafkaId);
135 if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) {
136 log.info("activeTopics list is updated, old={}", activeTopics);
137 log.info("activeTopics list is updated, new={}", newTopics);
139 activeTopicMap.put(kafkaId, newTopics);
140 currentActiveTopicsVersionMap.put(kafkaId, currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1)+1);
142 log.debug("activeTopics list is not updated.");
145 } catch (IOException e) {
146 log.error("dmaapService.getActiveTopics()", e);
150 log.info("exit since active is set to false");
153 public void shutdown() {
157 private Map<String, Set<String>> poll() throws IOException {
158 Map<String, Set<String>> ret = new HashMap<>();
159 Iterable<Kafka> kafkas = kafkaRepository.findAll();
160 for (Kafka kafka : kafkas) {
161 if (kafka.isEnabled()) {
162 Set<String> topics = poll(kafka);
163 ret.put(kafka.getId(), topics);
169 private Set<String> poll(Kafka kafka) throws IOException {
170 log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
172 DmaapService dmaapService = context.getBean(DmaapService.class, kafka);
174 Map<String, List<EffectiveTopic>> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic();
175 effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics);
177 Set<String> ret = activeEffectiveTopics.keySet();