Support multiple Kafka clusters and DBs
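
The feeder can now consume from more than one Kafka cluster and persist to
more than one DB: a topic may expand to several effective topics, and each
message batch is written to every enabled, non-tool DB of the topic through a
per-DB DbStoreService (Couchbase, Elasticsearch, HDFS, MongoDB) resolved from
the Spring context.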
[dcaegen2/services.git]
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 1cd3a8a..f5a7698 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -22,35 +22,45 @@ package org.onap.datalake.feeder.service;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collection;
 import java.util.List;
-import java.util.Map; 
+import java.util.Set;
 
 import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
-
-import org.json.JSONException;
 import org.json.JSONObject;
 import org.json.XML;
-import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Kafka;
+import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
+import org.onap.datalake.feeder.util.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; 
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 
 /**
  * Service to store messages to a variety of DBs
  * 
- * comment out YAML support, since AML is for config and don't see this data type in DMaaP. Do we need to support XML?
+ * comment out YAML support, since YAML is for config and we don't see this
+ * data type in DMaaP. Do we need to support XML?
  * 
  * @author Guobiao Mo
  *
@@ -60,65 +70,76 @@ public class StoreService {
        private final Logger log = LoggerFactory.getLogger(this.getClass());
 
        @Autowired
-       private TopicService topicService;
+       private ApplicationConfiguration config;
 
        @Autowired
-       private CouchbaseService couchbaseService;
+       private ApplicationContext context;
 
        @Autowired
-       private ElasticsearchService elasticsearchService;
+       private TopicConfigPollingService configPollingService;
 
-       private Map<String, Topic> topicMap = new HashMap<>(); 
-
-       private ObjectMapper yamlReader;        
+       private ObjectMapper yamlReader;
 
        @PostConstruct
        private void init() {
                yamlReader = new ObjectMapper(new YAMLFactory());
        }
 
-       @PreDestroy
-       public void cleanUp() {
-       }
-
-       public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
-               if (messages == null || messages.isEmpty()) {
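+       /**
+        * Save a batch of (timestamp, text) message pairs consumed from one topic
+        * of one Kafka cluster; the topic may map to multiple effective topics.
+        */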
+       public void saveMessages(Kafka kafka, String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
+               if (CollectionUtils.isEmpty(messages)) {
                        return;
                }
 
-               Topic topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
-                       return topicService.getEffectiveTopic(topicStr);
-               });
+               Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr);
+               for (EffectiveTopic effectiveTopic : effectiveTopics) {
+                       saveMessagesForTopic(effectiveTopic, messages);
+               }
+       }
+       
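+       /**
+        * Convert each message of the batch to JSON, then hand the batch to every
+        * enabled, non-tool DB configured for this effective topic.
+        */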
+       private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) {
+               if (!effectiveTopic.getTopic().isEnabled()) {
+                       log.error("unexpected disabled topic {}, skip saving its messages", effectiveTopic);
+                       return;
+               }
 
                List<JSONObject> docs = new ArrayList<>();
 
                for (Pair<Long, String> pair : messages) {
                        try {
-                               docs.add(messageToJson(topic, pair));
+                               docs.add(messageToJson(effectiveTopic, pair));
                        } catch (Exception e) {
-                               log.error(pair.getRight(), e);
+                               //may see org.json.JSONException.
+                               log.error("Error when converting this message to JSON: " + pair.getRight(), e);
                        }
                }
 
-               saveJsons(topic, docs);
+               Set<Db> dbs = effectiveTopic.getTopic().getDbs();
+
+               for (Db db : dbs) {
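+                       // tool-type DBs are not message stores, and disabled DBs are skipped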
+                       if (db.getDbType().isTool() || !db.isEnabled()) {
+                               continue;
+                       }
+                       DbStoreService dbStoreService = findDbStoreService(db);
+                       if (dbStoreService != null) {
+                               dbStoreService.saveJsons(effectiveTopic, docs);
+                       }
+               }
        }
 
-       private JSONObject messageToJson(Topic topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
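+       /**
+        * Parse a message according to the topic's data format (JSON, XML or
+        * YAML), then post-process it: drop leftover debug fields, stamp the
+        * configured timestamp label, optionally keep the raw text, and apply the
+        * topic's aggregate/flatten array paths.
+        */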
+       private JSONObject messageToJson(EffectiveTopic effectiveTopic, Pair<Long, String> pair) throws IOException {
 
                long timestamp = pair.getLeft();
                String text = pair.getRight();
 
                //for debug, to be removed
-//             String topicStr = topic.getId();
-//             if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
-       //              log.debug("{} ={}", topicStr, text);
+               //              String topicStr = topic.getId();
+               //              if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
+               //              log.debug("{} ={}", topicStr, text);
                //}
 
-               boolean storeRaw = topic.isSaveRaw();
+               boolean storeRaw = effectiveTopic.getTopic().isSaveRaw();
 
-               JSONObject json = null;         
+               JSONObject json = null;
 
-               DataFormat dataFormat = topic.getDataFormat();
+               DataFormat dataFormat = effectiveTopic.getTopic().getDataFormat2();
 
                switch (dataFormat) {
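+               // JSON is parsed as-is; XML and YAML are converted to JSON first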
                case JSON:
@@ -126,7 +147,7 @@ public class StoreService {
                        break;
                case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON 
                        json = XML.toJSONObject(text);
-                       break;          
+                       break;
                case YAML:// Do we need to support YAML?
                        Object obj = yamlReader.readValue(text, Object.class);
                        ObjectMapper jsonWriter = new ObjectMapper();
@@ -142,23 +163,53 @@ public class StoreService {
                //FIXME for debug, to be removed
                json.remove("_id");
                json.remove("_dl_text_");
+               json.remove("_dl_type_");
 
-               json.put("_ts", timestamp);
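+               // the field names, hard-coded "_ts"/"_text" before, are now configurable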
+               json.put(config.getTimestampLabel(), timestamp);
                if (storeRaw) {
-                       json.put("_text", text);
+                       json.put(config.getRawDataLabel(), text);
+               }
+
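+               // optional post-processing: aggregate and/or flatten JSON arrays at the
+               // paths configured on the topic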
+               if (StringUtils.isNotBlank(effectiveTopic.getTopic().getAggregateArrayPath())) {
+                       String[] paths = effectiveTopic.getTopic().getAggregateArrayPath2();
+                       for (String path : paths) {
+                               JsonUtil.arrayAggregate(path, json);
+                       }
+               }
+
+               if (StringUtils.isNotBlank(effectiveTopic.getTopic().getFlattenArrayPath())) {
+                       String[] paths = effectiveTopic.getTopic().getFlattenArrayPath2();
+                       for (String path : paths) {
+                               JsonUtil.flattenArray(path, json);
+                       }
                }
 
                return json;
        }
 
-       private void saveJsons(Topic topic, List<JSONObject> jsons) {
-               if (topic.isSupportCouchbase()) {
-                       couchbaseService.saveJsons(topic, jsons);
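+       /**
+        * Resolve the store service matching the DB's type. The bean is obtained
+        * from the Spring context with the target Db as constructor argument, so
+        * these services are presumably prototype-scoped (one instance per Db).
+        */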
+       private DbStoreService findDbStoreService(Db db) {
+               DbType dbType = db.getDbType();
+               DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
+               switch (dbTypeEnum) {
+               case CB:
+                       return context.getBean(CouchbaseService.class, db);
+               case ES:
+                       return context.getBean(ElasticsearchService.class, db);
+               case HDFS:
+                       return context.getBean(HdfsService.class, db);
+               case MONGO:
+                       return context.getBean(MongodbService.class, db);
+               default:
+                       log.error("unsupported DB type {}", dbTypeEnum);
+                       return null;
                }
+       }
 
-               if (topic.isSupportElasticsearch()) {
-                       elasticsearchService.saveJsons(topic, jsons);
-               }
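+       // flush hooks for buffered sinks (HDFS) are stubbed out for now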
+       public void flush() { // force flush all buffers
+               // hdfsService.flush();
        }
 
+       public void flushStall() { // flush stalled buffers
+               // hdfsService.flushStall();
+       }
 }