supports multiple Kafka clusters and DBs 35/91135/1
authorGuobiao Mo <guobiaomo@chinamobile.com>
Wed, 10 Jul 2019 08:34:20 +0000 (01:34 -0700)
committerGuobiao Mo <guobiaomo@chinamobile.com>
Wed, 10 Jul 2019 08:34:20 +0000 (01:34 -0700)
Change kafka table id to int

Issue-ID: DCAEGEN2-1631

Change-Id: Ib5109b75a387f76709dc38161bf2d7ef084950ef
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
21 files changed:
components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
components/datalake-handler/feeder/src/assembly/scripts/init_db_data.sql
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicConfigPollingServiceTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/TestUtil.java

index 02f2343..1fd9aa8 100644 (file)
@@ -32,7 +32,7 @@ CREATE TABLE `db` (
   PRIMARY KEY (`id`),\r
   KEY `FK3njadtw43ieph7ftt4kxdhcko` (`db_type_id`),\r
   CONSTRAINT `FK3njadtw43ieph7ftt4kxdhcko` FOREIGN KEY (`db_type_id`) REFERENCES `db_type` (`id`)\r
-) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;\r
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
 \r
 CREATE TABLE `portal` (\r
   `name` varchar(255) NOT NULL,\r
@@ -73,10 +73,10 @@ CREATE TABLE `design` (
   KEY `FKabb8e74230glxpaiai4aqsr34` (`topic_name_id`),\r
   CONSTRAINT `FKabb8e74230glxpaiai4aqsr34` FOREIGN KEY (`topic_name_id`) REFERENCES `topic_name` (`id`),\r
   CONSTRAINT `FKo43yi6aputq6kwqqu8eqbspm5` FOREIGN KEY (`design_type_id`) REFERENCES `design_type` (`id`)\r
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;\r
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
 \r
 CREATE TABLE `kafka` (\r
-  `id` varchar(255) NOT NULL,\r
+  `id` int(11) NOT NULL AUTO_INCREMENT,\r
   `broker_list` varchar(255) NOT NULL,\r
   `consumer_count` int(11) DEFAULT 3,\r
   `enabled` bit(1) NOT NULL,\r
@@ -130,7 +130,7 @@ CREATE TABLE `map_db_topic` (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
 \r
 CREATE TABLE `map_kafka_topic` (\r
-  `kafka_id` varchar(255) NOT NULL,\r
+  `kafka_id` int(11) NOT NULL,\r
   `topic_id` int(11) NOT NULL,\r
   PRIMARY KEY (`topic_id`,`kafka_id`),\r
   KEY `FKtdrme4h7rxfh04u2i2wqu23g5` (`kafka_id`),\r
index 0605e0e..770c68b 100644 (file)
@@ -13,7 +13,7 @@ INSERT INTO datalake.kafka(
   ,timeout_sec\r
   ,zk\r
 ) VALUES (\r
-  'KAFKA_1'\r
+  1\r
   ,'main Kafka cluster' -- name - IN varchar(255)\r
   ,3   -- consumer_count - IN int(11)\r
   ,1   -- enabled - IN bit(1)\r
index 901adf2..8d1bf31 100644 (file)
@@ -87,7 +87,7 @@ public class KafkaController {
     @PutMapping("/{id}")
     @ResponseBody
     @ApiOperation(value="Update a kafka.")
-    public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable String id, HttpServletResponse response) throws IOException {
+    public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable int id, HttpServletResponse response) throws IOException {
 
         if (result.hasErrors()) {
             sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
@@ -116,7 +116,7 @@ public class KafkaController {
     @DeleteMapping("/{id}")
     @ResponseBody
     @ApiOperation(value="delete a kafka.")
-    public void deleteKafka(@PathVariable("id") String id, HttpServletResponse response) throws IOException{
+    public void deleteKafka(@PathVariable("id") int id, HttpServletResponse response) throws IOException{
 
         Kafka oldKafka = kafkaService.getKafkaById(id);
         if (oldKafka == null) {
index 1162aed..bb0de4b 100644 (file)
@@ -90,7 +90,7 @@ public class TopicController {
        @GetMapping("/dmaap/{kafkaId}")
        @ResponseBody
        @ApiOperation(value = "List all topic names in DMaaP.")
-       public List<String> listDmaapTopics(@PathVariable("kafkaId") String kafkaId ) {
+       public List<String> listDmaapTopics(@PathVariable("kafkaId") int kafkaId ) {
                Kafka kafka = kafkaRepository.findById(kafkaId).get();
                DmaapService dmaapService = context.getBean(DmaapService.class, kafka); 
                return dmaapService.getTopics();
index 2741c63..7f7b59e 100644 (file)
@@ -24,6 +24,8 @@ import java.util.Set;
 import javax.persistence.Column;
 import javax.persistence.Entity;
 import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.JoinTable;
@@ -47,8 +49,9 @@ import org.onap.datalake.feeder.dto.KafkaConfig;
 @Table(name = "kafka")
 public class Kafka {
        @Id
-       @Column(name="`id`")
-       private String id;
+    @GeneratedValue(strategy = GenerationType.IDENTITY)
+    @Column(name = "`id`")
+    private int id;
        
        @Column(name="`name`", nullable = false)
        private String name;
@@ -117,12 +120,12 @@ public class Kafka {
                if (this.getClass() != obj.getClass())
                        return false;
 
-               return id.equals(((Kafka) obj).getId());
+               return id == ((Kafka) obj).getId();
        }
 
        @Override
        public int hashCode() {
-               return id.hashCode();
+               return id;
        }
 
        public KafkaConfig getKafkaConfig() {
index 13e0163..5d0c762 100644 (file)
@@ -213,13 +213,13 @@ public class Topic {
                tConfig.setEnabledSinkdbs(enabledDbList);
 
                Set<Kafka> topicKafka = getKafkas();
-               List<String> kafkaList = new ArrayList<>();
+               List<Integer> kafkaList = new ArrayList<>();
                if (topicKafka != null) {
                        for (Kafka kafka : topicKafka) {
                                kafkaList.add(kafka.getId());
                        }
                }
-               tConfig.setSinkKafkas(kafkaList);
+               tConfig.setKafkas(kafkaList);
                return tConfig;
        }
 
index b158d16..f5e9539 100644 (file)
@@ -23,8 +23,6 @@ package org.onap.datalake.feeder.dto;
 import lombok.Getter;
 import lombok.Setter;
 
-import java.util.List;
-
 /**
  * JSON request body for Kafka Config.
  *
@@ -35,7 +33,7 @@ import java.util.List;
 @Setter
 public class KafkaConfig {
 
-    private String id;
+    private int id;
 
     private String name;
 
index a51103b..6a262ca 100644 (file)
@@ -25,10 +25,6 @@ import lombok.Setter;
 
 import java.util.List;
 
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONObject;
-import org.onap.datalake.feeder.enumeration.DataFormat;
-
 /**
  * JSON request body for Topic manipulation.
  *
@@ -55,7 +51,7 @@ public class TopicConfig {
        private String messageIdPath;
        private String aggregateArrayPath;
        private String flattenArrayPath;
-       private List<String> sinkKafkas;
+       private List<Integer> kafkas;
        
        @Override
        public String toString() {
index 05d76d5..39d02d3 100644 (file)
 */
 package org.onap.datalake.feeder.enumeration;
 
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
+
 /**
  * Database type
  * 
@@ -26,12 +32,23 @@ package org.onap.datalake.feeder.enumeration;
  *
  */
 public enum DbTypeEnum { 
-       CB("Couchbase"), DRUID("Druid"), ES("Elasticsearch"), HDFS("HDFS"), MONGO("MongoDB"), KIBANA("Kibana"), SUPERSET("Superset");
+       CB("Couchbase", CouchbaseService.class)
+       , DRUID("Druid", null)
+       , ES("Elasticsearch", ElasticsearchService.class)
+       , HDFS("HDFS", HdfsService.class)
+       , MONGO("MongoDB", MongodbService.class)
+       , KIBANA("Kibana", null)
+       , SUPERSET("Superset", null);
 
        private final String name;
+       private final Class<? extends DbStoreService> serviceClass;
 
-       DbTypeEnum(String name) {
+       DbTypeEnum(String name, Class<? extends DbStoreService> serviceClass) {
                this.name = name;
+               this.serviceClass = serviceClass;
        }
 
+       public Class<? extends DbStoreService> getServiceClass(){
+               return serviceClass;
+       }
 }
index 8e78e5c..6ce23ba 100644 (file)
@@ -30,6 +30,6 @@ import org.springframework.data.repository.CrudRepository;
  *\r
  */ \r
 \r
-public interface KafkaRepository extends CrudRepository<Kafka, String> {\r
+public interface KafkaRepository extends CrudRepository<Kafka, Integer> {\r
 \r
 }\r
index addd060..d54bf3f 100644 (file)
@@ -26,11 +26,7 @@ import java.util.Map;
 import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.domain.DbType;
 import org.onap.datalake.feeder.enumeration.DbTypeEnum;
-import org.onap.datalake.feeder.service.db.CouchbaseService;
 import org.onap.datalake.feeder.service.db.DbStoreService;
-import org.onap.datalake.feeder.service.db.ElasticsearchService;
-import org.onap.datalake.feeder.service.db.HdfsService;
-import org.onap.datalake.feeder.service.db.MongodbService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -53,34 +49,24 @@ public class DbService {
        private Map<Integer, DbStoreService> dbStoreServiceMap = new HashMap<>();
 
        public DbStoreService findDbStoreService(Db db) {
-               DbStoreService ret = dbStoreServiceMap.get(db.getId());
-               if (ret != null) {
-                       return ret;
+               int dbId = db.getId();
+               if (dbStoreServiceMap.containsKey(dbId)) {
+                       return dbStoreServiceMap.get(dbId);
                }
 
                DbType dbType = db.getDbType();
                DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
-               switch (dbTypeEnum) {
-               case CB:
-                       ret = context.getBean(CouchbaseService.class, db);
-                       break;
-               case ES:
-                       ret = context.getBean(ElasticsearchService.class, db);
-                       break;
-               case HDFS:
-                       ret = context.getBean(HdfsService.class, db);
-                       break;
-               case MONGO:
-                       ret = context.getBean(MongodbService.class, db);
-                       break;
-               default:
+               Class<? extends DbStoreService> serviceClass = dbTypeEnum.getServiceClass();
+               
+               if (serviceClass == null) {
                        log.error("Should not have come here {}", db);
-                       ret = null;
+                       dbStoreServiceMap.put(dbId, null);
+                       return null;
                }
-
-               dbStoreServiceMap.put(db.getId(), ret);
+               
+               DbStoreService ret = context.getBean(serviceClass, db);
+               dbStoreServiceMap.put(dbId, ret);
 
                return ret;
        }
-
 }
index 58ee908..2e959fa 100644 (file)
@@ -39,7 +39,7 @@ public class KafkaService {
     @Autowired
     private KafkaRepository kafkaRepository;
 
-    public Kafka getKafkaById(String id) {
+    public Kafka getKafkaById(int id) {
 
         Optional<Kafka> ret = kafkaRepository.findById(id);
         return ret.isPresent() ? ret.get() : null;
index 151ea3d..ab99ad0 100644 (file)
@@ -68,6 +68,7 @@ public class Puller implements Runnable {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
 
+       //KafkaConsumer is not thread-safe.
        private ThreadLocal<KafkaConsumer<String, String>> consumerLocal = new ThreadLocal<>(); //<String, String> is key-value type, in our case key is empty, value is JSON text 
 
        private boolean active = false;
@@ -156,7 +157,7 @@ public class Puller implements Runnable {
                                storeService.saveMessages(kafka, partition.topic(), messages);
                                log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
 
-                               if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+                               if (!async) {//for reliability, sync commit offset to Kafka right after saving the data to data store, this slows down a bit
                                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                                }
index 6ca8c13..a02cd6a 100644 (file)
@@ -61,15 +61,15 @@ public class TopicConfigPollingService implements Runnable {
        private KafkaRepository kafkaRepository;
        
        //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
-       private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
+       private Map<Integer, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
        //private Map<String, TopicConfig> effectiveTopicConfigMap;
 
-       //monitor Kafka topic list changes
-       private Map<String, Set<String>> activeTopicMap;
+       //monitor Kafka topic list changes, key is kafka id, value is active Topics
+       private Map<Integer, Set<String>> activeTopicMap;
        
-       private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal =   ThreadLocal.withInitial(HashMap::new);//topic name:version
-       private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();//topic name:version
-       private Map<String, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
+       private ThreadLocal<Map<Integer, Integer>> activeTopicsVersionLocal =   ThreadLocal.withInitial(HashMap::new);//kafkaId:version - local 'old' version
+       private Map<Integer, Integer> currentActiveTopicsVersionMap = new HashMap<>();//kafkaId:version - current/latest version
+       private Map<Integer, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
 
        private boolean active = false;
 
@@ -84,7 +84,7 @@ public class TopicConfigPollingService implements Runnable {
        }
 
        public boolean isActiveTopicsChanged(Kafka kafka) {//update=true means sync local version
-               String kafkaId = kafka.getId();
+               int kafkaId = kafka.getId();
                int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version
                int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0);
                
@@ -125,10 +125,10 @@ public class TopicConfigPollingService implements Runnable {
                        }
 
                        try {
-                               Map<String, Set<String>> newTopicsMap = poll();
+                               Map<Integer, Set<String>> newTopicsMap = poll();
                                
-                               for(Map.Entry<String, Set<String>> entry:newTopicsMap.entrySet()) {
-                                       String kafkaId = entry.getKey();
+                               for(Map.Entry<Integer, Set<String>> entry:newTopicsMap.entrySet()) {
+                                       Integer kafkaId = entry.getKey();
                                        Set<String>  newTopics = entry.getValue();
                                        
                                        Set<String> activeTopics = activeTopicMap.get(kafkaId);
@@ -155,8 +155,8 @@ public class TopicConfigPollingService implements Runnable {
                active = false;
        }
 
-       private Map<String, Set<String>>  poll() throws IOException {
-               Map<String, Set<String>> ret = new HashMap<>();
+       private Map<Integer, Set<String>>  poll() throws IOException {
+               Map<Integer, Set<String>> ret = new HashMap<>();
                Iterable<Kafka> kafkas = kafkaRepository.findAll();
                for (Kafka kafka : kafkas) {
                        if (kafka.isEnabled()) {
index 73f6293..ed9b5c2 100644 (file)
@@ -172,8 +172,8 @@ public class TopicService {
                }
 
                Set<Kafka> relateKafka = new HashSet<>();
-               if (tConfig.getSinkKafkas() != null) {
-                       for (String item : tConfig.getSinkKafkas()) {
+               if (tConfig.getKafkas() != null) {
+                       for (int item : tConfig.getKafkas()) {
                                Optional<Kafka> sinkKafka = kafkaRepository.findById(item);
                                if (sinkKafka.isPresent()) {
                                        relateKafka.add(sinkKafka.get());
index f2ac5e9..44b940a 100644 (file)
@@ -66,6 +66,7 @@ public class CouchbaseService implements DbStoreService {
        ApplicationConfiguration config;
        
        private Db couchbase;
+       //Bucket is thread-safe. https://docs.couchbase.com/java-sdk/current/managing-connections.html
        Bucket bucket;
        
        public CouchbaseService(Db db) {
index 18b7e2f..e303fa9 100644 (file)
@@ -73,7 +73,7 @@ public class ElasticsearchService implements DbStoreService {
        @Autowired
        private ApplicationConfiguration config;
 
-       private RestHighLevelClient client;
+       private RestHighLevelClient client;//thread safe
        ActionListener<BulkResponse> listener;
        
        public ElasticsearchService(Db db) {
index a044790..8677c6f 100644 (file)
@@ -72,11 +72,9 @@ public class MongodbService implements DbStoreService {
        private ApplicationConfiguration config;
        private boolean dbReady = false;
 
-       //@Autowired
-//     private DbService dbService;
-
        private MongoDatabase database;
        private MongoClient mongoClient;
+       //MongoCollection is ThreadSafe
        private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
        private InsertManyOptions insertManyOptions;
 
index 04545a9..bd26519 100644 (file)
@@ -69,8 +69,8 @@ public class TopicConfigPollingServiceTest {
                init.invoke(topicConfigPollingService);
 
                Set<String> activeTopics = new HashSet<>(Arrays.asList("test"));
-               Map<String, Set<String>> activeTopicMap = new HashMap<>();
-               activeTopicMap.put(KAFKA_NAME, activeTopics);
+               Map<Integer, Set<String>> activeTopicMap = new HashMap<>();
+               activeTopicMap.put(1, activeTopics);
 
                Field activeTopicsField = TopicConfigPollingService.class.getDeclaredField("activeTopicMap");
                activeTopicsField.setAccessible(true);
@@ -114,6 +114,7 @@ public class TopicConfigPollingServiceTest {
        @Test
        public void testGet() {
                Kafka kafka = TestUtil.newKafka(KAFKA_NAME);
+               kafka.setId(1);
                //assertNull(topicConfigPollingService.getEffectiveTopic (kafka, "test"));
                assertNotNull(topicConfigPollingService.getActiveTopics(kafka));
 
index 2ea2e83..4eebcb4 100644 (file)
@@ -99,7 +99,7 @@ public class TopicServiceTest {
                db.setDbType(dbType);
 
                Kafka kafka = new Kafka();
-               kafka.setId("1234");
+               kafka.setName("1234");
                kafkas.add(kafka);
 
                TopicName topicName = new TopicName();
index a54cfd3..770cf31 100644 (file)
@@ -45,7 +45,8 @@ public class TestUtil {
 
     public static Kafka newKafka(String name) {
        Kafka kafka  = new Kafka(); 
-       kafka.setId(name); 
+       kafka.setId(i++);
+       kafka.setName(name); 
        return kafka ;
     }