Enable setting sasl.mechanism
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / kafka / SdcKafkaConsumer.java
index 8879bf0..04df4e1 100644 (file)
@@ -42,6 +42,8 @@ import org.openecomp.sdc.common.log.wrappers.Logger;
 public class SdcKafkaConsumer {
 
     private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName());
+    private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512";
+
     private final DistributionEngineConfiguration deConfiguration;
     private KafkaConsumer<String, String> kafkaConsumer;
 
@@ -61,7 +63,7 @@ public class SdcKafkaConsumer {
         properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+        properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism());
 
         properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
         kafkaConsumer = new KafkaConsumer<>(properties);
@@ -90,6 +92,15 @@ public class SdcKafkaConsumer {
             throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
         }
     }
+    
+    private static String getKafkaSaslMechanism() throws KafkaException {
+        String saslMechanism = System.getenv("SASL_MECHANISM");
+        if(saslMechanism != null) {
+            return saslMechanism;
+        } else {
+            return DEFAULT_SASL_MECHANISM;
+        }
+    }
 
     /**
      *