import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Set;
import javax.annotation.PostConstruct;
import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
import org.onap.datalake.feeder.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
private ApplicationConfiguration config;
@Autowired
- private TopicConfigPollingService configPollingService;
-
- @Autowired
- private MongodbService mongodbService;
+ private ApplicationContext context;
@Autowired
- private CouchbaseService couchbaseService;
-
- @Autowired
- private ElasticsearchService elasticsearchService;
-
- @Autowired
- private HdfsService hdfsService;
+ private TopicConfigPollingService configPollingService;
private ObjectMapper yamlReader;
yamlReader = new ObjectMapper(new YAMLFactory());
}
- public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
+ public void saveMessages(Kafka kafka, String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
if (CollectionUtils.isEmpty(messages)) {
return;
}
- TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr);
+ Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr);
+ for(EffectiveTopic effectiveTopic:effectiveTopics) {
+ saveMessagesForTopic(effectiveTopic, messages);
+ }
+ }
+
+ private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) {
+ if (!effectiveTopic.getTopic().isEnabled()) {
+ log.error("we should not come here {}", effectiveTopic);
+ return;
+ }
List<JSONObject> docs = new ArrayList<>();
for (Pair<Long, String> pair : messages) {
try {
- docs.add(messageToJson(topicConfig, pair));
+ docs.add(messageToJson(effectiveTopic, pair));
} catch (Exception e) {
//may see org.json.JSONException.
log.error("Error when converting this message to JSON: " + pair.getRight(), e);
}
}
- saveJsons(topicConfig, docs, messages);
+ Set<Db> dbs = effectiveTopic.getTopic().getDbs();
+
+ for (Db db : dbs) {
+ if (db.getDbType().isTool() || !db.isEnabled()) {
+ continue;
+ }
+ DbStoreService dbStoreService = findDbStoreService(db);
+ dbStoreService.saveJsons(effectiveTopic, docs);
+ }
}
- private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
+ private JSONObject messageToJson(EffectiveTopic effectiveTopic, Pair<Long, String> pair) throws IOException {
long timestamp = pair.getLeft();
String text = pair.getRight();
// log.debug("{} ={}", topicStr, text);
//}
- boolean storeRaw = topicConfig.isSaveRaw();
+ boolean storeRaw = effectiveTopic.getTopic().isSaveRaw();
JSONObject json = null;
- DataFormat dataFormat = topicConfig.getDataFormat2();
+ DataFormat dataFormat = effectiveTopic.getTopic().getDataFormat2();
switch (dataFormat) {
case JSON:
json.put(config.getRawDataLabel(), text);
}
- if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
- String[] paths = topicConfig.getAggregateArrayPath2();
+ if (StringUtils.isNotBlank(effectiveTopic.getTopic().getAggregateArrayPath())) {
+ String[] paths = effectiveTopic.getTopic().getAggregateArrayPath2();
for (String path : paths) {
JsonUtil.arrayAggregate(path, json);
}
}
- if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
- String[] paths = topicConfig.getFlattenArrayPath2();
+ if (StringUtils.isNotBlank(effectiveTopic.getTopic().getFlattenArrayPath())) {
+ String[] paths = effectiveTopic.getTopic().getFlattenArrayPath2();
for (String path : paths) {
JsonUtil.flattenArray(path, json);
}
return json;
}
- private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) {
- if (topic.supportMongoDB()) {
- mongodbService.saveJsons(topic, jsons);
- }
-
- if (topic.supportCouchbase()) {
- couchbaseService.saveJsons(topic, jsons);
- }
-
- if (topic.supportElasticsearch()) {
- elasticsearchService.saveJsons(topic, jsons);
- }
-
- if (topic.supportHdfs()) {
- hdfsService.saveMessages(topic, messages);
+ private DbStoreService findDbStoreService(Db db) {
+ DbType dbType = db.getDbType();
+ DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
+ switch (dbTypeEnum) {
+ case CB:
+ return context.getBean(CouchbaseService.class, db);
+ case ES:
+ return context.getBean(ElasticsearchService.class, db);
+ case HDFS:
+ return context.getBean(HdfsService.class, db);
+ case MONGO:
+ return context.getBean(MongodbService.class, db);
+ default:
+ log.error("we should not come here {}", dbTypeEnum);
+ return null;
}
}
public void flush() { //force flush all buffer
- hdfsService.flush();
+// hdfsService.flush();
}
public void flushStall() { //flush stall buffer
- hdfsService.flushStall();
+ // hdfsService.flushStall();
}
}