Secure Kafka Authentication 73/105873/5
authorJulien Fontaine <julien.fontaine@bell.ca>
Mon, 13 Apr 2020 20:42:36 +0000 (16:42 -0400)
committerJulien Fontaine <julien.fontaine@bell.ca>
Tue, 21 Apr 2020 17:57:31 +0000 (13:57 -0400)
Implementation of kafka secure authentication :
- SSL
- SASL(SCRAM) & SSL

Issue-ID: CCSDK-2313
Change-Id: I4b2fc7abab7478e360ebf461608a620d75708f54
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
17 files changed:
ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
ms/blueprintsprocessor/application/src/main/resources/application.properties
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt [moved from ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt with 78% similarity]
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt [moved from ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt with 63% similarity]
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt [moved from ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt with 73% similarity]
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt [moved from ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt with 87% similarity]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.keystore.jks [new file with mode: 0644]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.truststore.jks [new file with mode: 0644]

index ad38883..bf5e23b 100755 (executable)
@@ -131,19 +131,31 @@ blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
 blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
 blueprintsprocessor.messageconsumer.self-service-api.clientId=request-receiver-client-id
 blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
+#### Security settings
+#### SSL
+#blueprintsprocessor.messageconsumer.self-service-api.truststore=/path/to/truststore.jks
+#blueprintsprocessor.messageconsumer.self-service-api.truststorePassword=truststorePassword
+#blueprintsprocessor.messageconsumer.self-service-api.keystore=/path/to/keystore.jks
+#blueprintsprocessor.messageconsumer.self-service-api.keystorePassword=keystorePassword
+#### SCRAM
+#blueprintsprocessor.messageconsumer.self-service-api.scramUsername=test-user
+#blueprintsprocessor.messageconsumer.self-service-api.scramPassword=testUserPassword
 
 # Kafka audit service Configurations
+## Audit request
 blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
 blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
 blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
 blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
 blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
 
+## Audit response
 blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
 blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
 blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
 blueprintsprocessor.messageproducer.self-service-api.audit.response.topic=audit-response-producer.t
 
+
 # Message prioritization kakfa properties, Enable if Prioritization service is needed
 # Deploy message-prioritization function along with blueprintsprocessor application.
 #blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth
index 74549b0..6fb737e 100755 (executable)
@@ -103,20 +103,35 @@ blueprintsprocessor.restclient.aai-data.additionalHeaders.X-TransactionId=cds-tr
 blueprintsprocessor.restclient.aai-data.additionalHeaders.X-FromAppId=cds-app-id
 blueprintsprocessor.restclient.aai-data.additionalHeaders.Accept=application/json
 
-# Kafka-message-lib Configuration
-blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
-blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
-blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
-blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
-
-blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
-
+# Kafka audit service Configurations
+## Audit request
+blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
+blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
+#### Security settings
+#### SSL
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.truststore=/path/to/truststore.jks
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.truststorePassword=truststorePassword
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.keystore=/path/to/keystore.jks
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.keystorePassword=keystorePassword
+#### SCRAM
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.scramUsername=test-user
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.scramPassword=testUserPassword
+
+## Audit response
+blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.response.topic=audit-response-producer.t
+
+# Message prioritization kakfa properties, Enable if Prioritization service is needed
+# Deploy message-prioritization function along with blueprintsprocessor application.
+#blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth
+#blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092
+#blueprintsprocessor.messageconsumer.prioritize-input.applicationId=cds-controller
+#blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic
 
 blueprintprocessor.remoteScriptCommand.enabled=true
 
index 35bc494..7e6bf68 100644 (file)
@@ -38,7 +38,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.s
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
@@ -72,15 +72,27 @@ import kotlin.test.assertNotNull
         "spring.jpa.properties.hibernate.show_sql=false",
         "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
 
-        "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
+        "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
         "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
         "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
         "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
+        "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
+        "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
+        "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
+        "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
+        "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
+        "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
 
         // To send initial test message
-        "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
+        "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
         "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
         "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
+        "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
+        "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
+        "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
+        "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
+        "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
+        "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
 
         "blueprintsprocessor.nats.cds-controller.type=token-auth",
         "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
