/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import com.fasterxml.jackson.databind.JsonNode
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.*
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import org.springframework.stereotype.Service
@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
-open class BluePrintMessageLibPropertyService(private var bluePrintProperties: BluePrintProperties) {
+open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesService: BluePrintPropertiesService) {
- fun blueprintMessageClientService(jsonNode: JsonNode): BlueprintMessageProducerService {
- val messageClientProperties = messageClientProperties(jsonNode)
- return blueprintMessageClientService(messageClientProperties)
+ fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
+ val messageClientProperties = messageProducerProperties(jsonNode)
+ return blueprintMessageProducerService(messageClientProperties)
}
- fun blueprintMessageClientService(selector: String): BlueprintMessageProducerService {
- val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CLIENT_PREFIX}$selector"
- val messageClientProperties = messageClientProperties(prefix)
- return blueprintMessageClientService(messageClientProperties)
+ fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
+ val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
+ val messageClientProperties = messageProducerProperties(prefix)
+ return blueprintMessageProducerService(messageClientProperties)
}
- fun messageClientProperties(prefix: String): MessageProducerProperties {
- val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java)
+ fun messageProducerProperties(prefix: String): MessageProducerProperties {
+ val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
return when (type) {
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
- kafkaBasicAuthMessageClientProperties(prefix)
+ kafkaBasicAuthMessageProducerProperties(prefix)
}
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
}
- fun messageClientProperties(jsonNode: JsonNode): MessageProducerProperties {
+ fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties {
val type = jsonNode.get("type").textValue()
return when (type) {
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
}
}
- private fun blueprintMessageClientService(MessageProducerProperties: MessageProducerProperties)
+ private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties)
: BlueprintMessageProducerService {
when (MessageProducerProperties) {
}
}
- private fun kafkaBasicAuthMessageClientProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
- return bluePrintProperties.propertyBeanType(
+ private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
+ return bluePrintPropertiesService.propertyBeanType(
prefix, KafkaBasicAuthMessageProducerProperties::class.java)
}
+ /** Consumer Property Lib Service Implementation **/
+
+ /** Return Message Consumer Service for [jsonNode] definitions. */
+ fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService {
+ val messageConsumerProperties = messageConsumerProperties(jsonNode)
+ return blueprintMessageConsumerService(messageConsumerProperties)
+ }
+
+ /** Return Message Consumer Service for [selector] definitions. */
+ fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService {
+ val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector"
+ val messageClientProperties = messageConsumerProperties(prefix)
+ return blueprintMessageConsumerService(messageClientProperties)
+ }
+
+ /** Return Message Consumer Properties for [prefix] definitions. */
+ fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
+ val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
+ return when (type) {
+ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+ kafkaBasicAuthMessageConsumerProperties(prefix)
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+ kafkaStreamsBasicAuthMessageConsumerProperties(prefix)
+ }
+ else -> {
+ throw BluePrintProcessorException("Message adaptor($type) is not supported")
+ }
+ }
+ }
+
+ fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
+ val type = jsonNode.get("type").textValue()
+ return when (type) {
+ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
+ }
+ else -> {
+ throw BluePrintProcessorException("Message adaptor($type) is not supported")
+ }
+ }
+ }
+
+ private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties)
+ : BlueprintMessageConsumerService {
+
+ when (messageConsumerProperties) {
+ is KafkaBasicAuthMessageConsumerProperties -> {
+ return KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
+ }
+ is KafkaStreamsBasicAuthConsumerProperties -> {
+ return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties)
+ }
+ else -> {
+ throw BluePrintProcessorException("couldn't get Message client service for")
+ }
+ }
+ }
+
+ private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties {
+ return bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaBasicAuthMessageConsumerProperties::class.java)
+ }
+
+ private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties {
+ return bluePrintProperties.propertyBeanType(
+ prefix, KafkaStreamsBasicAuthConsumerProperties::class.java)
+ }
+
}