X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=catalog-be%2Fsrc%2Fmain%2Fjava%2Forg%2Fopenecomp%2Fsdc%2Fbe%2Fcomponents%2Fkafka%2FSdcKafkaProducer.java;h=9e31da66b94c69602dc5993b9e4a2a4dc0f5168c;hb=3405456c46937352863ce19c39266a51dd7760db;hp=bdc984d7b586b47284219f89e53b4d3bea5deccc;hpb=5d3954987dd1d53d2e9623648b6d436592a4c195;p=sdc.git diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java index bdc984d7b5..9e31da66b9 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * SDC * ================================================================================ - * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,13 +21,9 @@ package org.openecomp.sdc.be.components.kafka; import com.google.common.annotations.VisibleForTesting; import java.util.Properties; -import java.util.UUID; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.SaslConfigs; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,22 +34,15 @@ import org.slf4j.LoggerFactory; public class SdcKafkaProducer { private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName()); - private KafkaProducer kafkaProducer; + private final KafkaProducer kafkaProducer; /** * Constructor setting up the KafkaProducer from a predefined set of configurations */ public SdcKafkaProducer(DistributionEngineConfiguration deConfiguration) { log.info("Create SdcKafkaProducer via constructor"); - Properties properties = new Properties(); - - properties.put(ProducerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-producer-" + UUID.randomUUID()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); - properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); - properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + KafkaCommonConfig kafkaCommonConfig = new KafkaCommonConfig(deConfiguration); + Properties properties = kafkaCommonConfig.getProducerProperties(); kafkaProducer = new KafkaProducer<>(properties); } @@ -66,22 +55,9 @@ public class SdcKafkaProducer { this.kafkaProducer = kafkaProducer; } - /** - * @return The Sasl Jaas Configuration - */ - private static String getKafkaSaslJaasConfig() throws KafkaException { - String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); - if(saslJaasConfFromEnv != null) { - return saslJaasConfFromEnv; - } else { - throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); - } - } - /** * @param message A message to Send * @param topicName The name of the topic to publish to - * @return The status of the send request */ public void send(String message, String topicName) throws KafkaException { ProducerRecord kafkaMessagePayload = new ProducerRecord<>(topicName, "PartitionKey", message);