public class SdcKafkaConsumer {
private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName());
+ private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512";
+
private final DistributionEngineConfiguration deConfiguration;
private KafkaConsumer<String, String> kafkaConsumer;
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+ properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism());
properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
kafkaConsumer = new KafkaConsumer<>(properties);
throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
}
}
+
+ private static String getKafkaSaslMechanism() throws KafkaException {
+ String saslMechanism = System.getenv("SASL_MECHANISM");
+ if(saslMechanism != null) {
+ return saslMechanism;
+ } else {
+ return DEFAULT_SASL_MECHANISM;
+ }
+ }
/**
*
*/
public class SdcKafkaProducer {
private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName());
+ private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512";
private KafkaProducer<String, String> kafkaProducer;
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
- properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+ properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism());
kafkaProducer = new KafkaProducer<>(properties);
}
throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
}
}
-
+
+ private static String getKafkaSaslMechanism() throws KafkaException {
+ String saslMechanism = System.getenv("SASL_MECHANISM");
+ if(saslMechanism != null) {
+ return saslMechanism;
+ } else {
+ return DEFAULT_SASL_MECHANISM;
+ }
+ }
+
/**
* @param message A message to Send
* @param topicName The name of the topic to publish to