Support multiple Kafka clusters and DBs
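
StoreService no longer autowires one fixed service per backing store: topics are resolved per Kafka cluster, each message batch is fanned out to every enabled, non-tool Db configured for the topic, and the matching DbStoreService (Couchbase, Elasticsearch, HDFS, or MongoDB) is obtained from the Spring ApplicationContext by DB type.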
[dcaegen2/services.git] components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 2a2f997..f5a7698 100644
@@ -22,7 +22,9 @@ package org.onap.datalake.feeder.service;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 
 import javax.annotation.PostConstruct;
 
@@ -32,12 +34,23 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.json.JSONObject;
 import org.json.XML;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Kafka;
 import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
 import org.onap.datalake.feeder.util.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -60,19 +73,10 @@ public class StoreService {
        private ApplicationConfiguration config;
 
        @Autowired
-       private TopicConfigPollingService configPollingService;
-
-       @Autowired
-       private MongodbService mongodbService;
+       private ApplicationContext context;
 
        @Autowired
-       private CouchbaseService couchbaseService;
-
-       @Autowired
-       private ElasticsearchService elasticsearchService;
-
-       @Autowired
-       private HdfsService hdfsService;
+       private TopicConfigPollingService configPollingService;
 
        private ObjectMapper yamlReader;
 
@@ -81,28 +85,48 @@ public class StoreService {
                yamlReader = new ObjectMapper(new YAMLFactory());
        }
 
-       public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
+       public void saveMessages(Kafka kafka, String topicStr, List<Pair<Long, String>> messages) { // each pair = (timestamp, message text)
                if (CollectionUtils.isEmpty(messages)) {
                        return;
                }
 
-               TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr);
+               Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr);
+               for (EffectiveTopic effectiveTopic : effectiveTopics) {
+                       saveMessagesForTopic(effectiveTopic, messages);
+               }
+       }
+
+       private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) {
+               if (!effectiveTopic.getTopic().isEnabled()) {
+                       log.error("topic {} is disabled, its messages should not reach here", effectiveTopic);
+                       return;
+               }
 
                List<JSONObject> docs = new ArrayList<>();
 
                for (Pair<Long, String> pair : messages) {
                        try {
-                               docs.add(messageToJson(topicConfig, pair));
+                               docs.add(messageToJson(effectiveTopic, pair));
                        } catch (Exception e) {
                                //may see org.json.JSONException.
                                log.error("Error when converting this message to JSON: " + pair.getRight(), e);
                        }
                }
 
-               saveJsons(topicConfig, docs, messages);
+               Set<Db> dbs = effectiveTopic.getTopic().getDbs();
+
+               for (Db db : dbs) {
+                       if (db.getDbType().isTool() || !db.isEnabled()) {
+                               continue;
+                       }
+                       DbStoreService dbStoreService = findDbStoreService(db);
+                       if (dbStoreService != null) { // findDbStoreService() returns null for unsupported DB types
+                               dbStoreService.saveJsons(effectiveTopic, docs);
+                       }
+               }
        }
 
-       private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
+       private JSONObject messageToJson(EffectiveTopic effectiveTopic, Pair<Long, String> pair) throws IOException {
 
                long timestamp = pair.getLeft();
                String text = pair.getRight();
@@ -113,11 +135,11 @@ public class StoreService {
                //              log.debug("{} ={}", topicStr, text);
                //}
 
-               boolean storeRaw = topicConfig.isSaveRaw();
+               boolean storeRaw = effectiveTopic.getTopic().isSaveRaw();
 
                JSONObject json = null;
 
-               DataFormat dataFormat = topicConfig.getDataFormat2();
+               DataFormat dataFormat = effectiveTopic.getTopic().getDataFormat2();
 
                switch (dataFormat) {
                case JSON:
@@ -148,15 +170,15 @@ public class StoreService {
                        json.put(config.getRawDataLabel(), text);
                }
 
-               if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
-                       String[] paths = topicConfig.getAggregateArrayPath2();
+               if (StringUtils.isNotBlank(effectiveTopic.getTopic().getAggregateArrayPath())) {
+                       String[] paths = effectiveTopic.getTopic().getAggregateArrayPath2();
                        for (String path : paths) {
                                JsonUtil.arrayAggregate(path, json);
                        }
                }
 
-               if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
-                       String[] paths = topicConfig.getFlattenArrayPath2();
+               if (StringUtils.isNotBlank(effectiveTopic.getTopic().getFlattenArrayPath())) {
+                       String[] paths = effectiveTopic.getTopic().getFlattenArrayPath2();
                        for (String path : paths) {
                                JsonUtil.flattenArray(path, json);
                        }
@@ -165,29 +187,29 @@ public class StoreService {
                return json;
        }
 
-       private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) {
-               if (topic.supportMongoDB()) {
-                       mongodbService.saveJsons(topic, jsons);
-               }
-
-               if (topic.supportCouchbase()) {
-                       couchbaseService.saveJsons(topic, jsons);
-               }
-
-               if (topic.supportElasticsearch()) {
-                       elasticsearchService.saveJsons(topic, jsons);
-               }
-
-               if (topic.supportHdfs()) {
-                       hdfsService.saveMessages(topic, messages);
+       private DbStoreService findDbStoreService(Db db) {
+               DbType dbType = db.getDbType();
+               DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
+               switch (dbTypeEnum) {
+               case CB:
+                       return context.getBean(CouchbaseService.class, db);
+               case ES:
+                       return context.getBean(ElasticsearchService.class, db);
+               case HDFS:
+                       return context.getBean(HdfsService.class, db);
+               case MONGO:
+                       return context.getBean(MongodbService.class, db);
+               default:
+                       log.error("no DbStoreService available for DB type {}", dbTypeEnum);
+                       return null;
                }
        }
 
        public void flush() { //force flush all buffer 
-               hdfsService.flush();
+               // TODO: flush per-Db HDFS buffers; the singleton hdfsService field was removed
        }
 
        public void flushStall() { //flush stall buffer
-               hdfsService.flushStall();
+               // TODO: flush stalled per-Db HDFS buffers (same reason as flush() above)
        }
 }
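
Two supporting pieces are implied by this diff but live outside StoreService.java. First, the four backend services now share a common DbStoreService interface; its shape, inferred from the saveJsons(effectiveTopic, docs) call above (the actual declaration in org.onap.datalake.feeder.service.db is not shown here), is roughly:

    public interface DbStoreService {
        void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons);
    }

Second, context.getBean(SomeService.class, db) only yields an instance bound to the given Db if the service bean can actually be constructed with that argument. A minimal sketch of one such service, assuming prototype scope and a Db constructor parameter (an illustrative assumption; the real service declarations are not part of this diff):

    import java.util.List;

    import org.json.JSONObject;
    import org.onap.datalake.feeder.domain.Db;
    import org.onap.datalake.feeder.domain.EffectiveTopic;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.context.annotation.Scope;
    import org.springframework.stereotype.Service;

    @Service
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public class MongodbService implements DbStoreService {

        private final Db db; // connection settings for one configured MongoDB instance

        public MongodbService(Db db) {
            this.db = db;
        }

        @Override
        public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) {
            // write the batch to the MongoDB instance described by this.db
        }
    }

With prototype scope, each findDbStoreService() call builds a fresh instance for the given Db; a production version would likely cache the per-Db instances, which is also what makes the commented-out flush() plumbing above nontrivial: there is no longer a single HdfsService to flush.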