import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
val messageClientProperties = messageProducerProperties(jsonNode)
- return blueprintMessageProducerService(messageClientProperties)
+ return KafkaMessageProducerService(messageClientProperties)
}
fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
val messageClientProperties = messageProducerProperties(prefix)
- return blueprintMessageProducerService(messageClientProperties)
+ return KafkaMessageProducerService(messageClientProperties)
}
fun messageProducerProperties(prefix: String): MessageProducerProperties {
val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
return when (type) {
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
- kafkaBasicAuthMessageProducerProperties(prefix)
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaBasicAuthMessageProducerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaSslAuthMessageProducerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaScramSslAuthMessageProducerProperties::class.java
+ )
}
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!!
}
- else -> {
- throw BluePrintProcessorException("Message adaptor($type) is not supported")
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)!!
}
- }
- }
-
- private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties):
- BlueprintMessageProducerService {
-
- when (MessageProducerProperties) {
- is KafkaBasicAuthMessageProducerProperties -> {
- return KafkaBasicAuthMessageProducerService(MessageProducerProperties)
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!!
}
else -> {
- throw BluePrintProcessorException("couldn't get Message client service for")
+ throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
}
}
- private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
- return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaBasicAuthMessageProducerProperties::class.java
- )
- }
-
/** Consumer Property Lib Service Implementation **/
/** Return Message Consumer Service for [jsonNode] definitions. */
fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
return when (type) {
+ /** Message Consumer */
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
- kafkaBasicAuthMessageConsumerProperties(prefix)
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaBasicAuthMessageConsumerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaSslAuthMessageConsumerProperties::class.java
+ )
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
+ )
+ }
+ /** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
- kafkaStreamsBasicAuthMessageConsumerProperties(prefix)
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaStreamsSslAuthConsumerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
+ )
}
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
val type = jsonNode.get("type").textValue()
return when (type) {
+ /** Message Consumer */
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
}
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)!!
+ }
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!!
+ }
+ /** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
}
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)!!
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!!
+ }
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
BlueprintMessageConsumerService {
- when (messageConsumerProperties) {
- is KafkaBasicAuthMessageConsumerProperties -> {
- return KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
+ when (messageConsumerProperties.type) {
+ /** Message Consumer */
+ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+ return KafkaMessageConsumerService(
+ messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ return KafkaMessageConsumerService(
+ messageConsumerProperties as KafkaSslAuthMessageConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ return KafkaMessageConsumerService(
+ messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties
+ )
}
- is KafkaStreamsBasicAuthConsumerProperties -> {
- return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties)
+ /** Stream Consumer */
+ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+ return KafkaStreamsConsumerService(
+ messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+ return KafkaStreamsConsumerService(
+ messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+ return KafkaStreamsConsumerService(
+ messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties
+ )
}
else -> {
- throw BluePrintProcessorException("couldn't get Message client service for")
+ throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}")
}
}
}
-
- private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties {
- return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaBasicAuthMessageConsumerProperties::class.java
- )
- }
-
- private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties {
- return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
- )
- }
}