Removed redundant timeout handling for executeCommand
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BluePrintMessageLibPropertyService.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import com.fasterxml.jackson.databind.JsonNode
21 import io.micrometer.core.instrument.MeterRegistry
22 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
25 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
28 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties
29 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
30 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties
31 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
34 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
35 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
36 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
37 import org.springframework.stereotype.Service
38
39 @Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
40 open class BluePrintMessageLibPropertyService(
41     private var bluePrintPropertiesService: BluePrintPropertiesService,
42     private val meterRegistry: MeterRegistry
43 ) {
44
45     fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
46         val messageClientProperties = messageProducerProperties(jsonNode)
47         return KafkaMessageProducerService(messageClientProperties, meterRegistry)
48     }
49
50     fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
51         val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
52         val messageClientProperties = messageProducerProperties(prefix)
53         return KafkaMessageProducerService(messageClientProperties, meterRegistry)
54     }
55
56     fun messageProducerProperties(prefix: String): MessageProducerProperties {
57         val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
58         return when (type) {
59             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
60                 bluePrintPropertiesService.propertyBeanType(
61                     prefix, KafkaBasicAuthMessageProducerProperties::class.java
62                 )
63             }
64             MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
65                 bluePrintPropertiesService.propertyBeanType(
66                     prefix, KafkaSslAuthMessageProducerProperties::class.java
67                 )
68             }
69             MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
70                 bluePrintPropertiesService.propertyBeanType(
71                     prefix, KafkaScramSslAuthMessageProducerProperties::class.java
72                 )
73             }
74             else -> {
75                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
76             }
77         }
78     }
79
80     fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties {
81         val type = jsonNode.get("type").textValue()
82         return when (type) {
83             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
84                 JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!!
85             }
86             MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
87                 JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)!!
88             }
89             MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
90                 JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!!
91             }
92             else -> {
93                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
94             }
95         }
96     }
97
98     /** Consumer Property Lib Service Implementation **/
99
100     /** Return Message Consumer Service for [jsonNode] definitions. */
101     fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService {
102         val messageConsumerProperties = messageConsumerProperties(jsonNode)
103         return blueprintMessageConsumerService(messageConsumerProperties)
104     }
105
106     /** Return Message Consumer Service for [selector] definitions. */
107     fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService {
108         val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector"
109         val messageClientProperties = messageConsumerProperties(prefix)
110         return blueprintMessageConsumerService(messageClientProperties)
111     }
112
113     /** Return Message Consumer Properties for [prefix] definitions. */
114     fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
115         val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
116         return when (type) {
117             /** Message Consumer */
118             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
119                 bluePrintPropertiesService.propertyBeanType(
120                     prefix, KafkaBasicAuthMessageConsumerProperties::class.java
121                 )
122             }
123             MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
124                 bluePrintPropertiesService.propertyBeanType(
125                     prefix, KafkaSslAuthMessageConsumerProperties::class.java
126                 )
127             }
128             MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
129                 bluePrintPropertiesService.propertyBeanType(
130                     prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
131                 )
132             }
133             /** Stream Consumer */
134             MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
135                 bluePrintPropertiesService.propertyBeanType(
136                     prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
137                 )
138             }
139             MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
140                 bluePrintPropertiesService.propertyBeanType(
141                     prefix, KafkaStreamsSslAuthConsumerProperties::class.java
142                 )
143             }
144             MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
145                 bluePrintPropertiesService.propertyBeanType(
146                     prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
147                 )
148             }
149             else -> {
150                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
151             }
152         }
153     }
154
155     fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
156         val type = jsonNode.get("type").textValue()
157         return when (type) {
158             /** Message Consumer */
159             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
160                 JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
161             }
162             MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
163                 JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)!!
164             }
165             MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
166                 JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!!
167             }
168             /** Stream Consumer */
169             MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
170                 JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
171             }
172             MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
173                 JacksonUtils.readValue(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)!!
174             }
175             MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
176                 JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!!
177             }
178             else -> {
179                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
180             }
181         }
182     }
183
184     private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
185         BlueprintMessageConsumerService {
186
187             when (messageConsumerProperties.type) {
188                 /** Message Consumer */
189                 MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
190                     return KafkaMessageConsumerService(
191                         messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties,
192                         meterRegistry
193                     )
194                 }
195                 MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
196                     return KafkaMessageConsumerService(
197                         messageConsumerProperties as KafkaSslAuthMessageConsumerProperties,
198                         meterRegistry
199                     )
200                 }
201                 MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
202                     return KafkaMessageConsumerService(
203                         messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties,
204                         meterRegistry
205                     )
206                 }
207                 /** Stream Consumer */
208                 MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
209                     return KafkaStreamsConsumerService(
210                         messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties
211                     )
212                 }
213                 MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
214                     return KafkaStreamsConsumerService(
215                         messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties
216                     )
217                 }
218                 MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
219                     return KafkaStreamsConsumerService(
220                         messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties
221                     )
222                 }
223                 else -> {
224                     throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}")
225                 }
226             }
227         }
228 }