Merge "Add Kafka Streams consumer service"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BluePrintMessageLibPropertyService.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import com.fasterxml.jackson.databind.JsonNode
21 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
22 import org.onap.ccsdk.cds.blueprintsprocessor.message.*
23 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
24 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
25 import org.springframework.stereotype.Service
26
/**
 * Spring service that resolves message producer/consumer properties and builds the
 * matching message services.
 *
 * Properties can be supplied either as a [JsonNode] definition (which must carry a
 * `type` field) or through a selector/prefix that is looked up via
 * [BluePrintPropertiesService].
 */
@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
open class BluePrintMessageLibPropertyService(private val bluePrintPropertiesService: BluePrintPropertiesService) {

    /* Producer Property Lib Service Implementation */

    /** Return Message Producer Service for [jsonNode] definitions. */
    fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
        val messageClientProperties = messageProducerProperties(jsonNode)
        return blueprintMessageProducerService(messageClientProperties)
    }

    /** Return Message Producer Service for [selector] definitions. */
    fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
        val messageClientProperties = messageProducerProperties(prefix)
        return blueprintMessageProducerService(messageClientProperties)
    }

    /**
     * Return Message Producer Properties for [prefix] definitions.
     *
     * The adaptor type is read from the `<prefix>.type` property.
     *
     * @throws BluePrintProcessorException when the configured type is not supported.
     */
    fun messageProducerProperties(prefix: String): MessageProducerProperties {
        val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
        return when (type) {
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> kafkaBasicAuthMessageProducerProperties(prefix)
            else -> throw BluePrintProcessorException("Message adaptor($type) is not supported")
        }
    }

    /**
     * Return Message Producer Properties for [jsonNode] definitions.
     *
     * The adaptor type is read from the `type` field of [jsonNode].
     *
     * @throws BluePrintProcessorException when the type is missing or not supported.
     */
    fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties {
        // Safe call: a missing "type" field falls through to the "not supported" branch
        // instead of failing with a bare NullPointerException.
        val type = jsonNode.get("type")?.textValue()
        return when (type) {
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH ->
                JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!!
            else -> throw BluePrintProcessorException("Message adaptor($type) is not supported")
        }
    }

    /**
     * Build the producer service matching the concrete type of [messageProducerProperties].
     *
     * @throws BluePrintProcessorException when no service exists for the given properties type.
     */
    private fun blueprintMessageProducerService(messageProducerProperties: MessageProducerProperties):
        BlueprintMessageProducerService =
        when (messageProducerProperties) {
            is KafkaBasicAuthMessageProducerProperties ->
                KafkaBasicAuthMessageProducerService(messageProducerProperties)
            else -> throw BluePrintProcessorException(
                "couldn't get Message client service for (${messageProducerProperties.javaClass.simpleName})"
            )
        }

    /** Bind the properties under [prefix] to [KafkaBasicAuthMessageProducerProperties]. */
    private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties =
        bluePrintPropertiesService.propertyBeanType(prefix, KafkaBasicAuthMessageProducerProperties::class.java)

    /* Consumer Property Lib Service Implementation */

    /** Return Message Consumer Service for [jsonNode] definitions. */
    fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService {
        val messageConsumerProperties = messageConsumerProperties(jsonNode)
        return blueprintMessageConsumerService(messageConsumerProperties)
    }

    /** Return Message Consumer Service for [selector] definitions. */
    fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService {
        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector"
        val messageClientProperties = messageConsumerProperties(prefix)
        return blueprintMessageConsumerService(messageClientProperties)
    }

    /**
     * Return Message Consumer Properties for [prefix] definitions.
     *
     * The adaptor type is read from the `<prefix>.type` property.
     *
     * @throws BluePrintProcessorException when the configured type is not supported.
     */
    fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
        val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
        return when (type) {
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> kafkaBasicAuthMessageConsumerProperties(prefix)
            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> kafkaStreamsBasicAuthMessageConsumerProperties(prefix)
            else -> throw BluePrintProcessorException("Message adaptor($type) is not supported")
        }
    }

    /**
     * Return Message Consumer Properties for [jsonNode] definitions.
     *
     * The adaptor type is read from the `type` field of [jsonNode].
     *
     * @throws BluePrintProcessorException when the type is missing or not supported.
     */
    fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
        // Safe call: a missing "type" field falls through to the "not supported" branch
        // instead of failing with a bare NullPointerException.
        val type = jsonNode.get("type")?.textValue()
        return when (type) {
            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH ->
                JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH ->
                JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
            else -> throw BluePrintProcessorException("Message adaptor($type) is not supported")
        }
    }

    /**
     * Build the consumer service matching the concrete type of [messageConsumerProperties].
     *
     * @throws BluePrintProcessorException when no service exists for the given properties type.
     */
    private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
        BlueprintMessageConsumerService =
        when (messageConsumerProperties) {
            is KafkaBasicAuthMessageConsumerProperties ->
                KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
            is KafkaStreamsBasicAuthConsumerProperties ->
                KafkaStreamsBasicAuthConsumerService(messageConsumerProperties)
            else -> throw BluePrintProcessorException(
                "couldn't get Message client service for (${messageConsumerProperties.javaClass.simpleName})"
            )
        }

    /** Bind the properties under [prefix] to [KafkaBasicAuthMessageConsumerProperties]. */
    private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties =
        bluePrintPropertiesService.propertyBeanType(prefix, KafkaBasicAuthMessageConsumerProperties::class.java)

    /** Bind the properties under [prefix] to [KafkaStreamsBasicAuthConsumerProperties]. */
    private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties =
        // Uses the injected bluePrintPropertiesService, the same lookup as the other binders.
        bluePrintPropertiesService.propertyBeanType(prefix, KafkaStreamsBasicAuthConsumerProperties::class.java)
}