private String defaultTopicName;
- private int checkTopicInterval; //in millisecond
-/*
- //DMaaP
- private String dmaapZookeeperHostPort;
- private String dmaapKafkaHostPort;
- private String dmaapKafkaGroup;
- private String dmaapKafkaLogin;
- private String dmaapKafkaPass;
- private String dmaapKafkaSecurityProtocol;
-
- private long dmaapKafkaTimeout;
- private String[] dmaapKafkaExclude;
+ private long checkTopicInterval; //in millisecond
- private int dmaapCheckNewTopicInterval; //in millisecond
-
- private int kafkaConsumerCount;
-*/
private String elasticsearchType;
//HDFS
)
private Set<Topic> topics;
+ public boolean isTool() {
+ return dbType.isTool();
+ }
+
public boolean isHdfs() {
return isDb(DbTypeEnum.HDFS);
}
@Override
public String toString() {
- return String.format("Db %s (name=%, enabled=%s)", id, name, enabled);
+ return String.format("Db %s (name=%s, enabled=%s)", id, name, enabled);
}
@Override
@Override
public String toString() {
- return String.format("Kafka %s (name=%, enabled=%s)", id, name, enabled);
+ return String.format("Kafka %s (name=%s, enabled=%s)", id, name, enabled);
}
@Override
package org.onap.datalake.feeder.service;
-import org.onap.datalake.feeder.repository.DbRepository;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
*/
@Service
public class DbService {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
- private DbRepository dbRepository;
+ private ApplicationContext context;
+
+ private Map<Integer, DbStoreService> dbStoreServiceMap = new HashMap<>();
+
+ public DbStoreService findDbStoreService(Db db) {
+ DbStoreService ret = dbStoreServiceMap.get(db.getId());
+ if (ret != null) {
+ return ret;
+ }
+
+ DbType dbType = db.getDbType();
+ DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
+ switch (dbTypeEnum) {
+ case CB:
+ ret = context.getBean(CouchbaseService.class, db);
+ break;
+ case ES:
+ ret = context.getBean(ElasticsearchService.class, db);
+ break;
+ case HDFS:
+ ret = context.getBean(HdfsService.class, db);
+ break;
+ case MONGO:
+ ret = context.getBean(MongodbService.class, db);
+ break;
+ default:
+ log.error("Should not have come here {}", db);
+ ret = null;
+ }
+
+ dbStoreServiceMap.put(db.getId(), ret);
+
+ return ret;
+ }
+
}
package org.onap.datalake.feeder.service;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.domain.Kafka;
-import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
/**
*
*/
@Service
+@Scope("prototype")
public class DmaapService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
log.debug("get topic setting from DB: {}.", topicStr);
List<EffectiveTopic> effectiveTopics= topicService.getEnabledEffectiveTopic(kafka, topicStr, true);
-
- ret.put(topicStr , effectiveTopics);
+ if(CollectionUtils.isNotEmpty(effectiveTopics )) {
+ log.debug("add effectiveTopics {}:{}.", topicStr, effectiveTopics);
+ ret.put(topicStr , effectiveTopics);
+ }
}
return ret;
private boolean isRunning = false;
private ExecutorService executorService;
-// private Thread topicConfigPollingThread;
private Set<Puller> pullers;
@Autowired
}
executorService.submit(topicConfigPollingService);
- /*topicConfigPollingThread = new Thread(topicConfigPollingService);
- topicConfigPollingThread.setName("TopicConfigPolling");
- topicConfigPollingThread.start();
-*/
+
isRunning = true;
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
puller.shutdown();
}
-// logger.info("stop TopicConfigPollingService ...");
-// topicConfigPollingService.shutdown();
-
- // topicConfigPollingThread.join();
-
logger.info("stop executorService ...");
executorService.shutdown();
executorService.awaitTermination(120L, TimeUnit.SECONDS);
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
/**
*/
@Service
+@Scope("prototype")
public class Puller implements Runnable {
@Autowired
private boolean async;
private Kafka kafka;
-
- public Puller( ) {
-
- }
+
public Puller(Kafka kafka) {
this.kafka = kafka;
}
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.DbType;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.domain.Kafka;
-import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.onap.datalake.feeder.enumeration.DbTypeEnum;
-import org.onap.datalake.feeder.service.db.CouchbaseService;
import org.onap.datalake.feeder.service.db.DbStoreService;
-import org.onap.datalake.feeder.service.db.ElasticsearchService;
-import org.onap.datalake.feeder.service.db.HdfsService;
-import org.onap.datalake.feeder.service.db.MongodbService;
import org.onap.datalake.feeder.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
private ApplicationConfiguration config;
@Autowired
- private ApplicationContext context;
+ private DbService dbService;
@Autowired
private TopicConfigPollingService configPollingService;
}
Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr);
- for(EffectiveTopic effectiveTopic:effectiveTopics) {
+ for (EffectiveTopic effectiveTopic : effectiveTopics) {
saveMessagesForTopic(effectiveTopic, messages);
}
}
-
+
private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) {
if (!effectiveTopic.getTopic().isEnabled()) {
log.error("we should not come here {}", effectiveTopic);
Set<Db> dbs = effectiveTopic.getTopic().getDbs();
for (Db db : dbs) {
- if (db.getDbType().isTool() || !db.isEnabled()) {
+ if (db.isTool() || db.isDruid() || !db.isEnabled()) {
continue;
}
- DbStoreService dbStoreService = findDbStoreService(db);
- dbStoreService.saveJsons(effectiveTopic, docs);
+ DbStoreService dbStoreService = dbService.findDbStoreService(db);
+ if (dbStoreService != null) {
+ dbStoreService.saveJsons(effectiveTopic, docs);
+ }
}
}
long timestamp = pair.getLeft();
String text = pair.getRight();
- //for debug, to be remove
- // String topicStr = topic.getId();
- // if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
- // log.debug("{} ={}", topicStr, text);
- //}
-
boolean storeRaw = effectiveTopic.getTopic().isSaveRaw();
JSONObject json = null;
return json;
}
- private DbStoreService findDbStoreService(Db db) {
- DbType dbType = db.getDbType();
- DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
- switch (dbTypeEnum) {
- case CB:
- return context.getBean(CouchbaseService.class, db);
- case ES:
- return context.getBean(ElasticsearchService.class, db);
- case HDFS:
- return context.getBean(HdfsService.class, db);
- case MONGO:
- return context.getBean(MongodbService.class, db);
- default:
- log.error("we should not come here {}", dbTypeEnum);
- return null;
- }
- }
-
public void flush() { //force flush all buffer
-// hdfsService.flush();
+ // hdfsService.flush();
}
public void flushStall() { //flush stall buffer
- // hdfsService.flushStall();
+ // hdfsService.flushStall();
}
}
private KafkaRepository kafkaRepository;
//effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
- private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();;
+ private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
//private Map<String, TopicConfig> effectiveTopicConfigMap;
//monitor Kafka topic list changes
private Map<String, Set<String>> activeTopicMap;
- private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = new ThreadLocal<>();
- private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();
+ private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//topic name:version
+ private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();//topic name:version
+ private Map<String, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
private boolean active = false;
private Set<String> poll(Kafka kafka) throws IOException {
log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
- DmaapService dmaapService = context.getBean(DmaapService.class, kafka);
+ DmaapService dmaapService = dmaapServiceMap.get(kafka.getId());
+ if(dmaapService==null) {
+ dmaapService = context.getBean(DmaapService.class, kafka);
+ dmaapServiceMap.put(kafka.getId(), dmaapService);
+ }
Map<String, List<EffectiveTopic>> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic();
effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics);
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
@Autowired
private ApplicationConfiguration config;
- @Autowired
- private ApplicationContext context;
-
@Autowired
private TopicNameRepository topicNameRepository;
@Autowired
private DbRepository dbRepository;
+ @Autowired
+ private DbService dbService;
+
public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException {
List<Topic> topics = findTopics(kafka, topicStr);
if (ensureTableExist) {
for (Db db : topic.getDbs()) {
if (db.isElasticsearch()) {
- ElasticsearchService elasticsearchService = context.getBean(ElasticsearchService.class, db);
+ ElasticsearchService elasticsearchService = (ElasticsearchService) dbService.findDbStoreService(db);
elasticsearchService.ensureTableExist(topicStr);
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import com.couchbase.client.java.Bucket;
*
*/
@Service
+@Scope("prototype")
public class CouchbaseService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
ApplicationConfiguration config;
private Db couchbase;
-/*
- @Autowired
- private DbService dbService;
-
- private boolean isReady = false;
-*/
Bucket bucket;
-
- public CouchbaseService( ) {
-
- }
+
public CouchbaseService(Db db) {
couchbase = db;
}
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
/**
*
*/
@Service
+@Scope("prototype")
public class ElasticsearchService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
private ApplicationConfiguration config;
- //@Autowired
-// private DbService dbService;
-
private RestHighLevelClient client;
ActionListener<BulkResponse> listener;
-
- public ElasticsearchService( ) {
-
- }
+
public ElasticsearchService(Db db) {
elasticsearch = db;
}
//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
@PostConstruct
private void init() {
- //Db elasticsearch = dbService.getElasticsearch();
String elasticsearchHost = elasticsearch.getHost();
// Initialize the Connection
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import lombok.Getter;
*
*/
@Service
+@Scope("prototype")
public class HdfsService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
}
}
- public HdfsService( ) {
- }
-
public HdfsService(Db db) {
hdfs = db;
}
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import com.mongodb.bulk.BulkWriteError;
*
*/
@Service
+@Scope("prototype")
public class MongodbService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
private InsertManyOptions insertManyOptions;
- public MongodbService( ) {
- }
public MongodbService(Db db) {
mongodb = db;
}