2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import com.fasterxml.jackson.databind.JsonNode
21 import io.micrometer.core.instrument.MeterRegistry
22 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
25 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
28 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties
29 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
30 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties
31 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
34 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
35 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
36 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
37 import org.springframework.stereotype.Service
/**
 * Builds Kafka message producer and consumer services, together with the
 * property beans that configure them.
 *
 * Every lookup comes in two flavours: one driven by a [JsonNode] definition and
 * one driven by a selector/prefix that is resolved through
 * [BlueprintPropertiesService]. In both cases the `type` value selects the
 * concrete properties class; an unknown type raises [BlueprintProcessorException].
 */
@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
open class BlueprintMessageLibPropertyService(
    private val bluePrintPropertiesService: BlueprintPropertiesService,
    private val meterRegistry: MeterRegistry
) {

    /** Producer Property Lib Service Implementation **/

    /** Return Message Producer Service for [jsonNode] definitions. */
    fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService =
        KafkaMessageProducerService(messageProducerProperties(jsonNode), meterRegistry)

    /** Return Message Producer Service for [selector] definitions. */
    fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
        return KafkaMessageProducerService(messageProducerProperties(prefix), meterRegistry)
    }

    /**
     * Return Message Producer Properties for [prefix] definitions.
     *
     * The `"$prefix.type"` property selects which properties class is bound from [prefix].
     *
     * @throws BlueprintProcessorException when the configured type is not a supported producer type.
     */
    fun messageProducerProperties(prefix: String): MessageProducerProperties {
        val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
        return when (type) {
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH ->
                bluePrintPropertiesService.propertyBeanType(
                    prefix, KafkaBasicAuthMessageProducerProperties::class.java
                )
            MessageLibConstants.TYPE_KAFKA_SSL_AUTH ->
                bluePrintPropertiesService.propertyBeanType(
                    prefix, KafkaSslAuthMessageProducerProperties::class.java
                )
            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH ->
                bluePrintPropertiesService.propertyBeanType(
                    prefix, KafkaScramSslAuthMessageProducerProperties::class.java
                )
            else -> throw BlueprintProcessorException("Message adaptor($type) is not supported")
        }
    }

    /**
     * Return Message Producer Properties for [jsonNode] definitions.
     *
     * The `type` field of [jsonNode] selects which properties class the node is read into.
     *
     * @throws BlueprintProcessorException when `type` is absent or unsupported, or when the
     * node cannot be read into the selected properties class.
     */
    fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties {
        // A missing "type" field yields null here and is reported by the else branch
        // instead of surfacing as a bare NullPointerException.
        val type: String? = jsonNode.get("type")?.textValue()
        return when (type) {
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH ->
                readProperties(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)
            MessageLibConstants.TYPE_KAFKA_SSL_AUTH ->
                readProperties(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)
            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH ->
                readProperties(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)
            else -> throw BlueprintProcessorException("Message adaptor($type) is not supported")
        }
    }

    /** Consumer Property Lib Service Implementation **/

    /** Return Message Consumer Service for [jsonNode] definitions. */
    fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService =
        blueprintMessageConsumerService(messageConsumerProperties(jsonNode))

    /** Return Message Consumer Service for [selector] definitions. */
    fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService {
        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector"
        return blueprintMessageConsumerService(messageConsumerProperties(prefix))
    }

    /**
     * Return Message Consumer Properties for [prefix] definitions.
     *
     * The `"$prefix.type"` property selects which properties class is bound from [prefix];
     * both plain message consumers and streams consumers are supported.
     *
     * @throws BlueprintProcessorException when the configured type is not a supported consumer type.
     */
    fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
        val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
        return when (type) {
            // Message Consumer
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH ->
                bluePrintPropertiesService.propertyBeanType(
                    prefix, KafkaBasicAuthMessageConsumerProperties::class.java
                )
            MessageLibConstants.TYPE_KAFKA_SSL_AUTH ->
                bluePrintPropertiesService.propertyBeanType(
                    prefix, KafkaSslAuthMessageConsumerProperties::class.java
                )
            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH ->
                bluePrintPropertiesService.propertyBeanType(
                    prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
                )
            // Stream Consumer
            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH ->
                bluePrintPropertiesService.propertyBeanType(
                    prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
                )
            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH ->
                bluePrintPropertiesService.propertyBeanType(
                    prefix, KafkaStreamsSslAuthConsumerProperties::class.java
                )
            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH ->
                bluePrintPropertiesService.propertyBeanType(
                    prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
                )
            else -> throw BlueprintProcessorException("Message adaptor($type) is not supported")
        }
    }

    /**
     * Return Message Consumer Properties for [jsonNode] definitions.
     *
     * The `type` field of [jsonNode] selects which properties class the node is read into;
     * both plain message consumers and streams consumers are supported.
     *
     * @throws BlueprintProcessorException when `type` is absent or unsupported, or when the
     * node cannot be read into the selected properties class.
     */
    fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
        // A missing "type" field yields null here and is reported by the else branch
        // instead of surfacing as a bare NullPointerException.
        val type: String? = jsonNode.get("type")?.textValue()
        return when (type) {
            // Message Consumer
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH ->
                readProperties(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)
            MessageLibConstants.TYPE_KAFKA_SSL_AUTH ->
                readProperties(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)
            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH ->
                readProperties(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)
            // Stream Consumer
            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH ->
                readProperties(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)
            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH ->
                readProperties(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)
            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH ->
                readProperties(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)
            else -> throw BlueprintProcessorException("Message adaptor($type) is not supported")
        }
    }

    /**
     * Return the consumer service matching the `type` of [messageConsumerProperties].
     *
     * The properties are cast to the class associated with that type, so a properties object
     * whose `type` does not match its class fails with a [ClassCastException].
     *
     * @throws BlueprintProcessorException when the type has no associated consumer service.
     */
    private fun blueprintMessageConsumerService(
        messageConsumerProperties: MessageConsumerProperties
    ): BlueprintMessageConsumerService =
        when (messageConsumerProperties.type) {
            // Message Consumer
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH ->
                KafkaMessageConsumerService(
                    messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties,
                    meterRegistry
                )
            MessageLibConstants.TYPE_KAFKA_SSL_AUTH ->
                KafkaMessageConsumerService(
                    messageConsumerProperties as KafkaSslAuthMessageConsumerProperties,
                    meterRegistry
                )
            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH ->
                KafkaMessageConsumerService(
                    messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties,
                    meterRegistry
                )
            // Stream Consumer
            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH ->
                KafkaStreamsConsumerService(
                    messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties
                )
            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH ->
                KafkaStreamsConsumerService(
                    messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties
                )
            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH ->
                KafkaStreamsConsumerService(
                    messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties
                )
            else -> throw BlueprintProcessorException(
                "couldn't get message client service for ${messageConsumerProperties.type}"
            )
        }

    /**
     * Read [jsonNode] into [propertiesType], failing with a descriptive
     * [BlueprintProcessorException] rather than a bare null-pointer failure when
     * [JacksonUtils.readValue] yields no value.
     */
    private fun <T : Any> readProperties(jsonNode: JsonNode, propertiesType: Class<T>): T =
        JacksonUtils.readValue(jsonNode, propertiesType)
            ?: throw BlueprintProcessorException(
                "couldn't read ${propertiesType.simpleName} from message definition"
            )
}