[MR] Add support for configuring jaas.sasl.config at runtime
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPKafkaConsumerFactory.java
index 26a8cf4..6f5a17c 100644 (file)
@@ -8,16 +8,16 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
-*  
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *  
+ *
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
+ *
  *******************************************************************************/
 package org.onap.dmaap.dmf.mr.beans;
 
@@ -27,6 +27,7 @@ import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.onap.dmaap.dmf.mr.CambriaApiException;
 import org.onap.dmaap.dmf.mr.backends.Consumer;
@@ -52,13 +53,13 @@ import java.util.concurrent.TimeUnit;
  */
 public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
-       
+
        private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
-       
+
 
        /**
         * constructor initialization
-        * 
+        *
         * @param settings
         * @param metrics
         * @param curator
@@ -68,8 +69,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
         */
 
        public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
-                       @Qualifier("curator") CuratorFramework curator,
-                       @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
+                                                                        @Qualifier("curator") CuratorFramework curator,
+                                                                        @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
                        throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
 
                String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
@@ -97,7 +98,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
                final boolean isCacheEnabled = kSetting_EnableCache;
 
-               
+
                fCache = null;
                if (isCacheEnabled) {
                        fCache = KafkaConsumerCache.getInstance();
@@ -108,15 +109,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
                        fCache.setfApiId(apiNodeId);
                        fCache.startCache(mode, curator);
                        if(kafkaLiveLockAvoider!=null){
-                       kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
-                       fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
+                               kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
+                               fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
                        }
                }
        }
 
        /*
         * getConsumerFor
-        * 
+        *
         * @see
         * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
         * java.lang.String, java.lang.String, int, java.lang.String) This method is
@@ -128,7 +129,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
         */
        @Override
        public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
-                       String remotehost) throws UnavailableException, CambriaApiException {
+                                                                  String remotehost) throws UnavailableException, CambriaApiException {
                Kafka011Consumer kc;
 
                // To synchronize based on the consumer group.
@@ -179,11 +180,11 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
                                        log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
                                                        + "], on topic [" + topic + "].");
-                                       
+
                                        if (fCache != null) {
                                                fCache.signalOwnership(topic, consumerGroupName, consumerId);
                                        }
-                                       
+
                                        final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
                                        long fCreateTimeMs = System.currentTimeMillis();
                                        KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
@@ -210,7 +211,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
                                                        + consumerId);
                                        log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
                                                        + consumerId);
-                                       
+
                                } finally {
                                        if (locked) {
                                                try {
@@ -242,7 +243,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
                fCache.dropAllConsumers();
        }
 
-       
+
        private KafkaConsumerCache fCache;
        private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
        private String fkafkaBrokers;
@@ -256,38 +257,36 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
        private void transferSettingIfProvided(Properties target, String key, String prefix) {
                String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
 
-               
+
                if (null != keyVal) {
-               
+
                        log.info("Setting [" + key + "] to " + keyVal + ".");
                        target.put(key, keyVal);
                }
        }
 
        /**
-        * Name CreateConsumerconfig  
+        * Name CreateConsumerconfig
         * @param topic
         * @param groupId
         * @param consumerId
         * @return Properties
-        * 
+        *
         * This method is to create Properties required to create kafka connection
-        * Group name is replaced with different format groupid--topic to address same 
-        * groupids for multiple topics. Same groupid with multiple topics 
+        * Group name is replaced with different format groupid--topic to address same
+        * groupids for multiple topics. Same groupid with multiple topics
         * may start frequent consumer rebalancing on all the topics . Replacing them makes it unique
         */
        private Properties createConsumerConfig(String topic ,String groupId, String consumerId) {
                final Properties props = new Properties();
                //fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic
-               //Fix for CPFMF-644 : 
+               //Fix for CPFMF-644 :
                final String fakeGroupName = groupId + "--" + topic;
                props.put("group.id", fakeGroupName);
                props.put("enable.auto.commit", "false"); // 0.11
                props.put("bootstrap.servers", fkafkaBrokers);
                if(Utils.isCadiEnabled()){
-               props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
-               props.put("security.protocol", "SASL_PLAINTEXT");
-               props.put("sasl.mechanism", "PLAIN");
+                       props.putAll(Utils.addSaslProps());
                }
                props.put("client.id", consumerId);
 
@@ -314,7 +313,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
        /**
         * putting values in hashmap like consumer timeout, zookeeper time out, etc
-        * 
+        *
         * @param setting
         */
        private static void populateKafkaInternalDefaultsMap() { }
@@ -322,7 +321,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
        /*
         * The starterIncremnt value is just to emulate calling certain consumers,
         * in this test app all the consumers are local
-        * 
+        *
         */
        private LiveLockAvoidance makeAvoidanceCallback(final String appId) {
 
@@ -346,7 +345,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
        @SuppressWarnings("rawtypes")
        @Override
        public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
-                       String remotehost) throws UnavailableException, CambriaApiException {
+                                                                                 String remotehost) throws UnavailableException, CambriaApiException {
                // TODO Auto-generated method stub
                return null;
        }