private String dmaapZookeeperHostPort;
        private String dmaapKafkaHostPort;
        private String dmaapKafkaGroup;
+       private String dmaapKafkaLogin;
+       private String dmaapKafkaPass;
+       private String dmaapKafkaSecurityProtocol;
+       
        private long dmaapKafkaTimeout;
        private String[] dmaapKafkaExclude;
 
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.PostConstruct;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.config.ConfigurableBeanFactory;
-import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 /**
                consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
                consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
-               //              consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
-               //      consumerConfig.put("sasl.mechanism", "PLAIN");
-
+               if (StringUtils.isNotBlank(config.getDmaapKafkaLogin())) {
+                       String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + config.getDmaapKafkaLogin() + " password=" + config.getDmaapKafkaPass() + " serviceName=kafka;";
+                       consumerConfig.put("sasl.jaas.config", jaas);
+                       consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getDmaapKafkaSecurityProtocol());
+                       consumerConfig.put("sasl.mechanism", "PLAIN");
+               }
                return consumerConfig;
        }
 
 
 
 
 #####################DMaaP
-#dmaapZookeeperHostPort=127.0.0.1:2181
-#dmaapKafkaHostPort=127.0.0.1:9092
 dmaapZookeeperHostPort=message-router-zookeeper:2181
 dmaapKafkaHostPort=message-router-kafka:9092
 dmaapKafkaGroup=dlgroup44
+#dmaapKafkaLogin=admin
+#dmaapKafkaPass=admin-secret
+#dmaapKafkaSecurityProtocol=SASL_PLAINTEXT
+
 #in second
-dmaapKafkaTimeout=60
+dmaapKafkaTimeout=10
 dmaapKafkaExclude[0]=__consumer_offsets
 dmaapKafkaExclude[1]=__transaction_state
 #dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
 #check for new topics , in millisecond
-dmaapCheckNewTopicInterval=60000
+dmaapCheckNewTopicInterval=10000
 
 kafkaConsumerCount=3