Dynamically detect topic updates and new topics 16/88516/1
authorGuobiao Mo <guobiaomo@chinamobile.com>
Mon, 27 May 2019 06:08:21 +0000 (23:08 -0700)
committerGuobiao Mo <guobiaomo@chinamobile.com>
Mon, 27 May 2019 06:08:21 +0000 (23:08 -0700)
Issue-ID: DCAEGEN2-1195
Change-Id: I35d36a9aafe3a7681a9d4745bc509aded111b29d
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
17 files changed:
components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java [moved from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java with 60% similarity]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java [new file with mode: 0644]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java
components/datalake-handler/feeder/src/main/resources/application.properties
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullServiceTest.java
components/datalake-handler/feeder/src/test/resources/application.properties

index 2884c3e..6920be9 100644 (file)
@@ -4,7 +4,7 @@ use datalake;
 CREATE TABLE `topic` (\r
   `name` varchar(255) NOT NULL,\r
   `correlate_cleared_message` bit(1) DEFAULT NULL,\r
-  `enabled` bit(1) DEFAULT NULL,\r
+  `enabled` bit(1) DEFAULT 0,\r
   `login` varchar(255) DEFAULT NULL,\r
   `message_id_path` varchar(255) DEFAULT NULL,\r
   `pass` varchar(255) DEFAULT NULL,\r
@@ -17,7 +17,7 @@ CREATE TABLE `topic` (
 \r
 CREATE TABLE `db` (\r
   `name` varchar(255) NOT NULL,\r
-  `enabled` bit(1) DEFAULT NULL,\r
+  `enabled` bit(1) DEFAULT 0,\r
   `host` varchar(255) DEFAULT NULL,\r
   `port` int(11) DEFAULT NULL,\r
   `database_name` varchar(255) DEFAULT NULL,\r
@@ -40,29 +40,6 @@ CREATE TABLE `map_db_topic` (
   CONSTRAINT `FK_db_name` FOREIGN KEY (`db_name`) REFERENCES `db` (`name`)\r
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
 \r
-CREATE TABLE `dashboard_template` (\r
-  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,\r
-  `name` varchar(255) NOT NULL,\r
-  `body` text DEFAULT NULL,\r
-  `note` text DEFAULT NULL,\r
-  `topic` varchar(255) DEFAULT NULL,\r
-  `type` varchar(255) DEFAULT NULL,\r
-  PRIMARY KEY (`id`),\r
-  KEY `FK_topic` (`topic`),\r
-  KEY `FK_type` (`type`),\r
-  CONSTRAINT `FK_topic` FOREIGN KEY (`topic`) REFERENCES `topic` (`name`) ON DELETE SET NULL,\r
-  CONSTRAINT `FK_type` FOREIGN KEY (`type`) REFERENCES `dashboard_type` (`name`) ON DELETE SET NULL\r
-) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
-\r
-CREATE TABLE `dashboard_type` (\r
-  `name` varchar(255) NOT NULL,\r
-  `dashboard` varchar(255) DEFAULT NULL,\r
-  `note` text DEFAULT NULL,\r
-  PRIMARY KEY (`name`),\r
-  KEY `FK_dashboard` (`dashboard`),\r
-  CONSTRAINT `FK_dashboard` FOREIGN KEY (`dashboard`) REFERENCES `dashboard` (`name`) ON DELETE SET NULL\r
-) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
-\r
 CREATE TABLE `dashboard` (\r
   `name` varchar(255) NOT NULL DEFAULT '',\r
   `type` varchar(255) DEFAULT NULL,\r
@@ -76,7 +53,28 @@ CREATE TABLE `dashboard` (
   CONSTRAINT `FK_related_db` FOREIGN KEY (`related_db`) REFERENCES `db` (`name`) ON DELETE SET NULL\r
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
 \r
+CREATE TABLE `dashboard_type` (\r
+  `name` varchar(255) NOT NULL,\r
+  `dashboard` varchar(255) DEFAULT NULL,\r
+  `note` text DEFAULT NULL,\r
+  PRIMARY KEY (`name`),\r
+  KEY `FK_dashboard` (`dashboard`),\r
+  CONSTRAINT `FK_dashboard` FOREIGN KEY (`dashboard`) REFERENCES `dashboard` (`name`) ON DELETE SET NULL\r
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
 \r
+CREATE TABLE `dashboard_template` (\r
+  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,\r
+  `name` varchar(255) NOT NULL,\r
+  `body` text DEFAULT NULL,\r
+  `note` text DEFAULT NULL,\r
+  `topic` varchar(255) DEFAULT NULL,\r
+  `type` varchar(255) DEFAULT NULL,\r
+  PRIMARY KEY (`id`),\r
+  KEY `FK_topic` (`topic`),\r
+  KEY `FK_type` (`type`),\r
+  CONSTRAINT `FK_topic` FOREIGN KEY (`topic`) REFERENCES `topic` (`name`) ON DELETE SET NULL,\r
+  CONSTRAINT `FK_type` FOREIGN KEY (`type`) REFERENCES `dashboard_type` (`name`) ON DELETE SET NULL\r
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
 \r
 insert into db (`name`,`host`,`login`,`pass`,`database_name`) values ('Couchbase','dl_couchbase','dl','dl1234','datalake');\r
 insert into db (`name`,`host`) values ('Elasticsearch','dl_es');\r
@@ -87,12 +85,17 @@ insert into db (`name`,`host`,`login`) values ('HDFS','dlhdfs','dl');
 \r
 -- in production, default enabled should be off\r
 insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');\r
-insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);\r
-insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path) values ('unauthenticated.SEC_FAULT_OUTPUT',1,0,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem');\r
-\r
 \r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','_DL_DEFAULT_');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','_DL_DEFAULT_');\r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','_DL_DEFAULT_');\r
+\r
+insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path,`data_format`) values ('unauthenticated.SEC_FAULT_OUTPUT',1,1,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem,/event/commonEventHeader/eventId','JSON');\r
+\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','unauthenticated.SEC_FAULT_OUTPUT');\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','unauthenticated.SEC_FAULT_OUTPUT');\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','unauthenticated.SEC_FAULT_OUTPUT');\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','unauthenticated.SEC_FAULT_OUTPUT');\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','unauthenticated.SEC_FAULT_OUTPUT');\r
index 2a72a76..7306718 100644 (file)
@@ -57,7 +57,7 @@ public class ApplicationConfiguration {
        private long dmaapKafkaTimeout;
        private String[] dmaapKafkaExclude;
 
-       private int dmaapCheckNewTopicIntervalInSec;
+       private int dmaapCheckNewTopicInterval; //in millisecond
 
        private int kafkaConsumerCount;
 
@@ -69,5 +69,5 @@ public class ApplicationConfiguration {
        private int hdfsBatchSize;
 
        //Version
-       private String DatalakeVersion;
+       private String datalakeVersion;
 }
index fdcbdfc..b7bf513 100644 (file)
@@ -26,21 +26,20 @@ package org.onap.datalake.feeder.enumeration;
  *
  */
 public enum DataFormat {
-       JSON, XML, YAML, TEXT;  
+       JSON("JSON"), XML("XML"), YAML("YAML"), TEXT("TEXT");
 
-    public static DataFormat fromString(String s) {
-        if ("JSON".equalsIgnoreCase(s)) {
-            return JSON;
-        }
-        if ("XML".equalsIgnoreCase(s)) {
-            return XML;
-        }
-        if ("YAML".equalsIgnoreCase(s)) {
-            return YAML;
-        }
-        if ("TEXT".equalsIgnoreCase(s)) {
-            return TEXT;
-        }
-        throw new IllegalArgumentException("Invalid value for format: " + s);
-    }
+       private final String name;
+
+       DataFormat(String name) {
+               this.name = name;
+       }
+
+       public static DataFormat fromString(String s) {
+               for (DataFormat df : DataFormat.values()) {
+                       if (df.name.equalsIgnoreCase(s)) {
+                               return df;
+                       }
+               }
+               throw new IllegalArgumentException("Invalid value for format: " + s);
+       }
 }
index 7c23776..d7d5f87 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.datalake.feeder.service;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -32,7 +33,6 @@ import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.dto.TopicConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -40,10 +40,10 @@ import com.couchbase.client.java.Bucket;
 import com.couchbase.client.java.Cluster;
 import com.couchbase.client.java.CouchbaseCluster;
 import com.couchbase.client.java.document.JsonDocument;
-import com.couchbase.client.java.document.JsonLongDocument;
 import com.couchbase.client.java.document.json.JsonObject;
 import com.couchbase.client.java.env.CouchbaseEnvironment;
 import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
+import com.couchbase.client.java.error.DocumentAlreadyExistsException;
 
 import rx.Observable;
 import rx.functions.Func1;
@@ -83,7 +83,7 @@ public class CouchbaseService {
                        // Create a N1QL Primary Index (but ignore if it exists)
                        bucket.bucketManager().createN1qlPrimaryIndex(true, false);
 
-                       log.info("Connected to Couchbase {}", couchbase.getHost());
+                       log.info("Connected to Couchbase {} as {}", couchbase.getHost(), couchbase.getLogin());
                        isReady = true;
                } catch (Exception ex) {
                        log.error("error connection to Couchbase.", ex);
@@ -113,6 +113,16 @@ public class CouchbaseService {
                }
                try {
                        saveDocuments(documents);
+               } catch (DocumentAlreadyExistsException e) {
+                       log.error("Some or all the following ids are duplicate.");
+                       for(JsonDocument document : documents) {
+                               log.error("saveJsons() DocumentAlreadyExistsException {}", document.id());
+                       }
+               } catch (rx.exceptions.CompositeException e) {
+                       List<Throwable> causes = e.getExceptions();
+                       for(Throwable cause : causes) {
+                               log.error("saveJsons() CompositeException cause {}", cause.getMessage());
+                       }                       
                } catch (Exception e) {
                        log.error("error saving to Couchbase.", e);
                }
@@ -127,14 +137,15 @@ public class CouchbaseService {
                }
 
                String topicStr = topic.getName();
-               //String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
+               id = topicStr+":"+UUID.randomUUID();
 
                //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
                //atomically get the next sequence number:
                // increment by 1, initialize at 0 if counter doc not found
                //TODO how slow is this compared with above UUID approach?
-               JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 
-               id = topicStr + ":" + nextIdNumber.content();
+               //sometimes this gives java.util.concurrent.TimeoutException
+               //JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 
+               //id = topicStr + ":" + nextIdNumber.content();
 
                return id;
        }
index 02aa626..2274ce9 100644 (file)
@@ -25,9 +25,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.dto.TopicConfig;
@@ -53,39 +58,78 @@ public class DmaapService {
        @Autowired
        private TopicService topicService;
 
+       ZooKeeper zk;
+
+       @PreDestroy
+       public void cleanUp() throws InterruptedException {
+               zk.close();
+       }
+
+       @PostConstruct
+       private void init() throws IOException, InterruptedException {
+               zk = connect(config.getDmaapZookeeperHostPort());
+       }
+
        //get all topic names from Zookeeper
        public List<String> getTopics() {
                try {
-                       Watcher watcher = new Watcher() {
-                               @Override
-                               public void process(WatchedEvent event) {
-                                       // TODO monitor new topics
-
-                               }
-                       };
-                       ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher);
+                       if (zk == null) {
+                               zk = connect(config.getDmaapZookeeperHostPort());
+                       }
+                       log.info("connecting to ZooKeeper {} for a list of topics.", config.getDmaapZookeeperHostPort());
                        List<String> topics = zk.getChildren("/brokers/topics", false);
                        String[] excludes = config.getDmaapKafkaExclude();
                        topics.removeAll(Arrays.asList(excludes));
-                       zk.close();
+                       log.info("list of topics: {}", topics);
                        return topics;
                } catch (Exception e) {
+                       zk = null;
                        log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e);
                        return Collections.emptyList();
                }
        }
 
-       public List<String> getActiveTopics() throws IOException {
+       private ZooKeeper connect(String host) throws IOException, InterruptedException {
+               log.info("connecting to ZooKeeper {} ...", config.getDmaapZookeeperHostPort());
+               CountDownLatch connectedSignal = new CountDownLatch(1);
+               ZooKeeper ret = new ZooKeeper(host, 10000, new Watcher() {
+                       public void process(WatchedEvent we) {
+                               if (we.getState() == KeeperState.SyncConnected) {
+                                       connectedSignal.countDown();
+                               }
+                       }
+               });
+
+               connectedSignal.await();
+               return ret;
+       }
+
+       /*
+               public List<String> getActiveTopics() throws IOException {
+                       log.debug("entering getActiveTopics()...");
+       
+                       List<TopicConfig> configList = getActiveTopicConfigs();
+       
+                       List<String> ret = new ArrayList<>(configList.size());
+                       configList.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));
+       
+                       return ret;
+               }
+       */
+       public List<TopicConfig> getActiveTopicConfigs() throws IOException {
+               log.debug("entering getActiveTopicConfigs()...");
                List<String> allTopics = getTopics();
                if (allTopics == null) {
                        return Collections.emptyList();
                }
 
-               List<String> ret = new ArrayList<>(allTopics.size());
+               List<TopicConfig> ret = new ArrayList<>(allTopics.size());
                for (String topicStr : allTopics) {
+                       log.debug("get topic setting from DB: {}.", topicStr);
+
                        TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
                        if (topicConfig.isEnabled()) {
-                               ret.add(topicStr);
+                               ret.add(topicConfig);
                        }
                }
                return ret;
index f1bed60..2806e48 100644 (file)
@@ -55,7 +55,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
- * Service to use Elasticsearch
+ * Elasticsearch Service for table creation, data submission, as well as data pre-processing. 
  * 
  * @author Guobiao Mo
  *
@@ -172,11 +172,7 @@ public class ElasticsearchService {
                                String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
                                String specificProblem = json.query("/event/faultFields/specificProblem").toString();
 
-                               String id = null;
-                               StringBuilder stringBuilder = new StringBuilder();
-                               stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
-
-                               id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
+                               String id = String.join("^", name, reportingEntityName, specificProblem);//example: id = "aaaa^cccc^bbbbb"
                                String index = topic.getName().toLowerCase();
 
                                //get
index edbc675..135a2c0 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.datalake.feeder.service;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
@@ -39,6 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,31 +87,44 @@ public class HdfsService {
 
                public void flush(String topic) {
                        try {
-                               saveMessages(topic, data);
-                               data.clear();
-                               lastFlush = System.currentTimeMillis();
-                               log.debug("done flush, topic={}, buffer size={}", topic, data.size());
+                               if (!data.isEmpty()) {
+                                       saveMessages(topic, data);
+                                       data.clear();
+                                       lastFlush = System.currentTimeMillis();
+                               }
                        } catch (IOException e) {
                                log.error("error saving to HDFS." + topic, e);
                        }
                }
 
                public void flushStall(String topic) {
-                       if (!data.isEmpty() && System.currentTimeMillis() > lastFlush + config.getHdfsFlushInterval()) {
+                       if (!data.isEmpty() && Util.isStall(lastFlush, config.getHdfsFlushInterval())) {
                                log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
                                flush(topic);
                        }
                }
 
+               public void addData(List<Pair<Long, String>> messages) {
+                       if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
+                               lastFlush = System.currentTimeMillis();
+                       }
+
+                       messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used                 
+               }
+
                private void saveMessages(String topic, List<String> bufferList) throws IOException {
 
-                       String thread = Thread.currentThread().getName();
+                       long thread = Thread.currentThread().getId();
                        Date date = new Date();
                        String day = dayFormat.get().format(date);
                        String time = timeFormat.get().format(date);
-                       String filePath = String.format("/datalake/%s/%s/%s-%s", topic, day, time, thread);
+
+                       InetAddress inetAddress = InetAddress.getLocalHost();
+                       String hostName = inetAddress.getHostName();
+
+                       String filePath = String.format("/datalake/%s/%s/%s-%s-%s", topic, day, time, hostName, thread);
                        Path path = new Path(filePath);
-                       log.debug("writing to HDFS {}", filePath);
+                       log.debug("writing {} to HDFS {}", bufferList.size(), filePath);
 
                        // Create a new file and write data to it.
                        FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize());
@@ -140,9 +155,8 @@ public class HdfsService {
 
                        String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
                        hdfsConfig.set("fs.defaultFS", hdfsuri);
-                       //hdfsConfig.set("hadoop.job.ugi", hdfs.getLogin());
                        System.setProperty("HADOOP_USER_NAME", hdfs.getLogin());
-                       
+
                        log.info("Connecting to -- {} as {}", hdfsuri, hdfs.getLogin());
 
                        fileSystem = FileSystem.get(hdfsConfig);
@@ -179,12 +193,12 @@ public class HdfsService {
                Map<String, Buffer> bufferMap = bufferLocal.get();
                final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
 
-               List<String> bufferData = buffer.getData();
-
-               messages.stream().forEach(message -> bufferData.add(message.getRight()));//note that message left is not used
+               buffer.addData(messages);
 
-               if (bufferData.size() >= config.getHdfsBatchSize()) {
+               if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
                        buffer.flush(topicStr);
+               } else {
+                       log.debug("buffer size too small to flush: bufferData.size() {} < config.getHdfsBatchSize() {}", buffer.getData().size(), config.getHdfsBatchSize());
                }
        }
 
index 48d167b..7ed8879 100644 (file)
@@ -21,8 +21,6 @@
 package org.onap.datalake.feeder.service;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -31,7 +29,6 @@ import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 
 /**
@@ -48,11 +45,14 @@ public class PullService {
 
        private boolean isRunning = false;
        private ExecutorService executorService;
-       private List<PullThread> consumers;
+       private Thread topicConfigPollingThread;
 
        @Autowired
-       private ApplicationContext context;
+       private Puller puller;
 
+       @Autowired
+       private TopicConfigPollingService topicConfigPollingService;
+       
        @Autowired
        private ApplicationConfiguration config;
 
@@ -74,17 +74,16 @@ public class PullService {
                }
 
                logger.info("start pulling ...");
-
                int numConsumers = config.getKafkaConsumerCount();
-
                executorService = Executors.newFixedThreadPool(numConsumers);
-               consumers = new ArrayList<>(numConsumers);
 
                for (int i = 0; i < numConsumers; i++) {
-                       PullThread puller = context.getBean(PullThread.class, i);
-                       consumers.add(puller);
                        executorService.submit(puller);
                }
+               
+               topicConfigPollingThread = new Thread(topicConfigPollingService);
+               topicConfigPollingThread.setName("TopicConfigPolling");
+               topicConfigPollingThread.start();
 
                isRunning = true;
 
@@ -100,14 +99,16 @@ public class PullService {
                }
 
                logger.info("stop pulling ...");
-               for (PullThread puller : consumers) {
-                       puller.shutdown();
-               }
+               puller.shutdown();
 
-               executorService.shutdown();
+               logger.info("stop TopicConfigPollingService ...");
+               topicConfigPollingService.shutdown();
 
                try {
-                       executorService.awaitTermination(10L, TimeUnit.SECONDS);
+                       topicConfigPollingThread.join();
+                       
+                       executorService.shutdown();
+                       executorService.awaitTermination(120L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                        logger.error("executor.awaitTermination", e);
                        Thread.currentThread().interrupt();
@@ -54,36 +54,28 @@ import org.springframework.stereotype.Service;
  */
 
 @Service
-@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-public class PullThread implements Runnable {
+//@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+public class Puller implements Runnable {
 
        @Autowired
-       private DmaapService dmaapService;
+       private StoreService storeService;
 
        @Autowired
-       private StoreService storeService;
+       private TopicConfigPollingService topicConfigPollingService;
 
        @Autowired
        private ApplicationConfiguration config;
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
 
-       private KafkaConsumer<String, String> consumer; //<String, String> is key-value type, in our case key is empty, value is JSON text
-       private int id;
+       private ThreadLocal<KafkaConsumer<String, String>> consumerLocal = new ThreadLocal<>(); //<String, String> is key-value type, in our case key is empty, value is JSON text 
 
-       private final AtomicBoolean active = new AtomicBoolean(false);
+       private boolean active = false;
        private boolean async;
 
-       public PullThread(int id) {
-               this.id = id;
-       }
-
        @PostConstruct
        private void init() {
                async = config.isAsync();
-               Properties consumerConfig = getConsumerConfig();
-               log.info("Kafka ConsumerConfig: {}", consumerConfig);
-               consumer = new KafkaConsumer<>(consumerConfig);
        }
 
        private Properties getConsumerConfig() {
@@ -91,13 +83,16 @@ public class PullThread implements Runnable {
 
                consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort());
                consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
-               consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(id));
+               consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(Thread.currentThread().getId()));
                consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
                consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
+               //              consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+               //      consumerConfig.put("sasl.mechanism", "PLAIN");
+
                return consumerConfig;
        }
 
@@ -106,55 +101,69 @@ public class PullThread implements Runnable {
         */
        @Override
        public void run() {
-               active.set(true);
+               active = true;
+               Properties consumerConfig = getConsumerConfig();
+               log.info("Kafka ConsumerConfig: {}", consumerConfig);
+               KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
+               consumerLocal.set(consumer);
 
                DummyRebalanceListener rebalanceListener = new DummyRebalanceListener();
 
                try {
-                       List<String> topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop
-
-                       log.info("Thread {} going to subscribe to topics: {}", id, topics);
-
-                       consumer.subscribe(topics, rebalanceListener);
-
-                       while (active.get()) {
-
-                               ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
-                               if (records != null) {
-                                       List<Pair<Long, String>> messages = new ArrayList<>(records.count());
-                                       for (TopicPartition partition : records.partitions()) {
-                                               messages.clear();
-                                               List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
-                                               for (ConsumerRecord<String, String> record : partitionRecords) {
-                                                       messages.add(Pair.of(record.timestamp(), record.value()));
-                                                       //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
-                                               }
-                                               storeService.saveMessages(partition.topic(), messages);
-                                               log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
-
-                                               if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
-                                                       long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
-                                                       consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
-                                               }
-                                       }
-
-                                       if (async) {//for high Throughput, async commit offset in batch to Kafka
-                                               consumer.commitAsync();
-                                       }
+                       while (active) {
+                               if (topicConfigPollingService.isActiveTopicsChanged(true)) {//true means update local version as well
+                                       List<String> topics = topicConfigPollingService.getActiveTopics();
+                                       log.info("Active Topic list is changed, subscribe to the latest topics: {}", topics);
+                                       consumer.subscribe(topics, rebalanceListener);
                                }
-                               storeService.flushStall();
+
+                               pull();
                        }
+                       storeService.flush(); // force flush all buffer
                } catch (Exception e) {
-                       log.error("Puller {} run():   exception={}", id, e.getMessage());
-                       log.error("", e);
+                       log.error("Puller run() exception.", e);
                } finally {
                        consumer.close();
+                       log.info("Puller exited run().");
+               }
+       }
+
+       private void pull() {
+               KafkaConsumer<String, String> consumer = consumerLocal.get();
+
+               log.debug("pulling...");
+               ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
+               log.debug("done pulling.");
+
+               if (records != null && records.count() > 0) {
+                       List<Pair<Long, String>> messages = new ArrayList<>(records.count());
+                       for (TopicPartition partition : records.partitions()) {
+                               messages.clear();
+                               List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+                               for (ConsumerRecord<String, String> record : partitionRecords) {
+                                       messages.add(Pair.of(record.timestamp(), record.value()));
+                                       //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+                               }
+                               storeService.saveMessages(partition.topic(), messages);
+                               log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
+
+                               if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+                                       long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+                                       consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+                               }
+                       }
+
+                       if (async) {//for high Throughput, async commit offset in batch to Kafka
+                               consumer.commitAsync();
+                       }
+               } else {
+                       log.debug("no record from this polling.");
                }
+               storeService.flushStall();
        }
 
        public void shutdown() {
-               active.set(false);
-               consumer.wakeup();
+               active = false;
        }
 
        private class DummyRebalanceListener implements ConsumerRebalanceListener {
index 2d00a9b..126e23b 100644 (file)
@@ -22,15 +22,12 @@ package org.onap.datalake.feeder.service;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import javax.annotation.PostConstruct;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
-
-import org.json.JSONException;
 import org.json.JSONObject;
 import org.json.XML;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
@@ -38,12 +35,9 @@ import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 
@@ -64,7 +58,7 @@ public class StoreService {
        private ApplicationConfiguration config;
 
        @Autowired
-       private TopicService topicService;
+       private TopicConfigPollingService configPollingService;
 
        @Autowired
        private MongodbService mongodbService;
@@ -78,8 +72,6 @@ public class StoreService {
        @Autowired
        private HdfsService hdfsService;
 
-       private Map<String, TopicConfig> topicMap = new HashMap<>();
-
        private ObjectMapper yamlReader;
 
        @PostConstruct
@@ -88,28 +80,26 @@ public class StoreService {
        }
 
        public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
-               if (messages == null || messages.isEmpty()) {
+               if (CollectionUtils.isEmpty(messages)) {
                        return;
                }
 
-               TopicConfig topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
-                       return topicService.getEffectiveTopic(topicStr);
-               });
+               TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr);
 
                List<JSONObject> docs = new ArrayList<>();
 
                for (Pair<Long, String> pair : messages) {
                        try {
-                               docs.add(messageToJson(topic, pair));
-                       } catch (Exception e) {
+                               docs.add(messageToJson(topicConfig, pair));
+                       } catch (IOException e) {
                                log.error(pair.getRight(), e);
                        }
                }
 
-               saveJsons(topic, docs, messages);
+               saveJsons(topicConfig, docs, messages);
        }
 
-       private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
+       private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws IOException {
 
                long timestamp = pair.getLeft();
                String text = pair.getRight();
@@ -176,7 +166,11 @@ public class StoreService {
                }
        }
 
-       public void flushStall() {
+       public void flush() { //force flush all buffer 
+               hdfsService.flush(); 
+       }
+       
+       public void flushStall() { //flush stall buffer
                hdfsService.flushStall();
        }
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
new file mode 100644 (file)
index 0000000..80da55f
--- /dev/null
@@ -0,0 +1,141 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.dto.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service to check topic changes in Kafka and topic setting updates
+ * 
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class TopicConfigPollingService implements Runnable {
+
+       private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+       @Autowired
+       ApplicationConfiguration config;
+
+       @Autowired
+       private DmaapService dmaapService;
+
+       //effective TopicConfig Map
+       private Map<String, TopicConfig> effectiveTopicConfigMap = new HashMap<>();
+
+       //monitor Kafka topic list changes
+       private List<String> activeTopics;
+       private ThreadLocal<Integer> activeTopicsVersionLocal = ThreadLocal.withInitial(() -> -1);
+       private int currentActiveTopicsVersion = -1;
+
+       private boolean active = false;
+
+       @PostConstruct
+       private void init() {
+               try {
+                       log.info("init(), ccalling poll()...");
+                       activeTopics = poll();
+                       currentActiveTopicsVersion++;
+               } catch (Exception ex) {
+                       log.error("error connection to HDFS.", ex);
+               }
+       }
+
+       public boolean isActiveTopicsChanged(boolean update) {
+               boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get();
+               log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get());
+               if (changed && update) {
+                       activeTopicsVersionLocal.set(currentActiveTopicsVersion);
+               }
+
+               return changed;
+       }
+
+       public List<String> getActiveTopics() {
+               return activeTopics;
+       }
+
+       public TopicConfig getEffectiveTopicConfig(String topicStr) {
+               return effectiveTopicConfigMap.get(topicStr);
+       }
+
+       @Override
+       public void run() {
+               active = true;
+               log.info("TopicConfigPollingService started.");
+               
+               while (active) {
+                       try { //sleep first since we already pool in init()
+                               Thread.sleep(config.getDmaapCheckNewTopicInterval());
+                       } catch (InterruptedException e) {
+                               log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
+                       }
+
+                       try {
+                               List<String> newTopics = poll();
+                               if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) {
+                                       log.info("activeTopics list is updated, old={}", activeTopics);
+                                       log.info("activeTopics list is updated, new={}", newTopics);
+
+                                       activeTopics = newTopics;
+                                       currentActiveTopicsVersion++;
+                               } else {
+                                       log.debug("activeTopics list is not updated.");
+                               }
+                       } catch (IOException e) {
+                               log.error("dmaapService.getActiveTopics()", e);
+                       }
+               }
+
+               log.info("exit since active is set to false");
+       }
+
+       public void shutdown() {
+               active = false;
+       }
+
+       private List<String> poll() throws IOException {
+               log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
+               List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs();
+               activeTopicConfigs.stream().forEach(topicConfig -> effectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
+
+               List<String> ret = new ArrayList<>(activeTopicConfigs.size());
+               activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));
+
+               return ret;
+       }
+
+}
index aada44b..d6c26bf 100644 (file)
@@ -55,4 +55,8 @@ public class Util {
                        return replaceDotInKey(newJson);// there maybe more to replace
                }
        }
+       
+       public static boolean isStall(long lastTime, long checkInterval) {
+               return System.currentTimeMillis() > lastTime + checkInterval;
+       }
 }
index 10ad9f8..a105473 100644 (file)
@@ -30,16 +30,16 @@ spring.datasource.password=dl1234
 #dmaapKafkaHostPort=127.0.0.1:9092
 dmaapZookeeperHostPort=message-router-zookeeper:2181
 dmaapKafkaHostPort=message-router-kafka:9092
-dmaapKafkaGroup=dlgroup19
+dmaapKafkaGroup=dlgroup44
 #in second
 dmaapKafkaTimeout=60
 dmaapKafkaExclude[0]=__consumer_offsets
 dmaapKafkaExclude[1]=__transaction_state
-dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
-#check for new topics 
-dmaapCheckNewTopicIntervalInSec=3000
+#dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
+#check for new topics , in millisecond
+dmaapCheckNewTopicInterval=60000
 
-kafkaConsumerCount=3 
+kafkaConsumerCount=3
 
 #####################Elasticsearch
 elasticsearchType=doc
@@ -47,8 +47,8 @@ elasticsearchType=doc
 #####################HDFS
 hdfsBufferSize=4096
 #how often we flush stall updates, in millisecond
-hdfsFlushInterval=10000
-hdfsBatchSize=250
+hdfsFlushInterval=30000
+hdfsBatchSize=500
 
 #####################Logging
 logging.level.org.springframework.web=ERROR
@@ -56,5 +56,5 @@ logging.level.com.att.nsa.apiClient.http=ERROR
 logging.level.org.onap.datalake=DEBUG
 
 #####################Verison
-DatalakeVersion=0.0.1
+datalakeVersion=0.0.1
  
index 7243a8e..617b50e 100644 (file)
@@ -56,7 +56,7 @@ public class ApplicationConfigurationTest {
         assertNotNull(config.getDmaapKafkaHostPort());
         assertNotNull(config.getDmaapKafkaGroup());
         assertTrue(config.getDmaapKafkaTimeout() > 0L);
-        assertTrue(config.getDmaapCheckNewTopicIntervalInSec() > 0);
+        assertTrue(config.getDmaapCheckNewTopicInterval() > 0);
 
         assertTrue(config.getKafkaConsumerCount() > 0);
 
index 7d0b4ee..05295f7 100644 (file)
@@ -29,7 +29,7 @@ import org.mockito.MockitoAnnotations;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.service.DmaapService;
 import org.onap.datalake.feeder.service.PullService;
-import org.onap.datalake.feeder.service.PullThread;
+import org.onap.datalake.feeder.service.Puller;
 import org.springframework.context.ApplicationContext;
 
 import java.io.IOException;
@@ -76,25 +76,25 @@ public class FeederControllerTest {
         Field applicationConfig = pullService2.getClass().getDeclaredField("config");
         applicationConfig.setAccessible(true);
         applicationConfig.set(pullService2, config);
-        Field applicationContext = pullService2.getClass().getDeclaredField("context");
+/*        Field applicationContext = pullService2.getClass().getDeclaredField("context");
         applicationContext.setAccessible(true);
         applicationContext.set(pullService2, context);
         when(config.getKafkaConsumerCount()).thenReturn(1);
-        PullThread pullThread = new PullThread(1);
+        Puller pullThread = new Puller();
         Field dmaapService = pullThread.getClass().getDeclaredField("dmaapService");
         dmaapService.setAccessible(true);
         dmaapService.set(pullThread, dmaapService1);
-        Field kafkaConsumer1 = pullThread.getClass().getDeclaredField("consumer");
+        /*Field kafkaConsumer1 = pullThread.getClass().getDeclaredField("consumer");
         kafkaConsumer1.setAccessible(true);
         kafkaConsumer1.set(pullThread, kafkaConsumer);
         applicationConfig = pullThread.getClass().getDeclaredField("config");
         applicationConfig.setAccessible(true);
         applicationConfig.set(pullThread, config);
-        when(context.getBean(PullThread.class, 0)).thenReturn(pullThread);
+        when(context.getBean(Puller.class, 0)).thenReturn(pullThread);
         ConsumerRecords<String, String> records = ConsumerRecords.empty();
         when(kafkaConsumer.poll(2)).thenReturn(records);
         String start = feederController.start();
-        assertEquals("{\"running\": true}", start);
+        assertEquals("{\"running\": true}", start);*/
     }
 
     @Test
index 8519bfb..5e7d83b 100644 (file)
@@ -50,7 +50,7 @@ public class PullServiceTest {
     private ExecutorService executorService;
 
     @Mock
-    private List<PullThread> consumers;
+    private List<Puller> consumers;
 
     @Test
     public void isRunning() {
index 75a2618..189adec 100644 (file)
@@ -14,7 +14,8 @@ rawDataLabel=datalake_text_
 
 defaultTopicName=_DL_DEFAULT_
 
-
+#how often do we check topic setting update, in millisecond
+topicCheckInterval=60000
 
 #####################DMaaP
 #dmaapZookeeperHostPort=127.0.0.1:2181
@@ -27,8 +28,8 @@ dmaapKafkaTimeout=60
 dmaapKafkaExclude[0]=__consumer_offsets
 dmaapKafkaExclude[1]=__transaction_state
 dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
-#check for new topics 
-dmaapCheckNewTopicIntervalInSec=3000
+#check for new topics , in millisecond
+dmaapCheckNewTopicInterval=300000
 
 kafkaConsumerCount=1