Renaming Files having BluePrint to have Blueprint
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageLibPropertyService.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import com.fasterxml.jackson.databind.JsonNode
21 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService
22 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
25 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties
28 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
29 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties
30 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties
31 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
34 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
35 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
36 import org.springframework.stereotype.Service
37
/**
 * Spring service that resolves Kafka message producer/consumer properties and builds
 * the matching messaging services.
 *
 * A definition is addressed either by a property prefix/selector, looked up through
 * [BlueprintPropertiesService], or supplied inline as a [JsonNode]. In both cases the
 * `type` entry of the definition selects the concrete properties class.
 */
@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
open class BlueprintMessageLibPropertyService(private val bluePrintPropertiesService: BlueprintPropertiesService) {

    /** Producer Property Lib Service Implementation **/

    /** Return Message Producer Service for [jsonNode] definitions. */
    fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService =
        KafkaMessageProducerService(messageProducerProperties(jsonNode))

    /** Return Message Producer Service for [selector] definitions. */
    fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
        return KafkaMessageProducerService(messageProducerProperties(prefix))
    }

    /**
     * Return Message Producer Properties for [prefix] definitions.
     *
     * @throws BlueprintProcessorException when the `type` found under [prefix] is not supported.
     */
    fun messageProducerProperties(prefix: String): MessageProducerProperties {
        val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
        // The class is resolved first, so an unsupported type fails before any bean binding.
        return bluePrintPropertiesService.propertyBeanType(prefix, producerPropertiesClass(type))
    }

    /**
     * Return Message Producer Properties for [jsonNode] definitions.
     *
     * @throws BlueprintProcessorException when the `type` field is missing, not textual or
     * not supported, or when the node cannot be read as the selected properties class.
     */
    fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties =
        readProperties(jsonNode, producerPropertiesClass(definitionType(jsonNode)))

    /** Consumer Property Lib Service Implementation **/

    /** Return Message Consumer Service for [jsonNode] definitions. */
    fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService {
        val messageConsumerProperties = messageConsumerProperties(jsonNode)
        return blueprintMessageConsumerService(messageConsumerProperties)
    }

    /** Return Message Consumer Service for [selector] definitions. */
    fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService {
        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector"
        val messageClientProperties = messageConsumerProperties(prefix)
        return blueprintMessageConsumerService(messageClientProperties)
    }

    /**
     * Return Message Consumer Properties for [prefix] definitions.
     *
     * @throws BlueprintProcessorException when the `type` found under [prefix] is not supported.
     */
    fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
        val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
        // The class is resolved first, so an unsupported type fails before any bean binding.
        return bluePrintPropertiesService.propertyBeanType(prefix, consumerPropertiesClass(type))
    }

    /**
     * Return Message Consumer Properties for [jsonNode] definitions.
     *
     * @throws BlueprintProcessorException when the `type` field is missing, not textual or
     * not supported, or when the node cannot be read as the selected properties class.
     */
    fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties =
        readProperties(jsonNode, consumerPropertiesClass(definitionType(jsonNode)))

    /** Build the consumer service implementation matching the `type` of [messageConsumerProperties]. */
    private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
        BlueprintMessageConsumerService =
        when (messageConsumerProperties.type) {
            // Message consumers
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH ->
                KafkaMessageConsumerService(
                    messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties
                )
            MessageLibConstants.TYPE_KAFKA_SSL_AUTH ->
                KafkaMessageConsumerService(
                    messageConsumerProperties as KafkaSslAuthMessageConsumerProperties
                )
            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH ->
                KafkaMessageConsumerService(
                    messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties
                )
            // Stream consumers
            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH ->
                KafkaStreamsConsumerService(
                    messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties
                )
            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH ->
                KafkaStreamsConsumerService(
                    messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties
                )
            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH ->
                KafkaStreamsConsumerService(
                    messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties
                )
            else ->
                throw BlueprintProcessorException(
                    "couldn't get message client service for ${messageConsumerProperties.type}"
                )
        }

    /**
     * Read the `type` entry of [jsonNode] as text.
     *
     * Returns null when the field is absent or not textual, so that both cases end in the
     * regular "not supported" error instead of a NullPointerException.
     */
    private fun definitionType(jsonNode: JsonNode): String? = jsonNode.get("type")?.textValue()

    /** Select the producer properties class registered for [type], or fail if there is none. */
    private fun producerPropertiesClass(type: String?): Class<out MessageProducerProperties> =
        when (type) {
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> KafkaBasicAuthMessageProducerProperties::class.java
            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> KafkaSslAuthMessageProducerProperties::class.java
            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> KafkaScramSslAuthMessageProducerProperties::class.java
            else -> throw BlueprintProcessorException("Message adaptor($type) is not supported")
        }

    /** Select the consumer properties class registered for [type], or fail if there is none. */
    private fun consumerPropertiesClass(type: String?): Class<out MessageConsumerProperties> =
        when (type) {
            // Message consumers
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> KafkaBasicAuthMessageConsumerProperties::class.java
            MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> KafkaSslAuthMessageConsumerProperties::class.java
            MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> KafkaScramSslAuthMessageConsumerProperties::class.java
            // Stream consumers
            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> KafkaStreamsBasicAuthConsumerProperties::class.java
            MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> KafkaStreamsSslAuthConsumerProperties::class.java
            MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH ->
                KafkaStreamsScramSslAuthConsumerProperties::class.java
            else -> throw BlueprintProcessorException("Message adaptor($type) is not supported")
        }

    /**
     * Read [jsonNode] as [valueType] through [JacksonUtils].
     *
     * A missing result is reported as a [BlueprintProcessorException] naming the target
     * class, rather than the anonymous NullPointerException a `!!` would raise.
     */
    private fun <T : Any> readProperties(jsonNode: JsonNode, valueType: Class<T>): T =
        JacksonUtils.readValue(jsonNode, valueType)
            ?: throw BlueprintProcessorException(
                "failed to read ${valueType.simpleName} from message definition"
            )
}