Enable setting sasl.mechanism 33/134933/2
authorMichaelMorris <michael.morris@est.tech>
Fri, 16 Jun 2023 13:38:28 +0000 (14:38 +0100)
committerVasyl Razinkov <vasyl.razinkov@est.tech>
Fri, 16 Jun 2023 15:16:09 +0000 (15:16 +0000)
Signed-off-by: MichaelMorris <michael.morris@est.tech>
Issue-ID: SDC-4540
Change-Id: Iaa88b41ca6a49427695b891544d6f4767196c75f

catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java

index 8879bf0..04df4e1 100644 (file)
@@ -42,6 +42,8 @@ import org.openecomp.sdc.common.log.wrappers.Logger;
 public class SdcKafkaConsumer {
 
     private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName());
+    private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512";
+
     private final DistributionEngineConfiguration deConfiguration;
     private KafkaConsumer<String, String> kafkaConsumer;
 
@@ -61,7 +63,7 @@ public class SdcKafkaConsumer {
         properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+        properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism());
 
         properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
         kafkaConsumer = new KafkaConsumer<>(properties);
@@ -90,6 +92,15 @@ public class SdcKafkaConsumer {
             throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
         }
     }
+    
+    private static String getKafkaSaslMechanism() throws KafkaException {
+        String saslMechanism = System.getenv("SASL_MECHANISM");
+        if(saslMechanism != null) {
+            return saslMechanism;
+        } else {
+            return DEFAULT_SASL_MECHANISM;
+        }
+    }
 
     /**
      *
index bdc984d..7158357 100644 (file)
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
  */
 public class SdcKafkaProducer {
     private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName());
+    private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512";
 
     private KafkaProducer<String, String> kafkaProducer;
 
@@ -53,7 +54,7 @@ public class SdcKafkaProducer {
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
         properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
-        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+        properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism());
         kafkaProducer = new KafkaProducer<>(properties);
     }
 
@@ -77,7 +78,16 @@ public class SdcKafkaProducer {
             throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
         }
     }
-
+    
+    private static String getKafkaSaslMechanism() throws KafkaException {
+        String saslMechanism = System.getenv("SASL_MECHANISM");
+        if(saslMechanism != null) {
+            return saslMechanism;
+        } else {
+            return DEFAULT_SASL_MECHANISM;
+        }
+    }
+    
     /**
      * @param message A message to Send
      * @param topicName The name of the topic to publish to