package org.onap.datalake.feeder.config;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.springframework.boot.SpringBootConfiguration;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 @EnableAutoConfiguration
 public class ApplicationConfiguration {
 
+       final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
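+       // readers: @PreDestroy cleanUp() methods that close per-service resources;
+       // writer: PullService.shutdown(), held until pulling has fully stopped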
+
        //App general
        private boolean async;
        private boolean enableSSL;
 
     @ResponseBody
        @ApiOperation(value="Start pulling data.")
     public String start() throws IOException {
-       log.info("DataLake feeder starting to pull data from DMaaP...");
+       log.info("Starting DataLake feeder ...");
        if(pullService.isRunning() == false) {
             pullService.start();
+               log.info("DataLake feeder started.");
+       } else {
+               log.info("DataLake feeder already started.");
         }
         return "{\"running\": true}";
     }
     @ResponseBody
        @ApiOperation(value="Stop pulling data.")
     public String stop() {
+       log.info("Stopping DataLake feeder ...");
         if(pullService.isRunning() == true)
         {
             pullService.shutdown();
+               log.info("DataLake feeder stopped.");
+       } else {
+               log.info("DataLake feeder already stopped.");
         }
-       log.info("DataLake feeder is stopped.");
        return "{\"running\": false}";
     }
     /**
 
 
        @PreDestroy
        public void cleanUp() {
-               bucket.close();
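+               // read lock: wait for PullService.shutdown() (write lock) to finish draining before closing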
+               config.getShutdownLock().readLock().lock();
+
+               try {
+                       log.info("cleanUp() closing Couchbase bucket.");
+                       bucket.close();
+               } finally {
+                       config.getShutdownLock().readLock().unlock();
+               }
        }
 
        public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
 
 
        @PreDestroy
        public void cleanUp() throws InterruptedException {
-               if (zk != null) {
-                       zk.close();
+               config.getShutdownLock().readLock().lock();
+
+               try {
+                       if (zk != null) {
+                               log.info("cleanUp() closing ZooKeeper connection.");
+                               zk.close();
+                       }
+               } finally {
+                       config.getShutdownLock().readLock().unlock();
                }
        }
 
 
 
        private RestHighLevelClient client;
        ActionListener<BulkResponse> listener;
 
        //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
        //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
        @PostConstruct
                listener = new ActionListener<BulkResponse>() {
                        @Override
                        public void onResponse(BulkResponse bulkResponse) {
-
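+                               // a bulk request can succeed overall while individual documents fail; surface those here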
+                               if(bulkResponse.hasFailures()) {
+                                       log.debug(bulkResponse.buildFailureMessage());
+                               }
                        }
 
                        @Override
 
        @PreDestroy
        public void cleanUp() throws IOException {
-               client.close();
+               config.getShutdownLock().readLock().lock();
+
+               try {
+                       log.info("cleanUp() closing Elasticsearch client.");
+                       client.close();
+               } catch (IOException e) {
+                       log.error("client.close() at cleanUp.", e);
+               } finally {
+                       config.getShutdownLock().readLock().unlock();
+               }
        }
 
        public void ensureTableExist(String topic) throws IOException {
 
        //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
        public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
                BulkRequest request = new BulkRequest();
 
                for (JSONObject json : jsons) {
                                if (found) {
                                        continue;
                                }
                        }
 
                        String id = topic.getMessageId(json); //id can be null
 
                        request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
                }
 
                log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size());
                        client.bulkAsync(request, RequestOptions.DEFAULT, listener);
                } else {
                        try {
-                               client.bulk(request, RequestOptions.DEFAULT);
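+                               // synchronous path: inspect the response for per-document failures, mirroring the async listener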
+                               BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
+                               if(bulkResponse.hasFailures()) {
+                                       log.debug(bulkResponse.buildFailureMessage());
+                               }
                        } catch (IOException e) {
                                log.error(topic.getName(), e);
                        }
                }
        }
 
        /**
         *
         * @param topic
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.dto.TopicConfig;
                                        lastFlush = System.currentTimeMillis();
                                }
                        } catch (IOException e) {
-                               log.error("error saving to HDFS." + topic, e);
+                       log.error("{} error saving to HDFS.", topic, e);
                        }
                }
 
                                        out.writeUTF(message);
                                        out.write('\n');
                                } catch (IOException e) {
-                                       log.error("error writing to HDFS.", e);
+                                       log.error("error writing to HDFS. {}", e.getMessage());
                                }
                        });
 
                        out.close();
+                       log.debug("Done writing {} messages to HDFS file {}", bufferList.size(), filePath);
                }
        }
 
 
                        fileSystem = FileSystem.get(hdfsConfig);
 
+                       //disable Hadoop's shutdown hook; we need the HDFS connection open to flush data at shutdown
+                       ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
+                       hadoopShutdownHookManager.clearShutdownHooks();
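+                       // with the hooks cleared, fileSystem is closed explicitly in cleanUp() below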
+
                        isReady = true;
                } catch (Exception ex) {
                        log.error("error connection to HDFS.", ex);
 
        @PreDestroy
        public void cleanUp() {
+               config.getShutdownLock().readLock().lock();
+
                try {
+                       log.info("cleanUp() flushing buffers and closing HDFS connection.");
                        flush();
                        fileSystem.close();
                } catch (IOException e) {
                        log.error("fileSystem.close() at cleanUp.", e);
+               } finally {
+                       config.getShutdownLock().readLock().unlock();
                }
        }
 
        public void flush() {
+               log.info("Force flushing ALL topic buffers, regardless of stall state");
                bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
        }
 
        //if no new data comes in for a topic for a while, need to flush its buffer
        public void flushStall() {
+               log.debug("Flushing stalled topic buffers");
                bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
        }
 
                if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
                        buffer.flush(topicStr);
                } else {
-                       log.debug("buffer size too small to flush: bufferData.size() {} < config.getHdfsBatchSize() {}", buffer.getData().size(), config.getHdfsBatchSize());
+                       log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
                }
        }
 
 
 
        @PreDestroy
        public void cleanUp() {
-               mongoClient.close();
+               config.getShutdownLock().readLock().lock();
+
+               try {
+                       log.info("cleanUp() closing MongoDB client.");
+                       mongoClient.close();
+               } finally {
+                       config.getShutdownLock().readLock().unlock();
+               }
        }
 
        public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
 
 
        @Autowired
        private TopicConfigPollingService topicConfigPollingService;
-       
+
        @Autowired
        private ApplicationConfiguration config;
 
                for (int i = 0; i < numConsumers; i++) {
                        executorService.submit(puller);
                }
-               
+
                topicConfigPollingThread = new Thread(topicConfigPollingService);
                topicConfigPollingThread.setName("TopicConfigPolling");
                topicConfigPollingThread.start();
                        return;
                }
 
-               logger.info("stop pulling ...");
-               puller.shutdown();
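+               // write lock: hold off every read-locked @PreDestroy cleanUp() until pulling has fully stopped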
+               config.getShutdownLock().writeLock().lock();
+               try {
+                       logger.info("stop pulling ...");
+                       puller.shutdown();
 
-               logger.info("stop TopicConfigPollingService ...");
-               topicConfigPollingService.shutdown();
+                       logger.info("stop TopicConfigPollingService ...");
+                       topicConfigPollingService.shutdown();
 
-               try {
                        topicConfigPollingThread.join();
-                       
+
                        executorService.shutdown();
                        executorService.awaitTermination(120L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
-                       logger.error("executor.awaitTermination", e);
+                       logger.error("shutdown(): executor.awaitTermination", e);
                        Thread.currentThread().interrupt();
+               } catch (Exception e) {
+                       logger.error("shutdown error.", e);
+               } finally {
+                       config.getShutdownLock().writeLock().unlock();
                }
 
                isRunning = false;
        }
 
 
 import org.springframework.stereotype.Service;
 
 /**
- * Service to check topic changes in Kafka and topic setting updates
+ * Service to check topic changes in Kafka and topic setting updates in DB
  * 
  * @author Guobiao Mo
  *
                }
        }
 
-       public boolean isActiveTopicsChanged(boolean update) {
+       public boolean isActiveTopicsChanged(boolean update) { //update=true also syncs the local version
                boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get();
                log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get());
                if (changed && update) {
        public void run() {
                active = true;
                log.info("TopicConfigPollingService started.");
-               
+
                while (active) {
                        try { //sleep first since we already pool in init()
                                Thread.sleep(config.getDmaapCheckNewTopicInterval());
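+                               // re-check the flag: shutdown() may have cleared it while this thread slept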
+                               if(!active) {
+                                       break;
+                               }
                        } catch (InterruptedException e) {
                                log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
                                Thread.currentThread().interrupt();
        private List<String> poll() throws IOException {
                log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
                List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs();
-               activeTopicConfigs.stream().forEach(topicConfig -> effectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
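+               // populate a temporary map, then swap the reference, so concurrent readers never see a half-built view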
+               Map<String, TopicConfig> tempEffectiveTopicConfigMap = new HashMap<>();
+
+               activeTopicConfigs.stream().forEach(topicConfig -> tempEffectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
+               effectiveTopicConfigMap = tempEffectiveTopicConfigMap;
+               log.debug("poll(), effectiveTopicConfigMap={}", effectiveTopicConfigMap);
 
                List<String> ret = new ArrayList<>(activeTopicConfigs.size());
                activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));
 
 
 import java.util.HashMap;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.json.JSONArray;
 import org.json.JSONObject;
 
 
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Topic;
 
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 @RunWith(MockitoJUnitRunner.class)
 public class CouchbaseServiceTest {
     public void testCleanupBucket() {
         CouchbaseService couchbaseService = new CouchbaseService();
         couchbaseService.bucket = bucket;
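+        // a real ApplicationConfiguration supplies the ReentrantReadWriteLock that cleanUp() locks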
+        ApplicationConfiguration appConfig = new ApplicationConfiguration();
+        couchbaseService.config = appConfig;
         couchbaseService.cleanUp();
     }
 
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 //             when(config.getDmaapKafkaExclude()).thenReturn(new String[] { "AAI-EVENT" });
         when(config.getDmaapZookeeperHostPort()).thenReturn(DMAPP_ZOOKEEPER_HOST_PORT);
         assertNotEquals(list, dmaapService.getTopics());
+
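+        // cleanUp() now acquires the shutdown lock, so the mocked config must supply one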
+        when(config.getShutdownLock()).thenReturn(new ReentrantReadWriteLock());
        dmaapService.cleanUp();
     }
 
 
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Topic;
 
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MongodbServiceTest {
 
     @Test
     public void cleanUp() {
-
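+        // the mocked config must supply the shutdown lock that cleanUp() now acquires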
+        when(config.getShutdownLock()).thenReturn(new ReentrantReadWriteLock());
         mongodbService.cleanUp();
     }
 
 
        }
 
 
-       @Test(expected = IOException.class)
+       @Test
        public void testGetEffectiveTopic() throws IOException {
                String name = "a";
                Topic topic = new Topic(name);
                when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(topic));
                when(topicRepository.findById(name)).thenReturn(Optional.of(topic));
                when(topicRepository.findById(null)).thenReturn(Optional.empty());
-               doThrow(IOException.class).when(elasticsearchService).ensureTableExist(name);
 
                assertEquals(topicService.getEffectiveTopic(name), topicService.getEffectiveTopic(name, false));