/*
 * ============LICENSE_START=======================================================
 * ONAP : DATALAKE
 * ================================================================================
 * Copyright 2019 China Mobile
 *=================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.datalake.feeder.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.DbRepository;
import org.onap.datalake.feeder.repository.KafkaRepository;
import org.onap.datalake.feeder.repository.TopicNameRepository;
import org.onap.datalake.feeder.repository.TopicRepository;
import org.onap.datalake.feeder.service.db.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Service for topics
 *
 * @author Guobiao Mo
 *
 */
@Service
public class TopicService {

	private final Logger log = LoggerFactory.getLogger(this.getClass());

	@Autowired
	private ApplicationConfiguration config;

	@Autowired
	private TopicNameRepository topicNameRepository;

	@Autowired
	private TopicRepository topicRepository;

	@Autowired
	private DbRepository dbRepository;

	@Autowired
	private DbService dbService;

	@Autowired
	private KafkaRepository kafkaRepository;

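	/**
	 * Find the enabled topics matching the given topic name on the given Kafka
	 * cluster, falling back to the default topic when no match is configured.
	 * When ensureTableExist is set, make sure the backing Elasticsearch table
	 * (index) exists for every Elasticsearch sink attached to a matching topic.
	 *
	 * @param kafka            the Kafka cluster the message came from
	 * @param topicStr         the topic name
	 * @param ensureTableExist whether to create missing Elasticsearch tables
	 * @return the enabled effective topics, possibly empty
	 * @throws IOException if the Elasticsearch table check or creation fails
	 */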
	public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException {

		List<Topic> topics = findTopics(kafka, topicStr);
		if (CollectionUtils.isEmpty(topics)) {
			topics = new ArrayList<>();
			topics.add(getDefaultTopic(kafka));
		}

		List<EffectiveTopic> ret = new ArrayList<>();
		for (Topic topic : topics) {
			if (!topic.isEnabled()) {
				continue;
			}
			ret.add(new EffectiveTopic(topic, topicStr));

			if (ensureTableExist) {
				for (Db db : topic.getDbs()) {
					if (db.isElasticsearch()) {
						ElasticsearchService elasticsearchService = (ElasticsearchService) dbService.findDbStoreService(db);
						elasticsearchService.ensureTableExist(topicStr);
					}
				}
			}
		}

		return ret;
	}

	//TODO use query
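	/**
	 * Find all topics that are attached to the given Kafka cluster and whose
	 * name matches topicStr. Currently implemented as a full scan of the topic
	 * repository (see the TODO above).
	 *
	 * @param kafka    the Kafka cluster
	 * @param topicStr the topic name to match
	 * @return the matching topics, possibly empty
	 */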
	public List<Topic> findTopics(Kafka kafka, String topicStr) {
		List<Topic> ret = new ArrayList<>();

		Iterable<Topic> allTopics = topicRepository.findAll();
		for (Topic topic : allTopics) {
			if (topic.getKafkas().contains(kafka) && topic.getTopicName().getId().equals(topicStr)) {
				ret.add(topic);
			}
		}
		return ret;
	}

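	/**
	 * Look up a topic by its database id.
	 *
	 * @param topicId the topic id
	 * @return the topic, or null if none exists with that id
	 */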
	public Topic getTopic(int topicId) {
		return topicRepository.findById(topicId).orElse(null);
	}

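	/**
	 * Return the default topic for the given Kafka cluster, i.e. the topic
	 * named by ApplicationConfiguration#getDefaultTopicName(). Assumes the
	 * default topic exists; otherwise get(0) throws IndexOutOfBoundsException.
	 */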
	public Topic getDefaultTopic(Kafka kafka) {
		return findTopics(kafka, config.getDefaultTopicName()).get(0);
	}

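	/**
	 * Check whether the given topic is the default topic.
	 */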
	public boolean isDefaultTopic(Topic topic) {
		if (topic == null) {
			return false;
		}
		return topic.getName().equals(config.getDefaultTopicName());
	}

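	/**
	 * Copy the settings in the given TopicConfig DTO onto an existing topic.
	 */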
	public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic) {
		fillTopic(tConfig, wTopic);
	}

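	/**
	 * Create a new Topic populated from the given TopicConfig DTO.
	 */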
	public Topic fillTopicConfiguration(TopicConfig tConfig) {
		Topic topic = new Topic();
		fillTopic(tConfig, topic);
		return topic;
	}

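	/**
	 * Copy the scalar settings from the DTO onto the topic, then resolve the
	 * sink DB names and Kafka ids in the DTO into entity relations.
	 */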
	private void fillTopic(TopicConfig tConfig, Topic topic) {
		Set<Db> relateDb = new HashSet<>();
		topic.setId(tConfig.getId());
		// Fail fast with a clear message instead of a bare NoSuchElementException
		// when the topic name is unknown.
		topic.setTopicName(topicNameRepository.findById(tConfig.getName())
				.orElseThrow(() -> new IllegalArgumentException("Unknown topic name: " + tConfig.getName())));
		topic.setLogin(tConfig.getLogin());
		topic.setPass(tConfig.getPassword());
		topic.setEnabled(tConfig.isEnabled());
		topic.setSaveRaw(tConfig.isSaveRaw());
		topic.setTtl(tConfig.getTtl());
		topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage());
		topic.setDataFormat(tConfig.getDataFormat());
		topic.setMessageIdPath(tConfig.getMessageIdPath());
		topic.setAggregateArrayPath(tConfig.getAggregateArrayPath());
		topic.setFlattenArrayPath(tConfig.getFlattenArrayPath());

		// Resolve sink DB names into Db entities; unknown names are skipped.
		if (tConfig.getSinkdbs() != null) {
			for (String item : tConfig.getSinkdbs()) {
				Db sinkdb = dbRepository.findByName(item);
				if (sinkdb != null) {
					relateDb.add(sinkdb);
				}
			}
			if (!relateDb.isEmpty()) {
				topic.setDbs(relateDb);
			} else {
				// No name resolved: empty the topic's existing collection in place.
				topic.getDbs().clear();
			}
		} else {
			topic.setDbs(relateDb);
		}

		// Resolve Kafka ids into Kafka entities; unknown ids are skipped.
		Set<Kafka> relateKafka = new HashSet<>();
		if (tConfig.getKafkas() != null) {
			for (int item : tConfig.getKafkas()) {
				Optional<Kafka> sinkKafka = kafkaRepository.findById(item);
				if (sinkKafka.isPresent()) {
					relateKafka.add(sinkKafka.get());
				}
			}
			if (!relateKafka.isEmpty()) {
				topic.setKafkas(relateKafka);
			} else {
				// No id resolved: empty the topic's existing collection in place.
				topic.getKafkas().clear();
			}
		} else {
			topic.setKafkas(relateKafka);
		}
	}

}