Secure Kafka Authentication
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BluePrintMessageLibPropertyService.kt
index 44b50af..67fbef5 100644 (file)
@@ -21,7 +21,13 @@ import com.fasterxml.jackson.databind.JsonNode
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
@@ -34,20 +40,32 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
 
     fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
         val messageClientProperties = messageProducerProperties(jsonNode)
-        return blueprintMessageProducerService(messageClientProperties)
+        return KafkaMessageProducerService(messageClientProperties)
     }
 
     fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
         val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
         val messageClientProperties = messageProducerProperties(prefix)
-        return blueprintMessageProducerService(messageClientProperties)
+        return KafkaMessageProducerService(messageClientProperties)
     }
 
     fun messageProducerProperties(prefix: String): MessageProducerProperties {
         val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
         return when (type) {
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
-                kafkaBasicAuthMessageProducerProperties(prefix)
+                bluePrintPropertiesService.propertyBeanType(
+                    prefix, KafkaBasicAuthMessageProducerProperties::class.java
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaSslAuthMessageProducerProperties::class.java
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaScramSslAuthMessageProducerProperties::class.java
+                )
             }
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
@@ -61,31 +79,18 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
                 JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!!
             }
-            else -> {
-                throw BluePrintProcessorException("Message adaptor($type) is not supported")
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)!!
             }
-        }
-    }
-
-    private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties):
-            BlueprintMessageProducerService {
-
-        when (MessageProducerProperties) {
-            is KafkaBasicAuthMessageProducerProperties -> {
-                return KafkaBasicAuthMessageProducerService(MessageProducerProperties)
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!!
             }
             else -> {
-                throw BluePrintProcessorException("couldn't get Message client service for")
+                throw BluePrintProcessorException("Message adaptor($type) is not supported")
             }
         }
     }
 
-    private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
-        return bluePrintPropertiesService.propertyBeanType(
-            prefix, KafkaBasicAuthMessageProducerProperties::class.java
-        )
-    }
-
     /** Consumer Property Lib Service Implementation **/
 
     /** Return Message Consumer Service for [jsonNode] definitions. */
@@ -105,11 +110,37 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
     fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
         val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
         return when (type) {
+            /** Message Consumer */
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
-                kafkaBasicAuthMessageConsumerProperties(prefix)
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaBasicAuthMessageConsumerProperties::class.java
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaSslAuthMessageConsumerProperties::class.java
+                )
             }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
+                )
+            }
+            /** Stream Consumer */
             MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
-                kafkaStreamsBasicAuthMessageConsumerProperties(prefix)
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaStreamsSslAuthConsumerProperties::class.java
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+                bluePrintPropertiesService.propertyBeanType(
+                        prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
+                )
             }
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
@@ -120,12 +151,26 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
     fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
         val type = jsonNode.get("type").textValue()
         return when (type) {
+            /** Message Consumer */
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
                 JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
             }
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)!!
+            }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!!
+            }
+            /** Stream Consumer */
             MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
                 JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
             }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)!!
+            }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!!
+            }
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
             }
@@ -135,28 +180,42 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
     private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
             BlueprintMessageConsumerService {
 
-        when (messageConsumerProperties) {
-            is KafkaBasicAuthMessageConsumerProperties -> {
-                return KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
+        when (messageConsumerProperties.type) {
+            /** Message Consumer */
+            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+                return KafkaMessageConsumerService(
+                        messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+                return KafkaMessageConsumerService(
+                        messageConsumerProperties as KafkaSslAuthMessageConsumerProperties
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+                return KafkaMessageConsumerService(
+                        messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties
+                )
             }
-            is KafkaStreamsBasicAuthConsumerProperties -> {
-                return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties)
+            /** Stream Consumer */
+            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+                return KafkaStreamsConsumerService(
+                        messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+                return KafkaStreamsConsumerService(
+                        messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties
+                )
+            }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+                return KafkaStreamsConsumerService(
+                        messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties
+                )
             }
             else -> {
-                throw BluePrintProcessorException("couldn't get Message client service for")
+                throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}")
             }
         }
     }
-
-    private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties {
-        return bluePrintPropertiesService.propertyBeanType(
-            prefix, KafkaBasicAuthMessageConsumerProperties::class.java
-        )
-    }
-
-    private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties {
-        return bluePrintPropertiesService.propertyBeanType(
-            prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
-        )
-    }
 }