/*
* Copyright © 2019 IBM.
* Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modification Copyright (C) 2022 Nordix Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth"
const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth"
const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth"
+ const val TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH = "kafka-scram-plain-text-auth"
}
}
/*
* Copyright © 2019 IBM.
* Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modification Copyright (C) 2022 Nordix Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
}
}
+/** (SASL) SCRAM Plaintext Auth */
+class KafkaScramPlainTextAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
+
+ var saslMechanism: String = "SCRAM-SHA-512"
+ lateinit var scramUsername: String
+ lateinit var scramPassword: String
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString()
+ configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+ configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
+ return configProps
+ }
+}
+
/** Consumer */
abstract class MessageConsumerProperties : CommonProperties()
/** Kafka Streams */
return configProps
}
}
+
+/** (SASL) SCRAM Plaintext Auth */
+class KafkaScramPlaintextAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
+
+ var saslMechanism: String = "SCRAM-SHA-512"
+ lateinit var scramUsername: String
+ lateinit var scramPassword: String
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString()
+ configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+ configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
+ return configProps
+ }
+}
/*
* Copyright © 2019 IBM.
* Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property
+ * Modification Copyright (C) 2022 Nordix Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramPlainTextAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramPlaintextAuthMessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
prefix, KafkaScramSslAuthMessageProducerProperties::class.java
)
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaScramPlainTextAuthMessageProducerProperties::class.java
+ )
+ }
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!!
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaScramPlainTextAuthMessageProducerProperties::class.java)!!
+ }
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
)
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaScramPlaintextAuthMessageConsumerProperties::class.java
+ )
+ }
+
/** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
)
}
+
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!!
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaScramPlaintextAuthMessageConsumerProperties::class.java)!!
+ }
/** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!!
}
+
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
meterRegistry
)
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+ return KafkaMessageConsumerService(
+ messageConsumerProperties as KafkaScramPlaintextAuthMessageConsumerProperties,
+ meterRegistry
+ )
+ }
/** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
return KafkaStreamsConsumerService(
/*
* Copyright © 2019 IBM.
* Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
+ * Modification Copyright (C) 2022 Nordix Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
"blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
"blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
+ "blueprintsprocessor.messageconsumer.sample2.type=kafka-scram-plain-text-auth",
+ "blueprintsprocessor.messageconsumer.sample2.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.sample2.groupId=sample-group",
+ "blueprintsprocessor.messageconsumer.sample2.topic=default-topic",
+ "blueprintsprocessor.messageconsumer.sample2.clientId=default-client-id",
+ "blueprintsprocessor.messageconsumer.sample2.pollMillSec=10",
+ "blueprintsprocessor.messageconsumer.sample2.pollRecords=1",
+ "blueprintsprocessor.messageconsumer.sample2.scramUsername=sample-user",
+ "blueprintsprocessor.messageconsumer.sample2.scramPassword=secretpassword",
+
"blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-topic",
}
}
+
+ @Test
+ fun testKafkaScramPlaintextAuthConfig() {
+ val expectedConfig = mapOf<String, Any>(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString()
+ )
+
+ val messageConsumerProperties = bluePrintMessageLibPropertyService
+ .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample2")
+
+ val configProps = messageConsumerProperties.getConfig()
+
+ assertEquals(
+ messageConsumerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(
+ messageConsumerProperties.type,
+ "kafka-scram-plain-text-auth",
+ "Authentication type doesn't match the expected value"
+ )
+
+ assertTrue(
+ configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
+ "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
+ )
+ assertTrue(
+ configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
+ "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
+ )
+
+ expectedConfig.forEach {
+ assertTrue(
+ configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}"
+ )
+ assertEquals(
+ configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ )
+ }
+ }
+
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// @Test
fun testKafkaIntegration() {
/*
* Copyright © 2019 IBM.
* Modifications Copyright © 2021 Bell Canada.
+ * Modification Copyright (C) 2022 Nordix Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants.Companion.PROPERTY_MESSAGE_PRODUCER_PREFIX
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.test.annotation.DirtiesContext
"blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
"blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
"blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
- "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword",
+
+ "blueprintsprocessor.messageproducer.sample2.type=kafka-scram-plain-text-auth",
+ "blueprintsprocessor.messageproducer.sample2.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample2.topic=default-topic",
+ "blueprintsprocessor.messageproducer.sample2.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample2.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample2.scramPassword=secretpassword"
]
)
+
open class BlueprintMessageProducerServiceTest {
@Autowired
)
}
}
+
+ @Test
+ fun testKafkaScramPlaintextAuthConfig() {
+
+ val expectedConfig = mapOf<String, Any>(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
+ ProducerConfig.ACKS_CONFIG to "all",
+ ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
+ ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
+ ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString()
+ )
+
+ val messageProducerProperties = bluePrintMessageLibPropertyService
+ .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample2")
+
+ val configProps = messageProducerProperties.getConfig()
+
+ assertEquals(
+ messageProducerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(
+ messageProducerProperties.type,
+ "kafka-scram-plain-text-auth",
+ "Authentication type doesn't match the expected value"
+ )
+
+ assertTrue(
+ configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
+ "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
+ )
+ assertTrue(
+ configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
+ "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
+ )
+
+ expectedConfig.forEach {
+ assertTrue(
+ configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}"
+ )
+ assertEquals(
+ configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ )
+ }
+ }
}