/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright 2019 China Mobile
 *=================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.datalake.feeder.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.repository.DbRepository;
import org.onap.datalake.feeder.repository.KafkaRepository;
import org.onap.datalake.feeder.repository.TopicNameRepository;
import org.onap.datalake.feeder.repository.TopicRepository;
import org.onap.datalake.feeder.service.db.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
54 public class TopicService {
56 private final Logger log = LoggerFactory.getLogger(this.getClass());
59 private ApplicationConfiguration config;
62 private TopicNameRepository topicNameRepository;
65 private TopicRepository topicRepository;
68 private DbRepository dbRepository;
71 private DbService dbService;
74 private KafkaRepository kafkaRepository;
76 public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException {
78 List<Topic> topics = findTopics(kafka, topicStr);
79 if (CollectionUtils.isEmpty(topics)) {
80 topics = new ArrayList<>();
81 topics.add(getDefaultTopic(kafka));
84 List<EffectiveTopic> ret = new ArrayList<>();
85 for (Topic topic : topics) {
86 if (!topic.isEnabled()) {
89 ret.add(new EffectiveTopic(topic, topicStr));
91 if (ensureTableExist) {
92 for (Db db : topic.getDbs()) {
93 if (db.isElasticsearch()) {
94 ElasticsearchService elasticsearchService = (ElasticsearchService) dbService.findDbStoreService(db);
95 elasticsearchService.ensureTableExist(topicStr);
105 public List<Topic> findTopics(Kafka kafka, String topicStr) {
106 List<Topic> ret = new ArrayList<>();
108 Iterable<Topic> allTopics = topicRepository.findAll();
109 for(Topic topic: allTopics) {
110 if(topic.getKafkas().contains(kafka ) && topic.getTopicName().getId().equals(topicStr)){
117 public Topic getTopic(int topicId) {
118 Optional<Topic> ret = topicRepository.findById(topicId);
119 return ret.isPresent() ? ret.get() : null;
122 public Topic getDefaultTopic(Kafka kafka) {
123 return findTopics(kafka, config.getDefaultTopicName()).get(0);
126 public boolean isDefaultTopic(Topic topic) {
130 return topic.getName().equals(config.getDefaultTopicName());
133 public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic) {
134 fillTopic(tConfig, wTopic);
137 public Topic fillTopicConfiguration(TopicConfig tConfig) {
138 Topic topic = new Topic();
139 fillTopic(tConfig, topic);
143 private void fillTopic(TopicConfig tConfig, Topic topic) {
144 Set<Db> relateDb = new HashSet<>();
145 topic.setId(tConfig.getId());
146 topic.setTopicName(topicNameRepository.findById(tConfig.getName()).get());
147 topic.setLogin(tConfig.getLogin());
148 topic.setPass(tConfig.getPassword());
149 topic.setEnabled(tConfig.isEnabled());
150 topic.setSaveRaw(tConfig.isSaveRaw());
151 topic.setTtl(tConfig.getTtl());
152 topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage());
153 topic.setDataFormat(tConfig.getDataFormat());
154 topic.setMessageIdPath(tConfig.getMessageIdPath());
155 topic.setAggregateArrayPath(tConfig.getAggregateArrayPath());
156 topic.setFlattenArrayPath(tConfig.getFlattenArrayPath());
158 if (tConfig.getSinkdbs() != null) {
159 for (String item : tConfig.getSinkdbs()) {
160 Db sinkdb = dbRepository.findByName(item);
161 if (sinkdb != null) {
162 relateDb.add(sinkdb);
165 if (!relateDb.isEmpty())
166 topic.setDbs(relateDb);
168 topic.getDbs().clear();
171 topic.setDbs(relateDb);
174 Set<Kafka> relateKafka = new HashSet<>();
175 if (tConfig.getKafkas() != null) {
176 for (int item : tConfig.getKafkas()) {
177 Optional<Kafka> sinkKafka = kafkaRepository.findById(item);
178 if (sinkKafka.isPresent()) {
179 relateKafka.add(sinkKafka.get());
182 if (!relateKafka.isEmpty()) {
183 topic.setKafkas(relateKafka);
185 topic.getKafkas().clear();
188 topic.setKafkas(relateKafka);