+ @Override
+ public String getKafkaSaslJaasConfig() {
+ String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG");
+ if(saslJaasConfFromEnv != null) {
+ return saslJaasConfFromEnv;
+ }
+ if(get(PROP_ML_DISTRIBUTION_SASL_JAAS_CONFIG) != null) {
+ return get(PROP_ML_DISTRIBUTION_SASL_JAAS_CONFIG);
+ }
+ return null;
+ }
+
+ @Override
+ public String getKafkaSaslMechanism() {
+ if(get(PROP_ML_DISTRIBUTION_SASL_MECHANISM) != null) {
+ return get(PROP_ML_DISTRIBUTION_SASL_MECHANISM);
+ }
+ return System.getenv().getOrDefault("SASL_MECHANISM", "SCRAM-SHA-512");
+ }
+
+ /**
+ * One of PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
+ */
+ @Override
+ public String getKafkaSecurityProtocolConfig() {
+ if(get(PROP_ML_DISTRIBUTION_SECURITY_PROTOCOL) != null) {
+ return get(PROP_ML_DISTRIBUTION_SECURITY_PROTOCOL);
+ }
+ return System.getenv().getOrDefault("SECURITY_PROTOCOL", "SASL_PLAINTEXT");
+ }
+