@@ -241,7 +253,7 @@ open class MessagePrioritizationConsumerTest {
 
             /** Send sample message with every 1 sec */
             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
-                .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
+                .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
             launch {
                 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
                     delay(100)
index cc4c7fa..c6587c7 100644 (file)
@@ -61,6 +61,10 @@ class MessageLibConstants {
         const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer."
         const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
         const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
+        const val TYPE_KAFKA_SCRAM_SSL_AUTH = "kafka-scram-ssl-auth"
+        const val TYPE_KAFKA_SSL_AUTH = "kafka-ssl-auth"
         const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth"
+        const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth"
+        const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth"
     }
 }
index 005223d..ac35fbf 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message
 
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramLoginModule
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.streams.StreamsConfig
 
-/** Producer Properties **/
-open class MessageProducerProperties {
+/** Common Properties **/
+abstract class CommonProperties {
     lateinit var type: String
+    lateinit var topic: String
+    lateinit var bootstrapServers: String
+
+    open fun getConfig(): HashMap<String, Any> {
+        val configProps = hashMapOf<String, Any>()
+        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+        return configProps
+    }
 }
 
+/** Message Producer */
+/** Message Producer Properties **/
+abstract class MessageProducerProperties : CommonProperties()
+
+/** Basic Auth */
 open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
-    lateinit var bootstrapServers: String
-    var topic: String? = null
+
     var clientId: String? = null
     // strongest producing guarantee
     var acks: String = "all"
     var retries: Int = 0
     // ensure we don't push duplicates
     var enableIdempotence: Boolean = true
+
+    override fun getConfig(): HashMap<String, Any> {
+        val configProps = super.getConfig()
+        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
+        configProps[ProducerConfig.ACKS_CONFIG] = acks
+        configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
+        if (clientId != null) {
+            configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
+        }
+        return configProps
+    }
 }
 
-/** Consumer Properties **/
+/** SSL Auth */
+open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
+    lateinit var truststore: String
+    lateinit var truststorePassword: String
+    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+    var keystore: String? = null
+    var keystorePassword: String? = null
+    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+    var sslEndpointIdentificationAlgorithm: String = ""
 
-open class MessageConsumerProperties {
-    lateinit var type: String
+    override fun getConfig(): HashMap<String, Any> {
+        val configProps = super.getConfig()
+        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
+        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
+        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
+        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+        if (keystore != null) {
+            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
+            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
+            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
+        }
+        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
+
+        return configProps
+    }
 }
 
-open class KafkaStreamsConsumerProperties : MessageConsumerProperties() {
-    lateinit var bootstrapServers: String
+/** (SASL) SCRAM SSL Auth */
+class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
+    var saslMechanism: String = "SCRAM-SHA-512"
+    lateinit var scramUsername: String
+    lateinit var scramPassword: String
+
+    override fun getConfig(): HashMap<String, Any> {
+        val configProps = super.getConfig()
+        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
+        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+        configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+                "username=\"${scramUsername}\" " +
+                "password=\"${scramPassword}\";"
+        return configProps
+    }
+}
+
+/** Consumer */
+abstract class MessageConsumerProperties : CommonProperties()
+/** Kafka Streams */
+/** Streams properties */
+
+/** Basic Auth */
+open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
     lateinit var applicationId: String
-    lateinit var topic: String
     var autoOffsetReset: String = "latest"
     var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
+
+    override fun getConfig(): HashMap<String, Any> {
+        val configProperties = super.getConfig()
+        configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
+        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
+        configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
+        return configProperties
+    }
 }
 
-open class KafkaStreamsBasicAuthConsumerProperties : KafkaStreamsConsumerProperties()
+/** SSL Auth */
+open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
+    lateinit var truststore: String
+    lateinit var truststorePassword: String
+    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+    var keystore: String? = null
+    var keystorePassword: String? = null
+    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+    var sslEndpointIdentificationAlgorithm: String = ""
 
-open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
-    lateinit var bootstrapServers: String
+    override fun getConfig(): HashMap<String, Any> {
+        val configProps = super.getConfig()
+        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
+        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
+        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
+        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+        if (keystore != null) {
+            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
+            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
+            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
+        }
+        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
+        return configProps
+    }
+}
+
+/** (SASL) SCRAM SSL Auth */
+class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
+    var saslMechanism: String = "SCRAM-SHA-512"
+    lateinit var scramUsername: String
+    lateinit var scramPassword: String
+
+    override fun getConfig(): HashMap<String, Any> {
+        val configProps = super.getConfig()
+        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
+        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+        configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+                "username=\"${scramUsername}\" " +
+                "password=\"${scramPassword}\";"
+        return configProps
+    }
+}
+
+/** Message Consumer */
+/** Message Consumer Properties **/
+/** Basic Auth */
+open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
     lateinit var groupId: String
     lateinit var clientId: String
-    var topic: String? = null
     var autoCommit: Boolean = true
     var autoOffsetReset: String = "latest"
     var pollMillSec: Long = 1000
     var pollRecords: Int = -1
+
+    override fun getConfig(): HashMap<String, Any> {
+        val configProperties = super.getConfig()
+        configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
+        configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
+        /**
+         * earliest: automatically reset the offset to the earliest offset
+         * latest: automatically reset the offset to the latest offset
+         */
+        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
+        configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
+        configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId
+
+        /** To handle Back pressure, Get only configured record for processing */
+        if (pollRecords > 0) {
+            configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords
+        }
+
+        return configProperties
+    }
 }
 
-open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
+/** SSL Auth */
+open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
+    lateinit var truststore: String
+    lateinit var truststorePassword: String
+    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+    var keystore: String? = null
+    var keystorePassword: String? = null
+    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+    var sslEndpointIdentificationAlgorithm: String = ""
+
+    override fun getConfig(): HashMap<String, Any> {
+        val configProps = super.getConfig()
+        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
+        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
+        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
+        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+        if (keystore != null) {
+            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
+            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
+            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
+        }
+        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
+        return configProps
+    }
+}
+
+/** (SASL) SCRAM SSL Auth */
+class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
+    var saslMechanism: String = "SCRAM-SHA-512"
+    lateinit var scramUsername: String
+    lateinit var scramPassword: String
+
+    override fun getConfig(): HashMap<String, Any> {
+        val configProps = super.getConfig()
+        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
+        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+        configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+                "username=\"${scramUsername}\" " +
+                "password=\"${scramPassword}\";"
+        return configProps
+    }
+}
index 8803946..c659fdb 100644 (file)
@@ -99,6 +99,20 @@ class MessageProducerRelationshipTemplateBuilder(name: String, description: Stri
             BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block)
         )
     }
