private String defaultTopicName;
 
-       private int checkTopicInterval; //in millisecond
-/*
-       //DMaaP
-       private String dmaapZookeeperHostPort;
-       private String dmaapKafkaHostPort;
-       private String dmaapKafkaGroup;
-       private String dmaapKafkaLogin;
-       private String dmaapKafkaPass;
-       private String dmaapKafkaSecurityProtocol;
-       
-       private long dmaapKafkaTimeout;
-       private String[] dmaapKafkaExclude;
+       private long checkTopicInterval; //in milliseconds
 
-       private int dmaapCheckNewTopicInterval; //in millisecond
-
-       private int kafkaConsumerCount;
-*/
        private String elasticsearchType;
 
        //HDFS
 
        )
        private Set<Topic> topics;
 
+       public boolean isTool() {
+               return dbType.isTool();
+       }
+
        public boolean isHdfs() {
                return isDb(DbTypeEnum.HDFS);
        }
        
        @Override
        public String toString() {
-               return String.format("Db %s (name=%, enabled=%s)", id, name, enabled);
+               return String.format("Db %s (name=%s, enabled=%s)", id, name, enabled);
        }
 
        @Override
 
 
        @Override
        public String toString() {
-               return String.format("Kafka %s (name=%, enabled=%s)", id, name, enabled);
+               return String.format("Kafka %s (name=%s, enabled=%s)", id, name, enabled);
        }
 
        @Override
 
 
 package org.onap.datalake.feeder.service;
 
-import org.onap.datalake.feeder.repository.DbRepository;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 
 /**
  */
 @Service
 public class DbService {
+       private final Logger log = LoggerFactory.getLogger(this.getClass());
 
        @Autowired
-       private DbRepository dbRepository;
+       private ApplicationContext context;
+
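+       //cache of DbStoreService beans, keyed by Db id (shared across puller and config-polling threads)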
+       private Map<Integer, DbStoreService> dbStoreServiceMap = new ConcurrentHashMap<>();
+
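+       //return the DbStoreService for this Db, creating a prototype-scoped bean on first use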
+       public DbStoreService findDbStoreService(Db db) {
+               DbStoreService ret = dbStoreServiceMap.get(db.getId());
+               if (ret != null) {
+                       return ret;
+               }
+
+               DbType dbType = db.getDbType();
+               DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
+               switch (dbTypeEnum) {
+               case CB:
+                       ret = context.getBean(CouchbaseService.class, db);
+                       break;
+               case ES:
+                       ret = context.getBean(ElasticsearchService.class, db);
+                       break;
+               case HDFS:
+                       ret = context.getBean(HdfsService.class, db);
+                       break;
+               case MONGO:
+                       ret = context.getBean(MongodbService.class, db);
+                       break;
+               default:
+                       log.error("unsupported DbType {} for {}", dbTypeEnum, db);
+                       return null;
+               }
+
+               dbStoreServiceMap.put(db.getId(), ret);
+
+               return ret;
+       }
+
 }
 
 package org.onap.datalake.feeder.service;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.EffectiveTopic;
 import org.onap.datalake.feeder.domain.Kafka;
-import org.onap.datalake.feeder.dto.TopicConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 /**
  *
  */
 @Service
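+//prototype scope: each DmaapService instance is tied to one Kafka cluster via its constructor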
+@Scope("prototype")
 public class DmaapService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
                        log.debug("get topic setting from DB: {}.", topicStr);
 
                        List<EffectiveTopic> effectiveTopics= topicService.getEnabledEffectiveTopic(kafka, topicStr, true);
-                       
-                       ret.put(topicStr , effectiveTopics);
+                       if (CollectionUtils.isNotEmpty(effectiveTopics)) {
+                               log.debug("add effectiveTopics {}:{}.", topicStr, effectiveTopics);
+                               ret.put(topicStr, effectiveTopics);
+                       }
                        
                }
                return ret;
 
 
        private boolean isRunning = false;
        private ExecutorService executorService;
-//     private Thread topicConfigPollingThread;
        private Set<Puller> pullers;
 
        @Autowired
                }
 
                executorService.submit(topicConfigPollingService);
-               /*topicConfigPollingThread = new Thread(topicConfigPollingService);
-               topicConfigPollingThread.setName("TopicConfigPolling");
-               topicConfigPollingThread.start();
-*/
+
                isRunning = true;
 
                Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
                                puller.shutdown();
                        }
 
-//                     logger.info("stop TopicConfigPollingService ...");
-//                     topicConfigPollingService.shutdown();
-
-       //              topicConfigPollingThread.join();
-
                        logger.info("stop executorService ...");
                        executorService.shutdown();
                        executorService.awaitTermination(120L, TimeUnit.SECONDS);
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 /**
  */
 
 @Service
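+//prototype scope: a new Puller is created for each Kafka cluster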
+@Scope("prototype")
 public class Puller implements Runnable {
 
        @Autowired
        private boolean async;
        
        private Kafka kafka;
-
-       public Puller( ) {
-               
-       }
+
        public Puller(Kafka kafka) {
                this.kafka = kafka;
        }
 
 import org.json.XML;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.DbType;
 import org.onap.datalake.feeder.domain.EffectiveTopic;
 import org.onap.datalake.feeder.domain.Kafka;
-import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.onap.datalake.feeder.enumeration.DbTypeEnum;
-import org.onap.datalake.feeder.service.db.CouchbaseService;
 import org.onap.datalake.feeder.service.db.DbStoreService;
-import org.onap.datalake.feeder.service.db.ElasticsearchService;
-import org.onap.datalake.feeder.service.db.HdfsService;
-import org.onap.datalake.feeder.service.db.MongodbService;
 import org.onap.datalake.feeder.util.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
        private ApplicationConfiguration config;
 
        @Autowired
-       private ApplicationContext context;
+       private DbService dbService;
 
        @Autowired
        private TopicConfigPollingService configPollingService;
                }
 
                Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr);
-               for(EffectiveTopic effectiveTopic:effectiveTopics) {
+               for (EffectiveTopic effectiveTopic : effectiveTopics) {
                        saveMessagesForTopic(effectiveTopic, messages);
                }
        }
-       
+
        private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) {
                if (!effectiveTopic.getTopic().isEnabled()) {
                        log.error("we should not come here {}", effectiveTopic);
                Set<Db> dbs = effectiveTopic.getTopic().getDbs();
 
                for (Db db : dbs) {
-                       if (db.getDbType().isTool() || !db.isEnabled()) {
+                       if (db.isTool() || db.isDruid() || !db.isEnabled()) {
                                continue;
                        }
-                       DbStoreService dbStoreService = findDbStoreService(db);
-                       dbStoreService.saveJsons(effectiveTopic, docs);
+                       DbStoreService dbStoreService = dbService.findDbStoreService(db);
+                       if (dbStoreService != null) {
+                               dbStoreService.saveJsons(effectiveTopic, docs);
+                       }
                }
        }
 
                long timestamp = pair.getLeft();
                String text = pair.getRight();
 
-               //for debug, to be remove
-               //              String topicStr = topic.getId();
-               //              if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
-               //              log.debug("{} ={}", topicStr, text);
-               //}
-
                boolean storeRaw = effectiveTopic.getTopic().isSaveRaw();
 
                JSONObject json = null;
                return json;
        }
 
-       private DbStoreService findDbStoreService(Db db) {
-               DbType dbType = db.getDbType();
-               DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
-               switch (dbTypeEnum) {
-               case CB:
-                       return context.getBean(CouchbaseService.class, db);
-               case ES:
-                       return context.getBean(ElasticsearchService.class, db);
-               case HDFS:
-                       return context.getBean(HdfsService.class, db);
-               case MONGO:
-                       return context.getBean(MongodbService.class, db);
-               default:
-                       log.error("we should not come here {}", dbTypeEnum);
-                       return null;
-               }
-       }
-
        public void flush() { //force flush all buffer 
-//             hdfsService.flush();
+               //              hdfsService.flush();
        }
 
        public void flushStall() { //flush stall buffer
-       //      hdfsService.flushStall();
+               //      hdfsService.flushStall();
        }
 }
 
        private KafkaRepository kafkaRepository;
        
        //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
-       private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();;
+       private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
        //private Map<String, TopicConfig> effectiveTopicConfigMap;
 
        //monitor Kafka topic list changes
        private Map<String, Set<String>> activeTopicMap;
        
-       private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = new ThreadLocal<>();
-       private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();
+       private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new); //topic name:version
+       private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>(); //topic name:version
+       private Map<String, DmaapService> dmaapServiceMap = new HashMap<>(); //kafka id:DmaapService
 
        private boolean active = false;
 
        private Set<String> poll(Kafka kafka) throws IOException {
                log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
 
-               DmaapService dmaapService = context.getBean(DmaapService.class, kafka);
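+               //reuse one DmaapService per Kafka cluster instead of creating a new bean on every poll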
+               DmaapService dmaapService = dmaapServiceMap.get(kafka.getId());
+               if (dmaapService == null) {
+                       dmaapService = context.getBean(DmaapService.class, kafka);
+                       dmaapServiceMap.put(kafka.getId(), dmaapService);
+               }
                                
                Map<String, List<EffectiveTopic>> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic();
                effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics);
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 
 /**
        @Autowired
        private ApplicationConfiguration config;
 
-       @Autowired
-       private ApplicationContext context;
-
        @Autowired
        private TopicNameRepository topicNameRepository;
 
        @Autowired
        private DbRepository dbRepository;
 
+       @Autowired
+       private DbService dbService;
+
        public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException {
 
                List<Topic> topics = findTopics(kafka, topicStr);
                        if (ensureTableExist) {
                                for (Db db : topic.getDbs()) {
                                        if (db.isElasticsearch()) {
-                                               ElasticsearchService elasticsearchService = context.getBean(ElasticsearchService.class, db);
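+                                               //reuse the shared DbStoreService for this Db instead of creating a new bean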
+                                               ElasticsearchService elasticsearchService = (ElasticsearchService) dbService.findDbStoreService(db);
                                                elasticsearchService.ensureTableExist(topicStr);
                                        }
                                }
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 import com.couchbase.client.java.Bucket;
  *
  */
 @Service
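+//prototype scope: each instance connects to the one couchbase Db passed to its constructor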
+@Scope("prototype")
 public class CouchbaseService implements DbStoreService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
        ApplicationConfiguration config;
        
        private Db couchbase;
-/*
-       @Autowired
-       private DbService dbService;
-
-       private boolean isReady = false;
-*/
        Bucket bucket;
-
-       public CouchbaseService( ) {
-               
-       }
+
        public CouchbaseService(Db db) {
                couchbase = db;
        }
 
 import org.slf4j.LoggerFactory;
 
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 /**
  *
  */
 @Service
+@Scope("prototype")
 public class ElasticsearchService implements DbStoreService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
        @Autowired
        private ApplicationConfiguration config;
 
-       //@Autowired
-//     private DbService dbService;
-
        private RestHighLevelClient client;
        ActionListener<BulkResponse> listener;
-
-       public ElasticsearchService( ) {
-               
-       }
+
        public ElasticsearchService(Db db) {
                elasticsearch = db;
        }
        //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
        @PostConstruct
        private void init() {
-               //Db elasticsearch = dbService.getElasticsearch();
                String elasticsearchHost = elasticsearch.getHost();
 
                // Initialize the Connection
 
 import org.slf4j.LoggerFactory;
 
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 import lombok.Getter;
  *
  */
 @Service
+@Scope("prototype")
 public class HdfsService implements DbStoreService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
                }
        }
 
-       public HdfsService( ) { 
-       }
-
        public HdfsService(Db db) {
                hdfs = db;
        }
 
 import org.slf4j.LoggerFactory;
 
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 import com.mongodb.bulk.BulkWriteError;
  *
  */
 @Service
+@Scope("prototype")
 public class MongodbService implements DbStoreService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
        private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
        private InsertManyOptions insertManyOptions;
 
-       public MongodbService( ) { 
-       }
        public MongodbService(Db db) {
                mongodb = db;
        }