No data is lost when program is killed 68/90268/2
author Guobiao Mo <guobiaomo@chinamobile.com>
Thu, 20 Jun 2019 22:30:44 +0000 (15:30 -0700)
committer Yan Yang <yangyanyj@chinamobile.com>
Fri, 21 Jun 2019 05:12:48 +0000 (05:12 +0000)
When the program is killed by unix 'kill' or Ctrl+C,
the program should exit gracefully after all data is saved to DBs.

Issue-ID: DCAEGEN2-1633

Change-Id: Ic134440be507faa44d04434eeaea1035ce7d63f0
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
14 files changed:
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DmaapServiceTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/MongodbServiceTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java

index fa9f7d9..3e67f38 100644 (file)
@@ -20,6 +20,8 @@
 
 package org.onap.datalake.feeder.config;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.springframework.boot.SpringBootConfiguration;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -41,6 +43,8 @@ import lombok.Setter;
 @EnableAutoConfiguration
 public class ApplicationConfiguration {
 
+       final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
+       
        //App general
        private boolean async;
        private boolean enableSSL;
index 6a44c4f..0a64ddb 100644 (file)
@@ -58,9 +58,12 @@ public class FeederController {
     @ResponseBody
        @ApiOperation(value="Start pulling data.")
     public String start() throws IOException {
-       log.info("DataLake feeder starting to pull data from DMaaP...");
+       log.info("Going to start DataLake feeder ...");
        if(pullService.isRunning() == false) {
             pullService.start();
+               log.info("DataLake feeder started.");
+        }else {
+               log.info("DataLake feeder already started.");           
         }
         return "{\"running\": true}";
     }
@@ -72,11 +75,14 @@ public class FeederController {
     @ResponseBody
        @ApiOperation(value="Stop pulling data.")
     public String stop() {
+       log.info("Going to stop DataLake feeder ...");
         if(pullService.isRunning() == true)
         {
             pullService.shutdown();
+               log.info("DataLake feeder is stopped.");
+        }else {
+               log.info("DataLake feeder already stopped.");
         }
-       log.info("DataLake feeder is stopped.");
        return "{\"running\": false}";
     }
     /**
index d7d5f87..fc31b2e 100644 (file)
@@ -93,7 +93,14 @@ public class CouchbaseService {
 
        @PreDestroy
        public void cleanUp() {
-               bucket.close();
+               config.getShutdownLock().readLock().lock();
+
+               try {
+                       log.info("bucket.close() at cleanUp.");
+                       bucket.close();
+               } finally {
+                       config.getShutdownLock().readLock().unlock();
+               }
        }
 
        public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
index 3be5be6..5c544d6 100644 (file)
@@ -62,8 +62,15 @@ public class DmaapService {
 
        @PreDestroy
        public void cleanUp() throws InterruptedException {
-               if (zk != null) {
-                       zk.close();
+               config.getShutdownLock().readLock().lock();
+
+               try {
+                       if (zk != null) {
+                               log.info("cleanUp() called, close zk.");
+                               zk.close();
+                       }
+               } finally {
+                       config.getShutdownLock().readLock().unlock();
                }
        }
 
index 2806e48..b40f544 100644 (file)
@@ -73,7 +73,7 @@ public class ElasticsearchService {
 
        private RestHighLevelClient client;
        ActionListener<BulkResponse> listener;
-
+       
        //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
        //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
        @PostConstruct
@@ -89,7 +89,9 @@ public class ElasticsearchService {
                listener = new ActionListener<BulkResponse>() {
                        @Override
                        public void onResponse(BulkResponse bulkResponse) {
-
+                               if(bulkResponse.hasFailures()) {
+                                       log.debug(bulkResponse.buildFailureMessage());
+                               }
                        }
 
                        @Override
@@ -101,7 +103,16 @@ public class ElasticsearchService {
 
        @PreDestroy
        public void cleanUp() throws IOException {
-               client.close();
+               config.getShutdownLock().readLock().lock();
+
+               try {
+                       log.info("cleanUp() closing Elasticsearch client.");
+                       client.close();
+               } catch (IOException e) {
+                       log.error("client.close() at cleanUp.", e);
+               } finally {
+                       config.getShutdownLock().readLock().unlock();
+               }
        }
 
        public void ensureTableExist(String topic) throws IOException {
@@ -120,6 +131,7 @@ public class ElasticsearchService {
 
        //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
        public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+               
                BulkRequest request = new BulkRequest();
 
                for (JSONObject json : jsons) {
@@ -128,11 +140,11 @@ public class ElasticsearchService {
                                if (found) {
                                        continue;
                                }
-                       }
-
+                       }                       
+                       
                        String id = topic.getMessageId(json); //id can be null
-
-                       request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
+                       
+                       request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
                }
 
                log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size());
@@ -141,13 +153,17 @@ public class ElasticsearchService {
                        client.bulkAsync(request, RequestOptions.DEFAULT, listener);
                } else {
                        try {
-                               client.bulk(request, RequestOptions.DEFAULT);
+                               BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
+                               if(bulkResponse.hasFailures()) {
+                                       log.debug(bulkResponse.buildFailureMessage());
+                               }
                        } catch (IOException e) {
                                log.error(topic.getName(), e);
                        }
                }
+               
        }
-
+       
        /**
         *
         * @param topic
index 135a2c0..d92d05a 100644 (file)
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.dto.TopicConfig;
@@ -93,7 +94,7 @@ public class HdfsService {
                                        lastFlush = System.currentTimeMillis();
                                }
                        } catch (IOException e) {
-                               log.error("error saving to HDFS." + topic, e);
+                               log.error("{} error saving to HDFS. {}", topic, e.getMessage());
                        }
                }
 
@@ -134,11 +135,12 @@ public class HdfsService {
                                        out.writeUTF(message);
                                        out.write('\n');
                                } catch (IOException e) {
-                                       log.error("error writing to HDFS.", e);
+                                       log.error("error writing to HDFS. {}", e.getMessage());
                                }
                        });
 
                        out.close();
+                       log.debug("Done writing {} to HDFS {}", bufferList.size(), filePath);
                }
        }
 
@@ -161,6 +163,10 @@ public class HdfsService {
 
                        fileSystem = FileSystem.get(hdfsConfig);
 
+                       //disable Hadoop Shutdown Hook, we need the HDFS connection to flush data
+                       ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
+                       hadoopShutdownHookManager.clearShutdownHooks();
+
                        isReady = true;
                } catch (Exception ex) {
                        log.error("error connection to HDFS.", ex);
@@ -170,20 +176,27 @@ public class HdfsService {
 
        @PreDestroy
        public void cleanUp() {
+               config.getShutdownLock().readLock().lock();
+
                try {
+                       log.info("fileSystem.close() at cleanUp.");
                        flush();
                        fileSystem.close();
                } catch (IOException e) {
                        log.error("fileSystem.close() at cleanUp.", e);
+               } finally {
+                       config.getShutdownLock().readLock().unlock();
                }
        }
 
        public void flush() {
+               log.info("Force flush ALL data, regardless of stall");
                bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
        }
 
        //if no new data comes in for a topic for a while, need to flush its buffer
        public void flushStall() {
+               log.debug("Flush stall data");
                bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
        }
 
@@ -198,7 +211,7 @@ public class HdfsService {
                if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
                        buffer.flush(topicStr);
                } else {
-                       log.debug("buffer size too small to flush: bufferData.size() {} < config.getHdfsBatchSize() {}", buffer.getData().size(), config.getHdfsBatchSize());
+                       log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
                }
        }
 
index 32d21c6..f3462e4 100644 (file)
@@ -131,7 +131,14 @@ public class MongodbService {
 
        @PreDestroy
        public void cleanUp() {
-               mongoClient.close();
+               config.getShutdownLock().readLock().lock();
+
+               try {
+                       log.info("mongoClient.close() at cleanUp.");
+                       mongoClient.close();
+               } finally {
+                       config.getShutdownLock().readLock().unlock();
+               }
        }
 
        public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
index 7ed8879..84d5f33 100644 (file)
@@ -52,7 +52,7 @@ public class PullService {
 
        @Autowired
        private TopicConfigPollingService topicConfigPollingService;
-       
+
        @Autowired
        private ApplicationConfiguration config;
 
@@ -80,7 +80,7 @@ public class PullService {
                for (int i = 0; i < numConsumers; i++) {
                        executorService.submit(puller);
                }
-               
+
                topicConfigPollingThread = new Thread(topicConfigPollingService);
                topicConfigPollingThread.setName("TopicConfigPolling");
                topicConfigPollingThread.start();
@@ -98,22 +98,27 @@ public class PullService {
                        return;
                }
 
-               logger.info("stop pulling ...");
-               puller.shutdown();
+               config.getShutdownLock().writeLock().lock();
+               try {
+                       logger.info("stop pulling ...");
+                       puller.shutdown();
 
-               logger.info("stop TopicConfigPollingService ...");
-               topicConfigPollingService.shutdown();
+                       logger.info("stop TopicConfigPollingService ...");
+                       topicConfigPollingService.shutdown();
 
-               try {
                        topicConfigPollingThread.join();
-                       
+
                        executorService.shutdown();
                        executorService.awaitTermination(120L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
-                       logger.error("executor.awaitTermination", e);
+                       logger.error("shutdown(): executor.awaitTermination", e);
                        Thread.currentThread().interrupt();
+               } catch (Exception e) {
+                       logger.error("shutdown error.", e);
+               } finally {
+                       config.getShutdownLock().writeLock().unlock();
                }
-
+               
                isRunning = false;
        }
 
index 58b2783..21e1a08 100644 (file)
@@ -37,7 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
- * Service to check topic changes in Kafka and topic setting updates
+ * Service to check topic changes in Kafka and topic setting updates in DB
  * 
  * @author Guobiao Mo
  *
@@ -74,7 +74,7 @@ public class TopicConfigPollingService implements Runnable {
                }
        }
 
-       public boolean isActiveTopicsChanged(boolean update) {
+       public boolean isActiveTopicsChanged(boolean update) {//update=true means sync local version 
                boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get();
                log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get());
                if (changed && update) {
@@ -96,10 +96,13 @@ public class TopicConfigPollingService implements Runnable {
        public void run() {
                active = true;
                log.info("TopicConfigPollingService started.");
-               
+
                while (active) {
                        try { //sleep first since we already pool in init()
                                Thread.sleep(config.getDmaapCheckNewTopicInterval());
+                               if(!active) {
+                                       break;
+                               }
                        } catch (InterruptedException e) {
                                log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
                                Thread.currentThread().interrupt();
@@ -131,7 +134,11 @@ public class TopicConfigPollingService implements Runnable {
        private List<String> poll() throws IOException {
                log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
                List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs();
-               activeTopicConfigs.stream().forEach(topicConfig -> effectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
+               Map<String, TopicConfig> tempEffectiveTopicConfigMap = new HashMap<>();
+
+               activeTopicConfigs.stream().forEach(topicConfig -> tempEffectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
+               effectiveTopicConfigMap = tempEffectiveTopicConfigMap;
+               log.debug("poll(), effectiveTopicConfigMap={}", effectiveTopicConfigMap);
 
                List<String> ret = new ArrayList<>(activeTopicConfigs.size());
                activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));
index 0efde44..9765329 100755 (executable)
@@ -37,8 +37,11 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Topic;
 
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 @RunWith(MockitoJUnitRunner.class)
 public class CouchbaseServiceTest {
@@ -141,6 +144,8 @@ public class CouchbaseServiceTest {
     public void testCleanupBucket() {
         CouchbaseService couchbaseService = new CouchbaseService();
         couchbaseService.bucket = bucket;
+       ApplicationConfiguration appConfig = new ApplicationConfiguration();
+        couchbaseService.config = appConfig;
         couchbaseService.cleanUp();
     }
 
index 81c3718..e0a1ce5 100644 (file)
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -58,6 +59,8 @@ public class DmaapServiceTest {
 //             when(config.getDmaapKafkaExclude()).thenReturn(new String[] { "AAI-EVENT" });
         when(config.getDmaapZookeeperHostPort()).thenReturn(DMAPP_ZOOKEEPER_HOST_PORT);
         assertNotEquals(list, dmaapService.getTopics());
+
+               when(config.getShutdownLock()).thenReturn(new ReentrantReadWriteLock());
        dmaapService.cleanUp();
     }
 
index 016381b..ef28f1f 100644 (file)
@@ -33,10 +33,13 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Topic;
 
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MongodbServiceTest {
@@ -62,7 +65,7 @@ public class MongodbServiceTest {
 
     @Test
     public void cleanUp() {
-
+               when(config.getShutdownLock()).thenReturn(new ReentrantReadWriteLock());
         mongodbService.cleanUp();
     }
 
index 774cd22..757cdd7 100644 (file)
@@ -83,7 +83,7 @@ public class TopicServiceTest {
        }
 
 
-       @Test(expected = IOException.class)
+       @Test
        public void testGetEffectiveTopic() throws IOException {
                String name = "a";
                Topic topic = new Topic(name);
@@ -96,7 +96,6 @@ public class TopicServiceTest {
                when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(topic));
                when(topicRepository.findById(name)).thenReturn(Optional.of(topic));
                when(topicRepository.findById(null)).thenReturn(Optional.empty());
-               doThrow(IOException.class).when(elasticsearchService).ensureTableExist(name);
 
                assertEquals(topicService.getEffectiveTopic(name), topicService.getEffectiveTopic(name, false));