+
+    fun kafkaSslAuth(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) {
+        property(
+                BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+                BluePrintTypes.kafkaSslAuthMessageProducerProperties(block)
+        )
+    }
+
+    fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) {
+        property(
+                BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+                BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block)
+        )
+    }
 }
 
 fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
@@ -108,9 +122,23 @@ fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuth
     return assignments.asJsonType()
 }
 
+fun BluePrintTypes.kafkaSslAuthMessageProducerProperties(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+    val assignments = KafkaSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build()
+    assignments[KafkaSslAuthMessageProducerProperties::type.name] =
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
+    return assignments.asJsonType()
+}
+
+fun BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+    val assignments = KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build()
+    assignments[KafkaScramSslAuthMessageProducerProperties::type.name] =
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
+    return assignments.asJsonType()
+}
+
 open class MessageProducerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder()
 
-class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() {
+open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() {
 
     fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
 
@@ -141,6 +169,61 @@ class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducer
         property(KafkaBasicAuthMessageProducerProperties::enableIdempotence, enableIdempotence)
 }
 
+open class KafkaSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder() {
+    fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
+
+    fun truststore(truststore: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::truststore, truststore)
+
+    fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
+
+    fun truststorePassword(truststorePassword: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword)
+
+    fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
+
+    fun truststoreType(truststoreType: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType)
+
+    fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
+
+    fun keystore(keystore: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+
+    fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
+
+    fun keystorePassword(keystorePassword: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+
+    fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
+
+    fun keystoreType(keystoreType: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+
+    fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
+            sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+
+    fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+}
+
+class KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaSslAuthMessageProducerPropertiesAssignmentBuilder() {
+    fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
+
+    fun saslMechanism(saslMechanism: JsonNode) =
+            property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism)
+
+    fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
+
+    fun scramUsername(scramUsername: JsonNode) =
+            property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername)
+
+    fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
+
+    fun scramPassword(scramPassword: JsonNode) =
+            property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword)
+}
+
 /** Relationships Templates DSL for Message Consumer */
 fun TopologyTemplateBuilder.relationshipTemplateMessageConsumer(
     name: String,
@@ -166,12 +249,40 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri
         )
     }
 
+    fun kafkaSslAuth(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) {
+        property(
+                BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+                BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block)
+        )
+    }
+
+    fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) {
+        property(
+                BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+                BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block)
+        )
+    }
+
     fun kafkaStreamsBasicAuth(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
         property(
             BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
             BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block)
         )
     }
+
+    fun kafkaStreamsSslAuth(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
+        property(
+                BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+                BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block)
+        )
+    }
+
+    fun kafkaStreamsScramSslAuth(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
+        property(
+                BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+                BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block)
+        )
+    }
 }
 
 fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
@@ -181,6 +292,20 @@ fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuth
     return assignments.asJsonType()
 }
 
+fun BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+    val assignments = KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build()
+    assignments[KafkaSslAuthMessageConsumerProperties::type.name] =
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
+    return assignments.asJsonType()
+}
+
+fun BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+    val assignments = KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build()
+    assignments[KafkaScramSslAuthMessageConsumerProperties::type.name] =
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
+    return assignments.asJsonType()
+}
+
 fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
     val assignments = KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
     assignments[KafkaStreamsBasicAuthConsumerProperties::type.name] =
@@ -188,81 +313,201 @@ fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBa
     return assignments.asJsonType()
 }
 
+fun BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+    val assignments = KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
+    assignments[KafkaStreamsSslAuthConsumerProperties::type.name] =
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive()
+    return assignments.asJsonType()
+}
+
+fun BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+    val assignments = KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
+    assignments[KafkaStreamsScramSslAuthConsumerProperties::type.name] =
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive()
+    return assignments.asJsonType()
+}
+
 open class MessageConsumerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder()
 
-open class KafkaMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
+/** KafkaBasicAuthMessageConsumerProperties assignment builder */
+open class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
 
     fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
 
     fun bootstrapServers(bootstrapServers: JsonNode) =
-        property(KafkaMessageConsumerProperties::bootstrapServers, bootstrapServers)
+        property(KafkaBasicAuthMessageConsumerProperties::bootstrapServers, bootstrapServers)
 
     fun groupId(groupId: String) = groupId(groupId.asJsonPrimitive())
 
     fun groupId(groupId: JsonNode) =
-        property(KafkaMessageConsumerProperties::groupId, groupId)
+        property(KafkaBasicAuthMessageConsumerProperties::groupId, groupId)
 
     fun clientId(clientId: String) = clientId(clientId.asJsonPrimitive())
 
     fun clientId(clientId: JsonNode) =
-        property(KafkaMessageConsumerProperties::clientId, clientId)
+        property(KafkaBasicAuthMessageConsumerProperties::clientId, clientId)
 
     fun topic(topic: String) = topic(topic.asJsonPrimitive())
 
     fun topic(topic: JsonNode) =
-        property(KafkaMessageConsumerProperties::topic, topic)
+        property(KafkaBasicAuthMessageConsumerProperties::topic, topic)
 
     fun autoCommit(autoCommit: Boolean) = autoCommit(autoCommit.asJsonPrimitive())
 
     fun autoCommit(autoCommit: JsonNode) =
