Enable setting sasl.mechanism
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / kafka / SdcKafkaProducer.java
index bdc984d..7158357 100644 (file)
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
  */
 public class SdcKafkaProducer {
     private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName());
+    private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512";
 
     private KafkaProducer<String, String> kafkaProducer;
 
@@ -53,7 +54,7 @@ public class SdcKafkaProducer {
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
         properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
-        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+        properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism());
         kafkaProducer = new KafkaProducer<>(properties);
     }
 
@@ -77,7 +78,16 @@ public class SdcKafkaProducer {
             throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
         }
     }
-
+    
+    private static String getKafkaSaslMechanism() throws KafkaException {
+        String saslMechanism = System.getenv("SASL_MECHANISM");
+        if(saslMechanism != null) {
+            return saslMechanism;
+        } else {
+            return DEFAULT_SASL_MECHANISM;
+        }
+    }
+    
     /**
      * @param message A message to Send
      * @param topicName The name of the topic to publish to