DMAAP-1706 - New Kafka Auth option 17/127217/4
authordavid.mcweeney <david.mcweeney@est.tech>
Thu, 17 Feb 2022 17:07:59 +0000 (17:07 +0000)
committerdavid.mcweeney <david.mcweeney@est.tech>
Wed, 2 Mar 2022 16:56:59 +0000 (16:56 +0000)
Change-Id: I8533721d23d6adb41f65cb96fb2b8f852bda47b8
Signed-off-by: david.mcweeney <david.mcweeney@est.tech>
Issue-ID: DMAAP-1706

ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt

index 659295a..249af88 100644 (file)
@@ -1,6 +1,7 @@
 /*
  *  Copyright © 2019 IBM.
  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ *  Modification Copyright (C) 2022 Nordix Foundation.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -67,5 +68,6 @@ class MessageLibConstants {
         const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth"
         const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth"
         const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth"
+        const val TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH = "kafka-scram-plain-text-auth"
     }
 }
index 3e7db95..886c87c 100644 (file)
@@ -1,6 +1,7 @@
 /*
  *  Copyright © 2019 IBM.
  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ *  Modification Copyright (C) 2022 Nordix Foundation.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -117,6 +118,24 @@ class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerPr
     }
 }
 
+/** (SASL) SCRAM Plaintext Auth */
+class KafkaScramPlainTextAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
+
+    var saslMechanism: String = "SCRAM-SHA-512"
+    lateinit var scramUsername: String
+    lateinit var scramPassword: String
+
+    override fun getConfig(): HashMap<String, Any> {
+        val configProps = super.getConfig()
+        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString()
+        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+        configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+                "username=\"${scramUsername}\" " +
+                "password=\"${scramPassword}\";"
+        return configProps
+    }
+}
+
 /** Consumer */
 abstract class MessageConsumerProperties : CommonProperties()
 /** Kafka Streams */
@@ -265,3 +284,21 @@ class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerPr
         return configProps
     }
 }
+
+/** (SASL) SCRAM Plaintext Auth */
+class KafkaScramPlaintextAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
+
+    var saslMechanism: String = "SCRAM-SHA-512"
+    lateinit var scramUsername: String
+    lateinit var scramPassword: String
+
+    override fun getConfig(): HashMap<String, Any> {
+        val configProps = super.getConfig()
+        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString()
+        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+        configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+                "username=\"${scramUsername}\" " +
+                "password=\"${scramPassword}\";"
+        return configProps
+    }
+}
index c0cf51b..d10f2d4 100644 (file)
@@ -1,6 +1,7 @@
 /*
  *  Copyright © 2019 IBM.
  *  Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property
+ *  Modification Copyright (C) 2022 Nordix Foundation.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -22,6 +23,8 @@ import io.micrometer.core.instrument.MeterRegistry
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramPlainTextAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramPlaintextAuthMessageConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
@@ -71,6 +74,11 @@ open class BluePrintMessageLibPropertyService(
                     prefix, KafkaScramSslAuthMessageProducerProperties::class.java
                 )
             }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                    prefix, KafkaScramPlainTextAuthMessageProducerProperties::class.java
+                )
+            }
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
             }
@@ -89,6 +97,9 @@ open class BluePrintMessageLibPropertyService(
             MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
                 JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!!
             }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaScramPlainTextAuthMessageProducerProperties::class.java)!!
+            }
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
             }
@@ -130,6 +141,12 @@ open class BluePrintMessageLibPropertyService(
                     prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
                 )
             }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                    prefix, KafkaScramPlaintextAuthMessageConsumerProperties::class.java
+                )
+            }
+
             /** Stream Consumer */
             MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
                 bluePrintPropertiesService.propertyBeanType(
@@ -146,6 +163,7 @@ open class BluePrintMessageLibPropertyService(
                     prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
                 )
             }
+
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
             }
@@ -165,6 +183,9 @@ open class BluePrintMessageLibPropertyService(
             MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
                 JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!!
             }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaScramPlaintextAuthMessageConsumerProperties::class.java)!!
+            }
             /** Stream Consumer */
             MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
                 JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
@@ -175,6 +196,7 @@ open class BluePrintMessageLibPropertyService(
             MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
                 JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!!
             }
+
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
             }
@@ -204,6 +226,12 @@ open class BluePrintMessageLibPropertyService(
                         meterRegistry
                     )
                 }
