import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collection;
import java.util.List;
-import java.util.Map;
+import java.util.Set;
import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
-
-import org.json.JSONException;
import org.json.JSONObject;
import org.json.XML;
-import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Kafka;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
+import org.onap.datalake.feeder.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
+
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
/**
* Service to store messages to varieties of DBs
*
- * comment out YAML support, since AML is for config and don't see this data type in DMaaP. Do we need to support XML?
+ * comment out YAML support, since AML is for config and don't see this data
+ * type in DMaaP. Do we need to support XML?
*
* @author Guobiao Mo
*
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
- private TopicService topicService;
+ private ApplicationConfiguration config;
@Autowired
- private CouchbaseService couchbaseService;
+ private ApplicationContext context;
@Autowired
- private ElasticsearchService elasticsearchService;
+ private TopicConfigPollingService configPollingService;
- private Map<String, Topic> topicMap = new HashMap<>();
-
- private ObjectMapper yamlReader;
+ private ObjectMapper yamlReader;
@PostConstruct
private void init() {
yamlReader = new ObjectMapper(new YAMLFactory());
}
- @PreDestroy
- public void cleanUp() {
- }
-
- public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
- if (messages == null || messages.isEmpty()) {
+ public void saveMessages(Kafka kafka, String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
+ if (CollectionUtils.isEmpty(messages)) {
return;
}
- Topic topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
- return topicService.getEffectiveTopic(topicStr);
- });
+ Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr);
+ for(EffectiveTopic effectiveTopic:effectiveTopics) {
+ saveMessagesForTopic(effectiveTopic, messages);
+ }
+ }
+
+ private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) {
+ if (!effectiveTopic.getTopic().isEnabled()) {
+ log.error("we should not come here {}", effectiveTopic);
+ return;
+ }
List<JSONObject> docs = new ArrayList<>();
for (Pair<Long, String> pair : messages) {
try {
- docs.add(messageToJson(topic, pair));
+ docs.add(messageToJson(effectiveTopic, pair));
} catch (Exception e) {
- log.error(pair.getRight(), e);
+ //may see org.json.JSONException.
+ log.error("Error when converting this message to JSON: " + pair.getRight(), e);
}
}
- saveJsons(topic, docs);
+ Set<Db> dbs = effectiveTopic.getTopic().getDbs();
+
+ for (Db db : dbs) {
+ if (db.getDbType().isTool() || !db.isEnabled()) {
+ continue;
+ }
+ DbStoreService dbStoreService = findDbStoreService(db);
+ dbStoreService.saveJsons(effectiveTopic, docs);
+ }
}
- private JSONObject messageToJson(Topic topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
+ private JSONObject messageToJson(EffectiveTopic effectiveTopic, Pair<Long, String> pair) throws IOException {
long timestamp = pair.getLeft();
String text = pair.getRight();
//for debug, to be remove
-// String topicStr = topic.getId();
-// if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
- // log.debug("{} ={}", topicStr, text);
+ // String topicStr = topic.getId();
+ // if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
+ // log.debug("{} ={}", topicStr, text);
//}
- boolean storeRaw = topic.isSaveRaw();
+ boolean storeRaw = effectiveTopic.getTopic().isSaveRaw();
- JSONObject json = null;
+ JSONObject json = null;
- DataFormat dataFormat = topic.getDataFormat();
+ DataFormat dataFormat = effectiveTopic.getTopic().getDataFormat2();
switch (dataFormat) {
case JSON:
break;
case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON
json = XML.toJSONObject(text);
- break;
+ break;
case YAML:// Do we need to support YAML?
Object obj = yamlReader.readValue(text, Object.class);
ObjectMapper jsonWriter = new ObjectMapper();
//FIXME for debug, to be remove
json.remove("_id");
json.remove("_dl_text_");
+ json.remove("_dl_type_");
- json.put("_ts", timestamp);
+ json.put(config.getTimestampLabel(), timestamp);
if (storeRaw) {
- json.put("_text", text);
+ json.put(config.getRawDataLabel(), text);
+ }
+
+ if (StringUtils.isNotBlank(effectiveTopic.getTopic().getAggregateArrayPath())) {
+ String[] paths = effectiveTopic.getTopic().getAggregateArrayPath2();
+ for (String path : paths) {
+ JsonUtil.arrayAggregate(path, json);
+ }
+ }
+
+ if (StringUtils.isNotBlank(effectiveTopic.getTopic().getFlattenArrayPath())) {
+ String[] paths = effectiveTopic.getTopic().getFlattenArrayPath2();
+ for (String path : paths) {
+ JsonUtil.flattenArray(path, json);
+ }
}
return json;
}
- private void saveJsons(Topic topic, List<JSONObject> jsons) {
- if (topic.isSupportCouchbase()) {
- couchbaseService.saveJsons(topic, jsons);
+ private DbStoreService findDbStoreService(Db db) {
+ DbType dbType = db.getDbType();
+ DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
+ switch (dbTypeEnum) {
+ case CB:
+ return context.getBean(CouchbaseService.class, db);
+ case ES:
+ return context.getBean(ElasticsearchService.class, db);
+ case HDFS:
+ return context.getBean(HdfsService.class, db);
+ case MONGO:
+ return context.getBean(MongodbService.class, db);
+ default:
+ log.error("we should not come here {}", dbTypeEnum);
+ return null;
}
+ }
- if (topic.isSupportElasticsearch()) {
- elasticsearchService.saveJsons(topic, jsons);
- }
+ public void flush() { //force flush all buffer
+// hdfsService.flush();
}
+ public void flushStall() { //flush stall buffer
+ // hdfsService.flushStall();
+ }
}