-        property(KafkaMessageConsumerProperties::autoCommit, autoCommit)
+        property(KafkaBasicAuthMessageConsumerProperties::autoCommit, autoCommit)
 
     fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive())
 
     fun autoOffsetReset(autoOffsetReset: JsonNode) =
-        property(KafkaMessageConsumerProperties::autoOffsetReset, autoOffsetReset)
+        property(KafkaBasicAuthMessageConsumerProperties::autoOffsetReset, autoOffsetReset)
 
     fun pollMillSec(pollMillSec: Int) = pollMillSec(pollMillSec.asJsonPrimitive())
 
     fun pollMillSec(pollMillSec: JsonNode) =
-        property(KafkaMessageConsumerProperties::pollMillSec, pollMillSec)
+        property(KafkaBasicAuthMessageConsumerProperties::pollMillSec, pollMillSec)
 
     fun pollRecords(pollRecords: Int) = pollRecords(pollRecords.asJsonPrimitive())
 
     fun pollRecords(pollRecords: JsonNode) =
-        property(KafkaMessageConsumerProperties::pollRecords, pollRecords)
+        property(KafkaBasicAuthMessageConsumerProperties::pollRecords, pollRecords)
 }
 
-/** KafkaBasicAuthMessageConsumerProperties assignment builder */
-class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : KafkaMessageConsumerPropertiesAssignmentBuilder()
+open class KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder() {
+    fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
+
+    fun truststore(truststore: JsonNode) =
+            property(KafkaSslAuthMessageConsumerProperties::truststore, truststore)
+
+    fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
+
+    fun truststorePassword(truststorePassword: JsonNode) =
+            property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword)
+
+    fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
+
+    fun truststoreType(truststoreType: JsonNode) =
+            property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType)
+
+    fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
+
+    fun keystore(keystore: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+
+    fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
+
+    fun keystorePassword(keystorePassword: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+
+    fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
+
+    fun keystoreType(keystoreType: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+
+    fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
+            sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+
+    fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
+            property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+}
+
+class KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder() {
+    fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
+
+    fun saslMechanism(saslMechanism: JsonNode) =
+            property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism)
+
+    fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
+
+    fun scramUsername(scramUsername: JsonNode) =
+            property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername)
+
+    fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
+
+    fun scramPassword(scramPassword: JsonNode) =
+            property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword)
+}
 
 /** KafkaStreamsConsumerProperties assignment builder */
-open class KafkaStreamsConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
+open class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
 
     fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
 
     fun bootstrapServers(bootstrapServers: JsonNode) =
-        property(KafkaStreamsConsumerProperties::bootstrapServers, bootstrapServers)
+        property(KafkaStreamsBasicAuthConsumerProperties::bootstrapServers, bootstrapServers)
 
     fun applicationId(applicationId: String) = bootstrapServers(applicationId.asJsonPrimitive())
 
     fun applicationId(applicationId: JsonNode) =
-        property(KafkaStreamsConsumerProperties::applicationId, applicationId)
+        property(KafkaStreamsBasicAuthConsumerProperties::applicationId, applicationId)
 
     fun topic(topic: String) = topic(topic.asJsonPrimitive())
 
     fun topic(topic: JsonNode) =
-        property(KafkaStreamsConsumerProperties::topic, topic)
+        property(KafkaStreamsBasicAuthConsumerProperties::topic, topic)
 
     fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive())
 
     fun autoOffsetReset(autoOffsetReset: JsonNode) =
-        property(KafkaStreamsConsumerProperties::autoOffsetReset, autoOffsetReset)
+        property(KafkaStreamsBasicAuthConsumerProperties::autoOffsetReset, autoOffsetReset)
 
     fun processingGuarantee(processingGuarantee: String) = processingGuarantee(processingGuarantee.asJsonPrimitive())
 
     fun processingGuarantee(processingGuarantee: JsonNode) =
-        property(KafkaStreamsConsumerProperties::processingGuarantee, processingGuarantee)
+        property(KafkaStreamsBasicAuthConsumerProperties::processingGuarantee, processingGuarantee)
 }
 
-class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsConsumerPropertiesAssignmentBuilder()
+open class KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder() {
+    fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
+
+    fun truststore(truststore: JsonNode) =
+            property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore)
+
+    fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
+
+    fun truststorePassword(truststorePassword: JsonNode) =
+            property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword)
+
+    fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
+
+    fun truststoreType(truststoreType: JsonNode) =
+            property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType)
+
+    fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
+
+    fun keystore(keystore: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+
+    fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
+
+    fun keystorePassword(keystorePassword: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+
+    fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
+
+    fun keystoreType(keystoreType: JsonNode) =
+            property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+
+    fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
+            sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+
+    fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
+            property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+}
+
+class KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder() {
+    fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
+
+    fun saslMechanism(saslMechanism: JsonNode) =
+            property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism)
+
+    fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
+
+    fun scramUsername(scramUsername: JsonNode) =
+            property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername)
+
+    fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
+
+    fun scramPassword(scramPassword: JsonNode) =
+            property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword)
+}
index 44b50af..67fbef5 100644 (file)
@@ -21,7 +21,13 @@ import com.fasterxml.jackson.databind.JsonNode
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
@@ -34,20 +40,32 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
 
     fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
         val messageClientProperties = messageProducerProperties(jsonNode)
