2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import com.fasterxml.jackson.databind.JsonNode
21 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
22 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
25 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties
28 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
29 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties
30 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties
31 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
34 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
35 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
36 import org.springframework.stereotype.Service
/**
 * Resolves message producer and consumer properties, either from configured properties under a
 * selector prefix or from an inline JSON definition, and builds the matching Kafka services.
 *
 * The `type` value of a definition selects the concrete properties class. A type that is not one
 * of the handled [MessageLibConstants] values is rejected with a [BluePrintProcessorException].
 */
@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
open class BluePrintMessageLibPropertyService(private val bluePrintPropertiesService: BluePrintPropertiesService) {

    /** Producer Property Lib Service Implementation **/

    /** Return Message Producer Service for [jsonNode] definitions. */
    fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService =
        KafkaMessageProducerService(messageProducerProperties(jsonNode))

    /** Return Message Producer Service for [selector] definitions. */
    fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
        return KafkaMessageProducerService(messageProducerProperties(prefix))
    }

    /**
     * Return Message Producer Properties for [prefix] definitions.
     *
     * @throws BluePrintProcessorException when the `"$prefix.type"` value is not a supported type.
     */
    fun messageProducerProperties(prefix: String): MessageProducerProperties {
        val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
        return bluePrintPropertiesService.propertyBeanType(prefix, producerPropertiesClass(type))
    }

    /**
     * Return Message Producer Properties for [jsonNode] definitions.
     *
     * @throws BluePrintProcessorException when the `type` field is missing, is not a text value,
     * is not a supported type, or when the definition cannot be read into the properties class.
     */
    fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties =
        readProperties(jsonNode, producerPropertiesClass(definitionType(jsonNode)))

    /** Consumer Property Lib Service Implementation **/

    /** Return Message Consumer Service for [jsonNode] definitions. */
    fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService =
        blueprintMessageConsumerService(messageConsumerProperties(jsonNode))

    /** Return Message Consumer Service for [selector] definitions. */
    fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService {
        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector"
        return blueprintMessageConsumerService(messageConsumerProperties(prefix))
    }

    /**
     * Return Message Consumer Properties for [prefix] definitions.
     *
     * @throws BluePrintProcessorException when the `"$prefix.type"` value is not a supported type.
     */
    fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
        val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
        return bluePrintPropertiesService.propertyBeanType(prefix, consumerPropertiesClass(type))
    }

    /**
     * Return Message Consumer Properties for [jsonNode] definitions.
     *
     * @throws BluePrintProcessorException when the `type` field is missing, is not a text value,
     * is not a supported type, or when the definition cannot be read into the properties class.
     */
    fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties =
        readProperties(jsonNode, consumerPropertiesClass(definitionType(jsonNode)))

    /** Build the consumer service matching the type carried by [messageConsumerProperties]. */
    private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
        BlueprintMessageConsumerService {
        return when (messageConsumerProperties.type) {
            // Message Consumer
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH ->
                KafkaMessageConsumerService(messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties)
            MessageLibConstants.TYPE_KAFKA_SSL_AUTH ->
                KafkaMessageConsumerService(messageConsumerProperties as KafkaSslAuthMessageConsumerProperties)
            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH ->
                KafkaMessageConsumerService(messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties)
            // Stream Consumer
            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH ->
                KafkaStreamsConsumerService(messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties)
            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH ->
                KafkaStreamsConsumerService(messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties)
            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH ->
                KafkaStreamsConsumerService(messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties)
            else -> throw BluePrintProcessorException(
                "couldn't get message client service for ${messageConsumerProperties.type}"
            )
        }
    }

    /**
     * Read the `type` field of a JSON definition.
     *
     * Returns null when the field is absent or is not a text value, so the caller rejects the
     * definition as an unsupported type instead of failing with a NullPointerException.
     */
    private fun definitionType(jsonNode: JsonNode): String? = jsonNode.get("type")?.textValue()

    /** Map a producer [type] to its properties class; the single mapping used by both lookups. */
    private fun producerPropertiesClass(type: String?): Class<out MessageProducerProperties> = when (type) {
        MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> KafkaBasicAuthMessageProducerProperties::class.java
        MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> KafkaSslAuthMessageProducerProperties::class.java
        MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> KafkaScramSslAuthMessageProducerProperties::class.java
        else -> throw BluePrintProcessorException("Message adaptor($type) is not supported")
    }

    /** Map a consumer [type] to its properties class; the single mapping used by both lookups. */
    private fun consumerPropertiesClass(type: String?): Class<out MessageConsumerProperties> = when (type) {
        // Message Consumer
        MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> KafkaBasicAuthMessageConsumerProperties::class.java
        MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> KafkaSslAuthMessageConsumerProperties::class.java
        MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> KafkaScramSslAuthMessageConsumerProperties::class.java
        // Stream Consumer
        MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> KafkaStreamsBasicAuthConsumerProperties::class.java
        MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> KafkaStreamsSslAuthConsumerProperties::class.java
        MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH ->
            KafkaStreamsScramSslAuthConsumerProperties::class.java
        else -> throw BluePrintProcessorException("Message adaptor($type) is not supported")
    }

    /**
     * Read [jsonNode] into an instance of [propertiesClass].
     *
     * A null result from the JSON reader is reported as a [BluePrintProcessorException] naming the
     * target class, replacing the bare `!!` assertion.
     */
    private fun <T : Any> readProperties(jsonNode: JsonNode, propertiesClass: Class<out T>): T =
        JacksonUtils.readValue(jsonNode, propertiesClass)
            ?: throw BluePrintProcessorException(
                "couldn't read message properties as ${propertiesClass.simpleName}"
            )
}