supports multiple Kafka clusters and DBs 92/90692/1
author Guobiao Mo <guobiaomo@chinamobile.com>
Fri, 28 Jun 2019 21:51:10 +0000 (14:51 -0700)
committer Guobiao Mo <guobiaomo@chinamobile.com>
Fri, 28 Jun 2019 21:51:10 +0000 (14:51 -0700)
Read data from Kafka and store into DBs

Issue-ID: DCAEGEN2-1631

Change-Id: Ib8fccfd84cfdcd2e284ba4f2503b0fbfe41eb5ae
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
14 files changed:
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java

index 806dc72..05d6e88 100644 (file)
@@ -54,23 +54,8 @@ public class ApplicationConfiguration {
 
        private String defaultTopicName;
 
-       private int checkTopicInterval; //in millisecond
-/*
-       //DMaaP
-       private String dmaapZookeeperHostPort;
-       private String dmaapKafkaHostPort;
-       private String dmaapKafkaGroup;
-       private String dmaapKafkaLogin;
-       private String dmaapKafkaPass;
-       private String dmaapKafkaSecurityProtocol;
-       
-       private long dmaapKafkaTimeout;
-       private String[] dmaapKafkaExclude;
+       private long checkTopicInterval; //in millisecond
 
-       private int dmaapCheckNewTopicInterval; //in millisecond
-
-       private int kafkaConsumerCount;
-*/
        private String elasticsearchType;
 
        //HDFS
index 7059cd0..2a653d8 100644 (file)
@@ -101,6 +101,10 @@ public class Db {
        )
        private Set<Topic> topics;
 
+       public boolean isTool() {
+               return dbType.isTool();
+       }
+       
        public boolean isHdfs() {
                return isDb(DbTypeEnum.HDFS);
        }
@@ -127,7 +131,7 @@ public class Db {
        
        @Override
        public String toString() {
-               return String.format("Db %s (name=%, enabled=%s)", id, name, enabled);
+               return String.format("Db %s (name=%s, enabled=%s)", id, name, enabled);
        }
 
        @Override
index d2189cb..26be942 100644 (file)
@@ -105,7 +105,7 @@ public class Kafka {
 
        @Override
        public String toString() {
-               return String.format("Kafka %s (name=%, enabled=%s)", id, name, enabled);
+               return String.format("Kafka %s (name=%s, enabled=%s)", id, name, enabled);
        }
 
        @Override
index 2e934e2..addd060 100644 (file)
 
 package org.onap.datalake.feeder.service;
 
-import org.onap.datalake.feeder.repository.DbRepository;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 
 /**
@@ -32,7 +45,42 @@ import org.springframework.stereotype.Service;
  */
 @Service
 public class DbService {
+       private final Logger log = LoggerFactory.getLogger(this.getClass());
 
        @Autowired
-       private DbRepository dbRepository;
+       private ApplicationContext context;
+
+       private Map<Integer, DbStoreService> dbStoreServiceMap = new HashMap<>();
+
+       public DbStoreService findDbStoreService(Db db) {
+               DbStoreService ret = dbStoreServiceMap.get(db.getId());
+               if (ret != null) {
+                       return ret;
+               }
+
+               DbType dbType = db.getDbType();
+               DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
+               switch (dbTypeEnum) {
+               case CB:
+                       ret = context.getBean(CouchbaseService.class, db);
+                       break;
+               case ES:
+                       ret = context.getBean(ElasticsearchService.class, db);
+                       break;
+               case HDFS:
+                       ret = context.getBean(HdfsService.class, db);
+                       break;
+               case MONGO:
+                       ret = context.getBean(MongodbService.class, db);
+                       break;
+               default:
+                       log.error("Should not have come here {}", db);
+                       ret = null;
+               }
+
+               dbStoreServiceMap.put(db.getId(), ret);
+
+               return ret;
+       }
+
 }
index 1bfd437..671234b 100644 (file)
@@ -21,7 +21,6 @@
 package org.onap.datalake.feeder.service;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,6 +31,7 @@ import java.util.concurrent.CountDownLatch;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -39,10 +39,10 @@ import org.apache.zookeeper.ZooKeeper;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.EffectiveTopic;
 import org.onap.datalake.feeder.domain.Kafka;
-import org.onap.datalake.feeder.dto.TopicConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 /**
@@ -52,6 +52,7 @@ import org.springframework.stereotype.Service;
  *
  */
 @Service
+@Scope("prototype")
 public class DmaapService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -145,8 +146,10 @@ public class DmaapService {
                        log.debug("get topic setting from DB: {}.", topicStr);
 
                        List<EffectiveTopic> effectiveTopics= topicService.getEnabledEffectiveTopic(kafka, topicStr, true);
-                       
-                       ret.put(topicStr , effectiveTopics);
+                       if(CollectionUtils.isNotEmpty(effectiveTopics )) {
+                               log.debug("add effectiveTopics  {}:{}.", topicStr, effectiveTopics);
+                               ret.put(topicStr , effectiveTopics);
+                       }
                        
                }
                return ret;
index 65de0bd..09a59ee 100644 (file)
@@ -50,7 +50,6 @@ public class PullService {
 
        private boolean isRunning = false;
        private ExecutorService executorService;
-//     private Thread topicConfigPollingThread;
        private Set<Puller> pullers;
 
        @Autowired
@@ -95,10 +94,7 @@ public class PullService {
                }
 
                executorService.submit(topicConfigPollingService);
-               /*topicConfigPollingThread = new Thread(topicConfigPollingService);
-               topicConfigPollingThread.setName("TopicConfigPolling");
-               topicConfigPollingThread.start();
-*/
+               
                isRunning = true;
 
                Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
@@ -127,11 +123,6 @@ public class PullService {
                                puller.shutdown();
                        }
 
-//                     logger.info("stop TopicConfigPollingService ...");
-//                     topicConfigPollingService.shutdown();
-
-       //              topicConfigPollingThread.join();
-
                        logger.info("stop executorService ...");
                        executorService.shutdown();
                        executorService.awaitTermination(120L, TimeUnit.SECONDS);
index 1550e53..151ea3d 100644 (file)
@@ -43,6 +43,7 @@ import org.onap.datalake.feeder.domain.Kafka;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 /**
@@ -53,6 +54,7 @@ import org.springframework.stereotype.Service;
  */
 
 @Service
+@Scope("prototype")
 public class Puller implements Runnable {
 
        @Autowired
@@ -72,10 +74,7 @@ public class Puller implements Runnable {
        private boolean async;
        
        private Kafka kafka;
-
-       public Puller( ) {
-               
-       }
+       
        public Puller(Kafka kafka) {
                this.kafka = kafka;
        }
index f5a7698..0e54b9b 100644 (file)
@@ -35,22 +35,14 @@ import org.json.JSONObject;
 import org.json.XML;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.DbType;
 import org.onap.datalake.feeder.domain.EffectiveTopic;
 import org.onap.datalake.feeder.domain.Kafka;
-import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.onap.datalake.feeder.enumeration.DbTypeEnum;
-import org.onap.datalake.feeder.service.db.CouchbaseService;
 import org.onap.datalake.feeder.service.db.DbStoreService;
-import org.onap.datalake.feeder.service.db.ElasticsearchService;
-import org.onap.datalake.feeder.service.db.HdfsService;
-import org.onap.datalake.feeder.service.db.MongodbService;
 import org.onap.datalake.feeder.util.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -73,7 +65,7 @@ public class StoreService {
        private ApplicationConfiguration config;
 
        @Autowired
-       private ApplicationContext context;
+       private DbService dbService;
 
        @Autowired
        private TopicConfigPollingService configPollingService;
@@ -91,11 +83,11 @@ public class StoreService {
                }
 
                Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr);
-               for(EffectiveTopic effectiveTopic:effectiveTopics) {
+               for (EffectiveTopic effectiveTopic : effectiveTopics) {
                        saveMessagesForTopic(effectiveTopic, messages);
                }
        }
-       
+
        private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) {
                if (!effectiveTopic.getTopic().isEnabled()) {
                        log.error("we should not come here {}", effectiveTopic);
@@ -116,11 +108,13 @@ public class StoreService {
                Set<Db> dbs = effectiveTopic.getTopic().getDbs();
 
                for (Db db : dbs) {
-                       if (db.getDbType().isTool() || !db.isEnabled()) {
+                       if (db.isTool() || db.isDruid() || !db.isEnabled()) {
                                continue;
                        }
-                       DbStoreService dbStoreService = findDbStoreService(db);
-                       dbStoreService.saveJsons(effectiveTopic, docs);
+                       DbStoreService dbStoreService = dbService.findDbStoreService(db);
+                       if (dbStoreService != null) {
+                               dbStoreService.saveJsons(effectiveTopic, docs);
+                       }
                }
        }
 
@@ -129,12 +123,6 @@ public class StoreService {
                long timestamp = pair.getLeft();
                String text = pair.getRight();
 
-               //for debug, to be remove
-               //              String topicStr = topic.getId();
-               //              if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
-               //              log.debug("{} ={}", topicStr, text);
-               //}
-
                boolean storeRaw = effectiveTopic.getTopic().isSaveRaw();
 
                JSONObject json = null;
@@ -187,29 +175,11 @@ public class StoreService {
                return json;
        }
 
-       private DbStoreService findDbStoreService(Db db) {
-               DbType dbType = db.getDbType();
-               DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
-               switch (dbTypeEnum) {
-               case CB:
-                       return context.getBean(CouchbaseService.class, db);
-               case ES:
-                       return context.getBean(ElasticsearchService.class, db);
-               case HDFS:
-                       return context.getBean(HdfsService.class, db);
-               case MONGO:
-                       return context.getBean(MongodbService.class, db);
-               default:
-                       log.error("we should not come here {}", dbTypeEnum);
-                       return null;
-               }
-       }
-
        public void flush() { //force flush all buffer 
-//             hdfsService.flush();
+               //              hdfsService.flush();
        }
 
        public void flushStall() { //flush stall buffer
-       //      hdfsService.flushStall();
+               //      hdfsService.flushStall();
        }
 }
index 8f703b1..6ca8c13 100644 (file)
@@ -61,14 +61,15 @@ public class TopicConfigPollingService implements Runnable {
        private KafkaRepository kafkaRepository;
        
        //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
-       private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();;
+       private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
        //private Map<String, TopicConfig> effectiveTopicConfigMap;
 
        //monitor Kafka topic list changes
        private Map<String, Set<String>> activeTopicMap;
        
-       private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = new ThreadLocal<>();
-       private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();
+       private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal =   ThreadLocal.withInitial(HashMap::new);//topic name:version
+       private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();//topic name:version
+       private Map<String, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
 
        private boolean active = false;
 
@@ -169,7 +170,11 @@ public class TopicConfigPollingService implements Runnable {
        private Set<String> poll(Kafka kafka) throws IOException {
                log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
 
-               DmaapService dmaapService = context.getBean(DmaapService.class, kafka);
+               DmaapService dmaapService =  dmaapServiceMap.get(kafka.getId());
+               if(dmaapService==null) {
+                       dmaapService = context.getBean(DmaapService.class, kafka);
+                       dmaapServiceMap.put(kafka.getId(), dmaapService);
+               }
                                
                Map<String, List<EffectiveTopic>> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic();
                effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics);
index 86b27a9..645160e 100644 (file)
@@ -41,7 +41,6 @@ import org.onap.datalake.feeder.service.db.ElasticsearchService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 
 /**
@@ -58,9 +57,6 @@ public class TopicService {
        @Autowired
        private ApplicationConfiguration config;
 
-       @Autowired
-       private ApplicationContext context;
-
        @Autowired
        private TopicNameRepository topicNameRepository;
 
@@ -70,6 +66,9 @@ public class TopicService {
        @Autowired
        private DbRepository dbRepository;
 
+       @Autowired
+       private DbService dbService;
+       
        public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException {
 
                List<Topic> topics = findTopics(kafka, topicStr);
@@ -88,7 +87,7 @@ public class TopicService {
                        if (ensureTableExist) {
                                for (Db db : topic.getDbs()) {
                                        if (db.isElasticsearch()) {
-                                               ElasticsearchService elasticsearchService = context.getBean(ElasticsearchService.class, db);
+                                               ElasticsearchService elasticsearchService = (ElasticsearchService) dbService.findDbStoreService(db);                                            
                                                elasticsearchService.ensureTableExist(topicStr);
                                        }
                                }
index 33c8847..bd2d971 100644 (file)
@@ -35,6 +35,7 @@ import org.onap.datalake.feeder.domain.Topic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 import com.couchbase.client.java.Bucket;
@@ -56,6 +57,7 @@ import rx.functions.Func1;
  *
  */
 @Service
+@Scope("prototype")
 public class CouchbaseService implements DbStoreService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -64,17 +66,8 @@ public class CouchbaseService implements DbStoreService {
        ApplicationConfiguration config;
        
        private Db couchbase;
-/*
-       @Autowired
-       private DbService dbService;
-
-       private boolean isReady = false;
-*/
        Bucket bucket;
-
-       public CouchbaseService( ) {
-               
-       }
+       
        public CouchbaseService(Db db) {
                couchbase = db;
        }
index aee63ed..4dfcdd2 100644 (file)
@@ -53,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 /**
@@ -62,6 +63,7 @@ import org.springframework.stereotype.Service;
  *
  */
 @Service
+@Scope("prototype")
 public class ElasticsearchService implements DbStoreService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -71,15 +73,9 @@ public class ElasticsearchService implements DbStoreService {
        @Autowired
        private ApplicationConfiguration config;
 
-       //@Autowired
-//     private DbService dbService;
-
        private RestHighLevelClient client;
        ActionListener<BulkResponse> listener;
-
-       public ElasticsearchService( ) {
-               
-       }
+       
        public ElasticsearchService(Db db) {
                elasticsearch = db;
        }
@@ -88,7 +84,6 @@ public class ElasticsearchService implements DbStoreService {
        //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
        @PostConstruct
        private void init() {
-               //Db elasticsearch = dbService.getElasticsearch();
                String elasticsearchHost = elasticsearch.getHost();
 
                // Initialize the Connection
index 0e107fd..ea0e77a 100644 (file)
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 import lombok.Getter;
@@ -59,6 +60,7 @@ import lombok.Setter;
  *
  */
 @Service
+@Scope("prototype")
 public class HdfsService implements DbStoreService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -152,9 +154,6 @@ public class HdfsService implements DbStoreService {
                }
        }
 
-       public HdfsService( ) { 
-       }
-
        public HdfsService(Db db) {
                hdfs = db;
        }
index 0f522f6..5cc4070 100644 (file)
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 import com.mongodb.bulk.BulkWriteError;
@@ -59,6 +60,7 @@ import com.mongodb.client.model.InsertManyOptions;
  *
  */
 @Service
+@Scope("prototype")
 public class MongodbService implements DbStoreService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -77,8 +79,6 @@ public class MongodbService implements DbStoreService {
        private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
        private InsertManyOptions insertManyOptions;
 
-       public MongodbService( ) { 
-       }
        public MongodbService(Db db) {
                mongodb = db;
        }