-        return blueprintMessageProducerService(messageClientProperties)
+        return KafkaMessageProducerService(messageClientProperties)
     }
 
     fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
         val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
         val messageClientProperties = messageProducerProperties(prefix)
-        return blueprintMessageProducerService(messageClientProperties)
+        return KafkaMessageProducerService(messageClientProperties)
     }
 
     fun messageProducerProperties(prefix: String): MessageProducerProperties {
         val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
         return when (type) {
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
-                kafkaBasicAuthMessageProducerProperties(prefix)
+                bluePrintPropertiesService.propertyBeanType(
+                    prefix, KafkaBasicAuthMessageProducerProperties::class.java
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaSslAuthMessageProducerProperties::class.java
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaScramSslAuthMessageProducerProperties::class.java
+                )
             }
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
@@ -61,31 +79,18 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
                 JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!!
             }
-            else -> {
-                throw BluePrintProcessorException("Message adaptor($type) is not supported")
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)!!
             }
-        }
-    }
-
-    private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties):
-            BlueprintMessageProducerService {
-
-        when (MessageProducerProperties) {
-            is KafkaBasicAuthMessageProducerProperties -> {
-                return KafkaBasicAuthMessageProducerService(MessageProducerProperties)
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!!
             }
             else -> {
-                throw BluePrintProcessorException("couldn't get Message client service for")
+                throw BluePrintProcessorException("Message adaptor($type) is not supported")
             }
         }
     }
 
-    private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
-        return bluePrintPropertiesService.propertyBeanType(
-            prefix, KafkaBasicAuthMessageProducerProperties::class.java
-        )
-    }
-
     /** Consumer Property Lib Service Implementation **/
 
     /** Return Message Consumer Service for [jsonNode] definitions. */
@@ -105,11 +110,37 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
     fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
         val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
         return when (type) {
+            /** Message Consumer */
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
-                kafkaBasicAuthMessageConsumerProperties(prefix)
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaBasicAuthMessageConsumerProperties::class.java
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaSslAuthMessageConsumerProperties::class.java
+                )
             }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
+                )
+            }
+            /** Stream Consumer */
             MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
-                kafkaStreamsBasicAuthMessageConsumerProperties(prefix)
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaStreamsSslAuthConsumerProperties::class.java
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
+                )
             }
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
@@ -120,12 +151,26 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
     fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
         val type = jsonNode.get("type").textValue()
         return when (type) {
+            /** Message Consumer */
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
                 JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
             }
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)!!
+            }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!!
+            }
+            /** Stream Consumer */
             MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
                 JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
             }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)!!
+            }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!!
+            }
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
             }