+                MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+                    return KafkaMessageConsumerService(
+                        messageConsumerProperties as KafkaScramPlaintextAuthMessageConsumerProperties,
+                        meterRegistry
+                    )
+                }
                 /** Stream Consumer */
                 MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
                     return KafkaStreamsConsumerService(
index 9c37b9d..37d8b24 100644 (file)
@@ -1,6 +1,7 @@
 /*
  *  Copyright © 2019 IBM.
  *  Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
+ *  Modification Copyright (C) 2022 Nordix Foundation.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -83,6 +84,16 @@ import kotlin.test.assertTrue
             "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
             "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
 
+            "blueprintsprocessor.messageconsumer.sample2.type=kafka-scram-plain-text-auth",
+            "blueprintsprocessor.messageconsumer.sample2.bootstrapServers=127.0.0.1:9092",
+            "blueprintsprocessor.messageconsumer.sample2.groupId=sample-group",
+            "blueprintsprocessor.messageconsumer.sample2.topic=default-topic",
+            "blueprintsprocessor.messageconsumer.sample2.clientId=default-client-id",
+            "blueprintsprocessor.messageconsumer.sample2.pollMillSec=10",
+            "blueprintsprocessor.messageconsumer.sample2.pollRecords=1",
+            "blueprintsprocessor.messageconsumer.sample2.scramUsername=sample-user",
+            "blueprintsprocessor.messageconsumer.sample2.scramPassword=secretpassword",
+
             "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
             "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
             "blueprintsprocessor.messageproducer.sample.topic=default-topic",
@@ -271,6 +282,57 @@ open class BlueprintMessageConsumerServiceTest {
         }
     }
 
+
+    @Test
+    fun testKafkaScramPlaintextAuthConfig() {
+        val expectedConfig = mapOf<String, Any>(
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+            ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
+            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
+            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString()
+        )
+
+        val messageConsumerProperties = bluePrintMessageLibPropertyService
+            .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample2")
+
+        val configProps = messageConsumerProperties.getConfig()
+
+        assertEquals(
+            messageConsumerProperties.topic,
+            "default-topic",
+            "Topic doesn't match the expected value"
+        )
+        assertEquals(
+            messageConsumerProperties.type,
+            "kafka-scram-plain-text-auth",
+            "Authentication type doesn't match the expected value"
+        )
+
+        assertTrue(
+            configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
+            "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
+        )
+        assertTrue(
+            configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
+            "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
+        )
+
+        expectedConfig.forEach {
+            assertTrue(
+                configProps.containsKey(it.key),
+                "Missing expected kafka config key : ${it.key}"
+            )
+            assertEquals(
+                configProps[it.key],
+                it.value,
+                "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+            )
+        }
+    }
+
     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
     // @Test
     fun testKafkaIntegration() {
index 87819f6..bb35b66 100644 (file)
@@ -1,6 +1,7 @@
 /*
  *  Copyright © 2019 IBM.
  *  Modifications Copyright © 2021 Bell Canada.
+ *  Modification Copyright (C) 2022 Nordix Foundation.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -38,6 +39,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants.Companion.PROPERTY_MESSAGE_PRODUCER_PREFIX
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.mock.mockito.MockBean
 import org.springframework.test.annotation.DirtiesContext
@@ -69,9 +71,17 @@ import kotlin.test.assertTrue
             "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
             "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
             "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
-            "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
+            "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword",
+
+            "blueprintsprocessor.messageproducer.sample2.type=kafka-scram-plain-text-auth",
+            "blueprintsprocessor.messageproducer.sample2.bootstrapServers=127.0.0.1:9092",
+            "blueprintsprocessor.messageproducer.sample2.topic=default-topic",
+            "blueprintsprocessor.messageproducer.sample2.clientId=default-client-id",
+            "blueprintsprocessor.messageproducer.sample2.scramUsername=sample-user",
+            "blueprintsprocessor.messageproducer.sample2.scramPassword=secretpassword"
         ]
 )
+
 open class BlueprintMessageProducerServiceTest {
 
     @Autowired
@@ -163,4 +173,56 @@ open class BlueprintMessageProducerServiceTest {
             )
         }
     }
+
+    @Test
+    fun testKafkaScramPlaintextAuthConfig() {
+
+        val expectedConfig = mapOf<String, Any>(
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
+            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
+            ProducerConfig.ACKS_CONFIG to "all",
+            ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
+            ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
+            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
+            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString()
+        )
+
+        val messageProducerProperties = bluePrintMessageLibPropertyService
+            .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample2")
+
+        val configProps = messageProducerProperties.getConfig()
+
+        assertEquals(
+            messageProducerProperties.topic,
+            "default-topic",
+            "Topic doesn't match the expected value"
+        )
+        assertEquals(
+            messageProducerProperties.type,
+            "kafka-scram-plain-text-auth",
+            "Authentication type doesn't match the expected value"
+        )
+
+        assertTrue(
+            configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
+            "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
+        )
+        assertTrue(
+            configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
+            "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
+        )
+
+        expectedConfig.forEach {
+            assertTrue(
+                configProps.containsKey(it.key),
+                "Missing expected kafka config key : ${it.key}"
+            )
+            assertEquals(
+                configProps[it.key],
+                it.value,
+                "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+            )
+        }
+    }
 }