X-Git-Url: https://gerrit.onap.org/r/gitweb?p=sdc.git;a=blobdiff_plain;f=catalog-be%2Fsrc%2Fmain%2Fjava%2Forg%2Fopenecomp%2Fsdc%2Fbe%2Fcomponents%2Fkafka%2FSdcKafkaProducer.java;fp=catalog-be%2Fsrc%2Fmain%2Fjava%2Forg%2Fopenecomp%2Fsdc%2Fbe%2Fcomponents%2Fkafka%2FSdcKafkaProducer.java;h=5db5165b55bf5832df8fea0cfc5bdb56e9a4bfeb;hp=715835779959e75f375561c850ee944c4a601ba4;hb=0ce40cecbce00104be54871ce87ca99cef2aa480;hpb=5d7ca5c1e86d7633a1954ae89334df18d264f82b diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java index 7158357799..5db5165b55 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java @@ -21,13 +21,9 @@ package org.openecomp.sdc.be.components.kafka; import com.google.common.annotations.VisibleForTesting; import java.util.Properties; -import java.util.UUID; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.SaslConfigs; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,24 +33,16 @@ import org.slf4j.LoggerFactory; */ public class SdcKafkaProducer { private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName()); - private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512"; - private KafkaProducer kafkaProducer; + private final KafkaProducer kafkaProducer; /** * Constructor setting up the KafkaProducer from a predefined set of configurations */ public SdcKafkaProducer(DistributionEngineConfiguration deConfiguration) { log.info("Create SdcKafkaProducer via constructor"); - Properties properties = new Properties(); - - properties.put(ProducerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-producer-" + UUID.randomUUID()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); - properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); - properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism()); + KafkaCommonConfig kafkaCommonConfig = new KafkaCommonConfig(deConfiguration); + Properties properties = kafkaCommonConfig.getProducerProperties(); kafkaProducer = new KafkaProducer<>(properties); } @@ -67,31 +55,9 @@ public class SdcKafkaProducer { this.kafkaProducer = kafkaProducer; } - /** - * @return The Sasl Jaas Configuration - */ - private static String getKafkaSaslJaasConfig() throws KafkaException { - String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); - if(saslJaasConfFromEnv != null) { - return saslJaasConfFromEnv; - } else { - throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); - } - } - - private static String getKafkaSaslMechanism() throws KafkaException { - String saslMechanism = System.getenv("SASL_MECHANISM"); - if(saslMechanism != null) { - return saslMechanism; - } else { - return DEFAULT_SASL_MECHANISM; - } - } - /** * @param message A message to Send * @param topicName The name of the topic to publish to - * @return The status of the send request */ public void send(String message, String topicName) throws KafkaException { ProducerRecord kafkaMessagePayload = new ProducerRecord<>(topicName, "PartitionKey", message);