@@ -135,28 +180,42 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
     private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
             BlueprintMessageConsumerService {
 
-        when (messageConsumerProperties) {
-            is KafkaBasicAuthMessageConsumerProperties -> {
-                return KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
+        when (messageConsumerProperties.type) {
+            /** Message Consumer */
+            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+                return KafkaMessageConsumerService(
+                        messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+                return KafkaMessageConsumerService(
+                        messageConsumerProperties as KafkaSslAuthMessageConsumerProperties
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+                return KafkaMessageConsumerService(
+                        messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties
+                )
             }
-            is KafkaStreamsBasicAuthConsumerProperties -> {
-                return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties)
+            /** Stream Consumer */
+            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+                return KafkaStreamsConsumerService(
+                        messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+                return KafkaStreamsConsumerService(
+                        messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+                return KafkaStreamsConsumerService(
+                        messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties
+                )
             }
             else -> {
-                throw BluePrintProcessorException("couldn't get Message client service for")
+                throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}")
             }
         }
     }
-
-    private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties {
-        return bluePrintPropertiesService.propertyBeanType(
-            prefix, KafkaBasicAuthMessageConsumerProperties::class.java
-        )
-    }
-
-    private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties {
-        return bluePrintPropertiesService.propertyBeanType(
-            prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
-        )
-    }
 }
@@ -21,24 +21,20 @@ import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
-import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import java.nio.charset.Charset
 import java.time.Duration
 import kotlin.concurrent.thread
 
-open class KafkaBasicAuthMessageConsumerService(
+open class KafkaMessageConsumerService(
     private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
 ) :
     BlueprintMessageConsumerService {
 
-    val log = logger(KafkaBasicAuthMessageConsumerService::class)
+    val log = logger(KafkaMessageConsumerService::class)
     val channel = Channel<String>()
     var kafkaConsumer: Consumer<String, ByteArray>? = null
 
@@ -46,24 +42,7 @@ open class KafkaBasicAuthMessageConsumerService(
     var keepGoing = true
 
     fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
-        val configProperties = hashMapOf<String, Any>()
-        configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
-        configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId
-        configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit
-        /**
-         * earliest: automatically reset the offset to the earliest offset
-         * latest: automatically reset the offset to the latest offset
-         */
-        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
-        configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
-        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
-        configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId
-
-        /** To handle Back pressure, Get only configured record for processing */
-        if (messageConsumerProperties.pollRecords > 0) {
-            configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
-        }
-        // TODO("Security Implementation based on type")
+        val configProperties = messageConsumerProperties.getConfig()
         /** add or override already set properties */
         additionalConfig?.let { configProperties.putAll(it) }
         /** Create Kafka consumer */
@@ -20,28 +20,20 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 import org.apache.commons.lang.builder.ToStringBuilder
 import org.apache.kafka.clients.producer.Callback
 import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.kafka.common.serialization.StringSerializer
-import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
 import org.slf4j.LoggerFactory
 import java.nio.charset.Charset
 
-class KafkaBasicAuthMessageProducerService(
-    private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties
+class KafkaMessageProducerService(
+    private val messageProducerProperties: MessageProducerProperties
 ) :
     BlueprintMessageProducerService {
 
-    private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!!
+    private val log = LoggerFactory.getLogger(KafkaMessageProducerService::class.java)!!
 
     private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
 
@@ -81,26 +73,16 @@ class KafkaBasicAuthMessageProducerService(
     }
 
     fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
-        log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
-        val configProps = hashMapOf<String, Any>()
-        configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
-        configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
-        configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
-        configProps[ACKS_CONFIG] = messageProducerProperties.acks
-        configProps[ENABLE_IDEMPOTENCE_CONFIG] = messageProducerProperties.enableIdempotence
-        if (messageProducerProperties.clientId != null) {
-            configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!!
-        }
-        // TODO("Security Implementation based on type")
+        log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
+        val configProps = messageProducerProperties.getConfig()
 
-        // Add additional Properties
-        if (additionalConfig != null) {
+        /** Add additional Properties */
+        if (additionalConfig != null)
             configProps.putAll(additionalConfig)
-        }
 
-        if (kafkaProducer == null) {
+        if (kafkaProducer == null)
             kafkaProducer = KafkaProducer(configProps)
-        }
+
         return kafkaProducer!!
     }
 }
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import kotlinx.coroutines.channels.Channel
-import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.streams.KafkaStreams
-import org.apache.kafka.streams.StreamsConfig
-import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import java.util.Properties
 
-open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties) :
+open class KafkaStreamsConsumerService(private val messageConsumerProperties: MessageConsumerProperties) :
     BlueprintMessageConsumerService {
 
-    val log = logger(KafkaStreamsBasicAuthConsumerService::class)
+    val log = logger(KafkaStreamsConsumerService::class)
     lateinit var kafkaStreams: KafkaStreams
 
     private fun streamsConfig(additionalConfig: Map<String, Any>? = null): Properties {
         val configProperties = Properties()
-        configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = messageConsumerProperties.applicationId
-        configProperties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
-        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
-        configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = messageConsumerProperties.processingGuarantee
-        // TODO("Security Implementation based on type")
+        /** set consumer properties */
+        messageConsumerProperties.getConfig().let { configProperties.putAll(it) }
         /** add or override already set properties */
         additionalConfig?.let { configProperties.putAll(it) }
         /** Create Kafka consumer */
index b10e102..612a57d 100644 (file)
@@ -27,17 +27,27 @@ import kotlin.test.assertNotNull
 class MessagePropertiesDSLTest {
 
     @Test
-    fun testMessageProducerDSL() {
+    fun testScramSslMessageProducerDSL() {
         val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") {
             topologyTemplate {
-                relationshipTemplateMessageProducer("sample-basic-auth", "Message Producer") {
-                    kafkaBasicAuth {
+                relationshipTemplateMessageProducer("sample-scram-ssl-auth", "Message Producer") {
+                    kafkaScramSslAuth {
                         bootstrapServers("sample-bootstrapServers")
                         clientId("sample-client-id")
                         acks("all")
                         retries(3)
                         enableIdempotence(true)
                         topic("sample-topic")
+                        truststore("/path/to/truststore.jks")
+                        truststorePassword("secretpassword")
+                        truststoreType("JKS")
+                        keystore("/path/to/keystore.jks")
+                        keystorePassword("secretpassword")
+                        keystoreType("JKS")
+                        sslEndpointIdentificationAlgorithm("")
+                        saslMechanism("SCRAM-SHA-512")
+                        scramUsername("sample-user")
+                        scramPassword("secretpassword")
                     }
                 }
             }
@@ -50,27 +60,27 @@ class MessagePropertiesDSLTest {
         val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates
         assertNotNull(relationshipTemplates, "failed to get relationship templates")
         assertEquals(1, relationshipTemplates.size, "relationshipTemplates doesn't match")
-        assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth")
+        assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth")
 
         val relationshipTypes = serviceTemplate.relationshipTypes
         assertNotNull(relationshipTypes, "failed to get relationship types")
         assertEquals(2, relationshipTypes.size, "relationshipTypes doesn't match")
         assertNotNull(
-            relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO],
-            "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}"
+                relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO],
+                "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}"
         )
         assertNotNull(
-            relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER],
-            "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}"
+                relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER],
+                "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}"
         )
     }
 
     @Test
-    fun testMessageConsumerDSL() {
+    fun testScramSslAuthMessageConsumerDSL() {
         val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") {
             topologyTemplate {
-                relationshipTemplateMessageConsumer("sample-basic-auth", "Message Consumer") {
-                    kafkaBasicAuth {
+                relationshipTemplateMessageConsumer("sample-scram-ssl-auth", "Message Consumer") {
+                    kafkaScramSslAuth {
                         bootstrapServers("sample-bootstrapServers")
                         clientId("sample-client-id")
                         groupId("sample-group-id")
@@ -79,15 +89,35 @@ class MessagePropertiesDSLTest {
                         autoOffsetReset("latest")
                         pollMillSec(5000)
                         pollRecords(20)
+                        truststore("/path/to/truststore.jks")
+                        truststorePassword("secretpassword")
+                        truststoreType("JKS")
+                        keystore("/path/to/keystore.jks")
+                        keystorePassword("secretpassword")
+                        keystoreType("JKS")
+                        sslEndpointIdentificationAlgorithm("")
+                        saslMechanism("SCRAM-SHA-512")
+                        scramUsername("sample-user")
+                        scramPassword("secretpassword")
                     }
                 }
-                relationshipTemplateMessageConsumer("sample-stream-basic-auth", "Message Consumer") {
-                    kafkaStreamsBasicAuth {
+                relationshipTemplateMessageConsumer("sample-stream-scram-ssl-auth", "Message Consumer") {
+                    kafkaStreamsScramSslAuth {
                         bootstrapServers("sample-bootstrapServers")
                         applicationId("sample-application-id")
                         autoOffsetReset("latest")
                         processingGuarantee(StreamsConfig.EXACTLY_ONCE)
                         topic("sample-streaming-topic")
+                        truststore("/path/to/truststore.jks")
+                        truststorePassword("secretpassword")
+                        truststoreType("JKS")
+                        keystore("/path/to/keystore.jks")
+                        keystorePassword("secretpassword")
+                        keystoreType("JKS")
+                        sslEndpointIdentificationAlgorithm("")
+                        saslMechanism("SCRAM-SHA-512")
+                        scramUsername("sample-user")
+                        scramPassword("secretpassword")
                     }
                 }
             }
@@ -100,8 +130,8 @@ class MessagePropertiesDSLTest {
         val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates
         assertNotNull(relationshipTemplates, "failed to get relationship templates")
         assertEquals(2, relationshipTemplates.size, "relationshipTemplates doesn't match")
-        assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth")
-        assertNotNull(relationshipTemplates["sample-stream-basic-auth"], "failed to get sample-stream-basic-auth")
+        assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth")
+        assertNotNull(relationshipTemplates["sample-stream-scram-ssl-auth"], "failed to get sample-stream-scram-ssl-auth")
 
         val relationshipTypes = serviceTemplate.relationshipTypes
         assertNotNull(relationshipTypes, "failed to get relationship types")
index 823ba7d..ac08dc7 100644 (file)
@@ -23,24 +23,35 @@ import kotlinx.coroutines.channels.consumeEach
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.ConsumerRecords
 import org.apache.kafka.clients.consumer.MockConsumer
 import org.apache.kafka.clients.consumer.OffsetResetStrategy
+import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramLoginModule
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.ContextConfiguration
 import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertEquals
 import kotlin.test.assertNotNull
 import kotlin.test.assertTrue
 
@@ -52,18 +63,30 @@ import kotlin.test.assertTrue
 )
 @TestPropertySource(
     properties =
-    ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
+    ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
         "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
         "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
         "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
         "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
         "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
         "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
+        "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
+        "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
+        "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
+        "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
+        "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
+        "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
 
-        "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+        "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
         "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
         "blueprintsprocessor.messageproducer.sample.topic=default-topic",
-        "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+        "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+        "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+        "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+        "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
+        "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
+        "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+        "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
     ]
 )
 open class BlueprintMessageConsumerServiceTest {
@@ -77,7 +100,7 @@ open class BlueprintMessageConsumerServiceTest {
     fun testKafkaBasicAuthConsumerService() {
         runBlocking {
             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
-                .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
 
             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -124,7 +147,7 @@ open class BlueprintMessageConsumerServiceTest {
     fun testKafkaBasicAuthConsumerWithDynamicFunction() {
         runBlocking {
             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
-                .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
 
             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -173,12 +196,60 @@ open class BlueprintMessageConsumerServiceTest {
         }
     }
 
+    @Test
+    fun testKafkaScramSslAuthConfig() {
+
+        val expectedConfig = mapOf<String, Any>(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+                ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
+                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+                ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
+                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
+                SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
+                SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
+                SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
+                SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
+                SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
+                SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
+                SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+                SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
+                SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
+                        "username=\"sample-user\" " +
+                        "password=\"secretpassword\";"
+                )
+
+        val messageConsumerProperties = bluePrintMessageLibPropertyService
+                .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
+
+        val configProps = messageConsumerProperties.getConfig()
+
+        assertEquals(messageConsumerProperties.topic,
+                "default-topic",
+                "Topic doesn't match the expected value"
+        )
+        assertEquals(messageConsumerProperties.type,
+                "kafka-scram-ssl-auth",
+                "Authentication type doesn't match the expected value")
+
+        expectedConfig.forEach {
+            assertTrue(configProps.containsKey(it.key),
+                    "Missing expected kafka config key : ${it.key}")
+            assertEquals(configProps[it.key],
+                    it.value,
+                    "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+            )
+        }
+    }
+
     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
     // @Test
     fun testKafkaIntegration() {
         runBlocking {
             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
-                .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
 
             val channel = blueprintMessageConsumerService.subscribe(null)
@@ -190,7 +261,7 @@ open class BlueprintMessageConsumerServiceTest {
 
             /** Send message with every 1 sec */
             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
-                .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+                .blueprintMessageProducerService("sample") as KafkaMessageProducerService
             launch {
                 repeat(5) {
                     delay(100)
index b824189..72a47ed 100644 (file)
@@ -20,12 +20,22 @@ import io.mockk.every
 import io.mockk.mockk
 import io.mockk.spyk
 import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramLoginModule
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringSerializer
 import org.junit.runner.RunWith
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.ContextConfiguration
@@ -33,6 +43,7 @@ import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
 import java.util.concurrent.Future
 import kotlin.test.Test
+import kotlin.test.assertEquals
 import kotlin.test.assertTrue
 
 @RunWith(SpringRunner::class)
@@ -43,10 +54,16 @@ import kotlin.test.assertTrue
 )
 @TestPropertySource(
     properties =
-    ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+    ["blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
         "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
         "blueprintsprocessor.messageproducer.sample.topic=default-topic",
-        "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+        "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+        "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+        "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+        "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
+        "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
+        "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+        "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
     ]
 )
 open class BlueprintMessageProducerServiceTest {
@@ -55,10 +72,10 @@ open class BlueprintMessageProducerServiceTest {
     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
 
     @Test
-    fun testKafkaBasicAuthProducertService() {
+    fun testKafkaScramSslAuthProducerService() {
         runBlocking {
             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
-                .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+                .blueprintMessageProducerService("sample") as KafkaMessageProducerService
 
             val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>()
 
@@ -75,4 +92,51 @@ open class BlueprintMessageProducerServiceTest {
             assertTrue(response, "failed to get command response")
         }
     }
+
+    @Test
+    fun testKafkaScramSslAuthConfig() {
+        val expectedConfig = mapOf<String, Any>(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
+                ProducerConfig.ACKS_CONFIG to "all",
+                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
+                ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
+                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
+                SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
+                SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
+                SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
+                SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
+                SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
+                SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
+                SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+                SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
+                SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
+                        "username=\"sample-user\" " +
+                        "password=\"secretpassword\";"
+        )
+
+        val messageProducerProperties = bluePrintMessageLibPropertyService
+                .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample")
+
+        val configProps = messageProducerProperties.getConfig()
+
+        assertEquals(messageProducerProperties.topic,
+                "default-topic",
+                "Topic doesn't match the expected value"
+        )
+        assertEquals(messageProducerProperties.type,
+                "kafka-scram-ssl-auth",
+                "Authentication type doesn't match the expected value")
+
+        expectedConfig.forEach {
+            assertTrue(configProps.containsKey(it.key),
+                    "Missing expected kafka config key : ${it.key}"
+            )
+            assertEquals(configProps[it.key],
+                    it.value,
+                    "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+            )
+        }
+    }
 }
@@ -47,19 +47,27 @@ import kotlin.test.assertNotNull
 @TestPropertySource(
     properties =
     [
-        "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+        "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
         "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
         "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
         "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+        "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+        "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+        "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+        "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword",
 
-        "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
+        "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-scram-ssl-auth",
         "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
         "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
-        "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
+        "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic",
+        "blueprintsprocessor.messageproducer.stream-consumer.truststore=/path/to/truststore.jks",
+        "blueprintsprocessor.messageproducer.stream-consumer.truststorePassword=secretpassword",
+        "blueprintsprocessor.messageproducer.stream-consumer.scramUsername=sample-user",
+        "blueprintsprocessor.messageproducer.stream-consumer.scramPassword=secretpassword"
 
     ]
 )
-class KafkaStreamsBasicAuthConsumerServiceTest {
+class KafkaStreamsConsumerServiceTest {
 
     @Autowired
     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@@ -117,7 +125,7 @@ class KafkaStreamsBasicAuthConsumerServiceTest {
 
             /** Send message with every 1 sec */
             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
-                .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+                .blueprintMessageProducerService("sample") as KafkaMessageProducerService
             launch {
                 repeat(5) {
                     delay(1000)
index fb2189f..77b61a4 100644 (file)
@@ -39,20 +39,31 @@ blueprints.processor.functions.python.executor.modulePaths=./../../../../compone
 
 # Kafka-message-lib Configurations
 blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
-blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageconsumer.self-service-api.type=kafka-scram-ssl-auth
 blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
 blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
 blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
 blueprintsprocessor.messageconsumer.self-service-api.clientId=request-receiver-client-id
 blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
+### Security settings
+### SSL
+blueprintsprocessor.messageconsumer.self-service-api.truststore=src/test/resources/test.truststore.jks
+blueprintsprocessor.messageconsumer.self-service-api.truststorePassword=secretpassword
+blueprintsprocessor.messageconsumer.self-service-api.keystore=src/test/resources/test.keystore.jks
+blueprintsprocessor.messageconsumer.self-service-api.keystorePassword=secretpassword
+### SCRAM
+blueprintsprocessor.messageconsumer.self-service-api.scramUsername=test-user
+blueprintsprocessor.messageconsumer.self-service-api.scramPassword=testUserPassword
 
 # Kafka audit service Configurations
+## Audit request
 blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
 blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
 blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
 blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
 blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
 
+## Audit response
 blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
 blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
 blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.keystore.jks b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.keystore.jks
new file mode 100644 (file)
index 0000000..1a41509
Binary files /dev/null and b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.keystore.jks differ
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.truststore.jks b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.truststore.jks
new file mode 100644 (file)
index 0000000..b094a1f
Binary files /dev/null and b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.truststore.jks differ