public class SdcKafkaConsumer {
 
     private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName());
+    private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512";
+
     private final DistributionEngineConfiguration deConfiguration;
     private KafkaConsumer<String, String> kafkaConsumer;
 
         properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+        properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism());
 
         properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
         kafkaConsumer = new KafkaConsumer<>(properties);
             throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
         }
     }
+    
+    private static String getKafkaSaslMechanism() throws KafkaException {
+        String saslMechanism = System.getenv("SASL_MECHANISM");
+        if(saslMechanism != null) {
+            return saslMechanism;
+        } else {
+            return DEFAULT_SASL_MECHANISM;
+        }
+    }
 
     /**
      *
 
  */
 public class SdcKafkaProducer {
     private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName());
+    private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512";
 
     private KafkaProducer<String, String> kafkaProducer;
 
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
         properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
-        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+        properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism());
         kafkaProducer = new KafkaProducer<>(properties);
     }
 
             throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
         }
     }
-
+    
+    private static String getKafkaSaslMechanism() throws KafkaException {
+        String saslMechanism = System.getenv("SASL_MECHANISM");
+        if(saslMechanism != null) {
+            return saslMechanism;
+        } else {
+            return DEFAULT_SASL_MECHANISM;
+        }
+    }
+    
     /**
      * @param message A message to Send
      * @param topicName The name of the topic to publish to