From 3405456c46937352863ce19c39266a51dd7760db Mon Sep 17 00:00:00 2001 From: efiacor Date: Fri, 21 Apr 2023 15:42:02 +0100 Subject: [PATCH] [SDC-BE] Add kafka ssl config Signed-off-by: efiacor Change-Id: I9e7c0e44566c46bd6225397a680015bf1c0f1c0b Issue-ID: SDC-4476 --- catalog-be/pom.xml | 7 +- .../sdc/be/components/kafka/KafkaCommonConfig.java | 91 ++++++++++++++++++++++ .../sdc/be/components/kafka/KafkaHandler.java | 1 - .../sdc/be/components/kafka/SdcKafkaConsumer.java | 36 +-------- .../sdc/be/components/kafka/SdcKafkaProducer.java | 32 +------- .../sdc/be/components/kafka/KafkaHandlerTest.java | 20 ++--- .../be/components/kafka/SdcKafkaConsumerTest.java | 21 +++-- .../be/components/kafka/SdcKafkaProducerTest.java | 17 ++-- pom.xml | 2 +- 9 files changed, 130 insertions(+), 97 deletions(-) create mode 100644 catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaCommonConfig.java diff --git a/catalog-be/pom.xml b/catalog-be/pom.xml index 567bc5fb43..9f5348acd7 100644 --- a/catalog-be/pom.xml +++ b/catalog-be/pom.xml @@ -920,7 +920,12 @@ ${mockitoJupiter.version} test - + + org.junit-pioneer + junit-pioneer + 2.0.1 + test + org.springframework spring-test diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaCommonConfig.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaCommonConfig.java new file mode 100644 index 0000000000..06c36e4add --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaCommonConfig.java @@ -0,0 +1,91 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2023 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.sdc.be.components.kafka; + +import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaCommonConfig { + + private static final Logger log = LoggerFactory.getLogger(KafkaCommonConfig.class.getName()); + + private final DistributionEngineConfiguration deConfiguration; + + public KafkaCommonConfig(DistributionEngineConfiguration config){ + this.deConfiguration = config; + } + + public Properties getConsumerProperties(){ + Properties props = new Properties(); + setCommonProperties(props); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-consumer-" + UUID.randomUUID()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + return props; + } + + public Properties getProducerProperties(){ + Properties props = new Properties(); + setCommonProperties(props); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-producer-" + UUID.randomUUID()); + + return props; + } + + private void setCommonProperties(Properties props) { + String securityProtocolConfig = System.getenv().getOrDefault("SECURITY_PROTOCOL", "SASL_PLAINTEXT"); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocolConfig); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); + + if("SSL".equals(securityProtocolConfig)) { + log.error("Kafka over SSL has not been implemented yet"); + } + else{ + props.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); + props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + } + } + + /** + * @return The Sasl Jaas Configuration + */ + private String getKafkaSaslJaasConfig() throws KafkaException { + String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); + if(saslJaasConfFromEnv != null) { + return saslJaasConfFromEnv; + } else { + throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); + } + } + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java index 2a5590e72d..5a3698055e 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java @@ -22,7 +22,6 @@ package org.openecomp.sdc.be.components.kafka; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import fj.data.Either; -import lombok.Getter; import lombok.Setter; import org.apache.http.HttpStatus; import org.apache.kafka.common.KafkaException; diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java index 8879bf000e..5350445ab1 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * SDC * ================================================================================ - * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,14 +25,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.UUID; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.SaslConfigs; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.common.log.wrappers.Logger; @@ -43,27 +39,16 @@ public class SdcKafkaConsumer { private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName()); private final DistributionEngineConfiguration deConfiguration; - private KafkaConsumer kafkaConsumer; + private final KafkaConsumer kafkaConsumer; /** * Constructor setting up the KafkaConsumer from a predefined set of configurations */ public SdcKafkaConsumer(DistributionEngineConfiguration deConfiguration){ log.info("Create SdcKafkaConsumer via constructor"); - Properties properties = new Properties(); + KafkaCommonConfig kafkaCommonConfig = new KafkaCommonConfig(deConfiguration); + Properties properties = kafkaCommonConfig.getConsumerProperties(); this.deConfiguration = deConfiguration; - - properties.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId()+ "-consumer-" + UUID.randomUUID()); - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup()); - properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); - properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); - - properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); kafkaConsumer = new KafkaConsumer<>(properties); } @@ -78,19 +63,6 @@ public class SdcKafkaConsumer { this.kafkaConsumer = kafkaConsumer; } - /** - * - * @return the Sasl Jass Config - */ - private String getKafkaSaslJaasConfig() { - String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); - if(saslJaasConfFromEnv != null) { - return saslJaasConfFromEnv; - } else { - throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); - } - } - /** * * @param topic Topic in which to subscribe diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java index bdc984d7b5..9e31da66b9 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * SDC * ================================================================================ - * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,13 +21,9 @@ package org.openecomp.sdc.be.components.kafka; import com.google.common.annotations.VisibleForTesting; import java.util.Properties; -import java.util.UUID; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.SaslConfigs; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,22 +34,15 @@ import org.slf4j.LoggerFactory; public class SdcKafkaProducer { private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName()); - private KafkaProducer kafkaProducer; + private final KafkaProducer kafkaProducer; /** * Constructor setting up the KafkaProducer from a predefined set of configurations */ public SdcKafkaProducer(DistributionEngineConfiguration deConfiguration) { log.info("Create SdcKafkaProducer via constructor"); - Properties properties = new Properties(); - - properties.put(ProducerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-producer-" + UUID.randomUUID()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); - properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); - properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + KafkaCommonConfig kafkaCommonConfig = new KafkaCommonConfig(deConfiguration); + Properties properties = kafkaCommonConfig.getProducerProperties(); kafkaProducer = new KafkaProducer<>(properties); } @@ -66,22 +55,9 @@ public class SdcKafkaProducer { this.kafkaProducer = kafkaProducer; } - /** - * @return The Sasl Jaas Configuration - */ - private static String getKafkaSaslJaasConfig() throws KafkaException { - String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); - if(saslJaasConfFromEnv != null) { - return saslJaasConfFromEnv; - } else { - throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); - } - } - /** * @param message A message to Send * @param topicName The name of the topic to publish to - * @return The status of the send request */ public void send(String message, String topicName) throws KafkaException { ProducerRecord kafkaMessagePayload = new ProducerRecord<>(topicName, "PartitionKey", message); diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java index 91ee0235ad..de7d8bfe03 100644 --- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * SDC * ================================================================================ - * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,29 +21,25 @@ package org.openecomp.sdc.be.components.kafka; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; import com.google.gson.JsonSyntaxException; +import fj.data.Either; +import java.util.ArrayList; +import java.util.List; import org.apache.kafka.common.KafkaException; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; - -import java.util.ArrayList; -import fj.data.Either; -import java.util.List; - +import org.mockito.junit.jupiter.MockitoExtension; import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse; -import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl; import org.openecomp.sdc.be.components.distribution.engine.INotificationData; +import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl; import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; - @ExtendWith(MockitoExtension.class) public class KafkaHandlerTest { @@ -53,8 +49,6 @@ public class KafkaHandlerTest { @Mock private SdcKafkaProducer mockSdcKafkaProducer; - private KafkaHandler kafkaHandler; - @Test public void testIsKafkaActiveTrue(){ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true); diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java index 0a4a834fa4..8db9a3251b 100644 --- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * SDC * ================================================================================ - * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,20 +21,14 @@ package org.openecomp.sdc.be.components.kafka; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; - -import org.apache.kafka.common.KafkaException; -import org.junit.jupiter.api.Test; - import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; -import java.util.Collections; -import java.util.Collection; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,11 +37,16 @@ import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.jetbrains.annotations.NotNull; - +import org.junit.jupiter.api.Test; +import org.junitpioneer.jupiter.SetEnvironmentVariable; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;") public class SdcKafkaConsumerTest { @Test diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java index 23322cce5a..42646027fc 100644 --- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * SDC * ================================================================================ - * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,24 +19,21 @@ */ package org.openecomp.sdc.be.components.kafka; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import org.junit.jupiter.api.Test; - import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; - -import org.openecomp.sdc.be.catalog.api.IStatus; +import org.junit.jupiter.api.Test; +import org.junitpioneer.jupiter.SetEnvironmentVariable; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;") public class SdcKafkaProducerTest { @Test diff --git a/pom.xml b/pom.xml index 9872be4c15..24ebf53a40 100644 --- a/pom.xml +++ b/pom.xml @@ -212,7 +212,7 @@ Modifications copyright (c) 2018-2019 Nokia org.apache.kafka kafka-clients - 3.3.1 + 3.4.0 com.google.guava -- 2.16.6