2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import com.fasterxml.jackson.databind.JsonNode
21 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
22 import org.onap.ccsdk.cds.blueprintsprocessor.message.*
23 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
24 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
25 import org.springframework.stereotype.Service
27 @Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
28 open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesService: BluePrintPropertiesService) {
/**
 * Return Message Producer Service for [jsonNode] definitions.
 *
 * The node is first resolved to producer properties, which are then handed to the
 * properties-based overload to obtain the service.
 */
fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService =
    blueprintMessageProducerService(messageProducerProperties(jsonNode))
/**
 * Return Message Producer Service for [selector] definitions.
 *
 * The property prefix is [MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX] immediately
 * followed by [selector].
 */
fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
    val producerPrefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
    return blueprintMessageProducerService(messageProducerProperties(producerPrefix))
}
/**
 * Return Message Producer Properties for [prefix] definitions.
 *
 * The adaptor type is looked up under the "[prefix].type" property key.
 *
 * @throws BluePrintProcessorException when the type is not [MessageLibConstants.TYPE_KAFKA_BASIC_AUTH].
 */
fun messageProducerProperties(prefix: String): MessageProducerProperties {
    val adaptorType = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
    // Only one adaptor is handled here; everything else is rejected below.
    if (adaptorType == MessageLibConstants.TYPE_KAFKA_BASIC_AUTH) {
        return kafkaBasicAuthMessageProducerProperties(prefix)
    }
    throw BluePrintProcessorException("Message adaptor($adaptorType) is not supported")
}
/**
 * Return Message Producer Properties for [jsonNode] definitions.
 *
 * The adaptor is selected by the text value of the "type" field of [jsonNode].
 *
 * @throws BluePrintProcessorException if the "type" field is absent, if it names an adaptor other
 * than [MessageLibConstants.TYPE_KAFKA_BASIC_AUTH], or if reading the node yields no value.
 */
fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties {
    // JsonNode.get returns null for a missing field; keep the value nullable so the `else` branch
    // reports it as an unsupported adaptor instead of failing with a NullPointerException.
    val type = jsonNode.get("type")?.textValue()
    return when (type) {
        MessageLibConstants.TYPE_KAFKA_BASIC_AUTH ->
            JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)
                ?: throw BluePrintProcessorException("couldn't read Message adaptor($type) producer properties")
        else -> throw BluePrintProcessorException("Message adaptor($type) is not supported")
    }
}
/**
 * Return the Message Producer Service matching the concrete type of [messageProducerProperties].
 *
 * @throws BluePrintProcessorException when the properties are not
 * [KafkaBasicAuthMessageProducerProperties]; the message names the rejected properties class.
 */
private fun blueprintMessageProducerService(messageProducerProperties: MessageProducerProperties)
    : BlueprintMessageProducerService {
    // Parameter renamed to lowerCamelCase so it no longer shadows the MessageProducerProperties type.
    return when (messageProducerProperties) {
        is KafkaBasicAuthMessageProducerProperties ->
            KafkaBasicAuthMessageProducerService(messageProducerProperties)
        else -> throw BluePrintProcessorException(
            "couldn't get Message client service for ${messageProducerProperties.javaClass.simpleName}"
        )
    }
}
/**
 * Return [KafkaBasicAuthMessageProducerProperties] obtained from the properties service
 * for the given [prefix].
 */
private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties =
    bluePrintPropertiesService.propertyBeanType(prefix, KafkaBasicAuthMessageProducerProperties::class.java)
83 /** Consumer Property Lib Service Implementation **/
/**
 * Return Message Consumer Service for [jsonNode] definitions.
 *
 * The node is first resolved to consumer properties, which are then handed to the
 * properties-based overload to obtain the service.
 */
fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService =
    blueprintMessageConsumerService(messageConsumerProperties(jsonNode))
/**
 * Return Message Consumer Service for [selector] definitions.
 *
 * The property prefix is [MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX] immediately
 * followed by [selector].
 */
fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService {
    val consumerPrefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector"
    return blueprintMessageConsumerService(messageConsumerProperties(consumerPrefix))
}
/**
 * Return Message Consumer Properties for [prefix] definitions.
 *
 * The adaptor type is looked up under the "[prefix].type" property key.
 *
 * @throws BluePrintProcessorException when the type is neither
 * [MessageLibConstants.TYPE_KAFKA_BASIC_AUTH] nor [MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH].
 */
fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
    val adaptorType = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
    return when (adaptorType) {
        MessageLibConstants.TYPE_KAFKA_BASIC_AUTH ->
            kafkaBasicAuthMessageConsumerProperties(prefix)
        MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH ->
            kafkaStreamsBasicAuthMessageConsumerProperties(prefix)
        else ->
            throw BluePrintProcessorException("Message adaptor($adaptorType) is not supported")
    }
}
114 fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
115 val type = jsonNode.get("type").textValue()
117 MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
118 JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
120 MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
121 JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
124 throw BluePrintProcessorException("Message adaptor($type) is not supported")
/**
 * Return the Message Consumer Service matching the concrete type of [messageConsumerProperties].
 *
 * @throws BluePrintProcessorException when the properties are neither
 * [KafkaBasicAuthMessageConsumerProperties] nor [KafkaStreamsBasicAuthConsumerProperties];
 * the message names the rejected properties class.
 */
private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties)
    : BlueprintMessageConsumerService {
    return when (messageConsumerProperties) {
        is KafkaBasicAuthMessageConsumerProperties ->
            KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
        is KafkaStreamsBasicAuthConsumerProperties ->
            KafkaStreamsBasicAuthConsumerService(messageConsumerProperties)
        // The original message stopped at "for"; include the class so the failure is diagnosable.
        else -> throw BluePrintProcessorException(
            "couldn't get Message client service for ${messageConsumerProperties.javaClass.simpleName}"
        )
    }
}
/**
 * Return [KafkaBasicAuthMessageConsumerProperties] obtained from the properties service
 * for the given [prefix].
 */
private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties =
    bluePrintPropertiesService.propertyBeanType(prefix, KafkaBasicAuthMessageConsumerProperties::class.java)
/**
 * Return [KafkaStreamsBasicAuthConsumerProperties] obtained from the properties service
 * for the given [prefix].
 */
private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties {
    // Use the injected constructor property; `bluePrintProperties` is not declared in this class.
    return bluePrintPropertiesService.propertyBeanType(
        prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
    )
}