e0e073d49de9ae96532581a94b368873b58edddf
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *  Modification Copyright (C) 2022 Nordix Foundation.
5  *
6  *  Licensed under the Apache License, Version 2.0 (the "License");
7  *  you may not use this file except in compliance with the License.
8  *  You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */
18
19 package org.onap.ccsdk.cds.blueprintsprocessor.message
20
21 import org.apache.kafka.clients.CommonClientConfigs
22 import org.apache.kafka.clients.consumer.ConsumerConfig
23 import org.apache.kafka.clients.producer.ProducerConfig
24 import org.apache.kafka.common.config.SaslConfigs
25 import org.apache.kafka.common.config.SslConfigs
26 import org.apache.kafka.common.security.auth.SecurityProtocol
27 import org.apache.kafka.common.security.scram.ScramLoginModule
28 import org.apache.kafka.common.serialization.ByteArrayDeserializer
29 import org.apache.kafka.common.serialization.ByteArraySerializer
30 import org.apache.kafka.common.serialization.StringDeserializer
31 import org.apache.kafka.common.serialization.StringSerializer
32 import org.apache.kafka.streams.StreamsConfig
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
34
35 /** Common Properties **/
36 abstract class CommonProperties {
37
38     lateinit var type: String
39     lateinit var topic: String
40     lateinit var bootstrapServers: String
41
42     open fun getConfig(): HashMap<String, Any> {
43         val configProps = hashMapOf<String, Any>()
44         configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
45         return configProps
46     }
47 }
48
49 /** Message Producer */
50 /** Message Producer Properties **/
51 abstract class MessageProducerProperties : CommonProperties()
52
53 /** Basic Auth */
54 open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
55
56     lateinit var clientId: String
57     var acks: String = "all" // strongest producing guarantee
58     var maxBlockMs: Int = 250 // max blocking time in ms to send a message
59     var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
60     var enableIdempotence: Boolean = true // ensure we don't push duplicates
61
62     override fun getConfig(): HashMap<String, Any> {
63         val configProps = super.getConfig()
64         configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
65         configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
66         configProps[ProducerConfig.ACKS_CONFIG] = acks
67         configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
68         configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
69         configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
70         configProps[ProducerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
71         return configProps
72     }
73 }
74
75 /** SSL Auth */
76 open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
77
78     lateinit var truststore: String
79     lateinit var truststorePassword: String
80     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
81     var keystore: String? = null
82     var keystorePassword: String? = null
83     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
84     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
85
86     override fun getConfig(): HashMap<String, Any> {
87         val configProps = super.getConfig()
88         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
89         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
90         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
91         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
92         if (keystore != null) {
93             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
94             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
95             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
96         }
97         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
98
99         return configProps
100     }
101 }
102
103 /** (SASL) SCRAM SSL Auth */
104 class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
105
106     var saslMechanism: String = "SCRAM-SHA-512"
107     lateinit var scramUsername: String
108     lateinit var scramPassword: String
109
110     override fun getConfig(): HashMap<String, Any> {
111         val configProps = super.getConfig()
112         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
113         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
114         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
115             "username=\"${scramUsername}\" " +
116             "password=\"${scramPassword}\";"
117         return configProps
118     }
119 }
120
121 /** (SASL) SCRAM Plaintext Auth */
122 class KafkaScramPlainTextAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
123
124     var saslMechanism: String = "SCRAM-SHA-512"
125     lateinit var scramUsername: String
126     lateinit var scramPassword: String
127
128     override fun getConfig(): HashMap<String, Any> {
129         val configProps = super.getConfig()
130         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString()
131         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
132         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
133             "username=\"${scramUsername}\" " +
134             "password=\"${scramPassword}\";"
135         return configProps
136     }
137 }
138
139 /** Consumer */
140 abstract class MessageConsumerProperties : CommonProperties()
141 /** Kafka Streams */
142 /** Streams properties */
143
144 /** Basic Auth */
145 open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
146
147     lateinit var applicationId: String
148     var autoOffsetReset: String = "latest"
149     var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
150
151     override fun getConfig(): HashMap<String, Any> {
152         val configProperties = super.getConfig()
153         // adjust the worker name with the hostname suffix because we'll have several workers, running in
154         // different pods, using the same worker name otherwise.
155         configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = "$applicationId-${BlueprintMessageUtils.getHostnameSuffix()}"
156         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
157         configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
158         return configProperties
159     }
160 }
161
162 /** SSL Auth */
163 open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
164
165     lateinit var truststore: String
166     lateinit var truststorePassword: String
167     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
168     var keystore: String? = null
169     var keystorePassword: String? = null
170     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
171     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
172
173     override fun getConfig(): HashMap<String, Any> {
174         val configProps = super.getConfig()
175         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
176         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
177         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
178         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
179         if (keystore != null) {
180             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
181             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
182             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
183         }
184         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
185         return configProps
186     }
187 }
188
189 /** (SASL) SCRAM SSL Auth */
190 class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
191
192     var saslMechanism: String = "SCRAM-SHA-512"
193     lateinit var scramUsername: String
194     lateinit var scramPassword: String
195
196     override fun getConfig(): HashMap<String, Any> {
197         val configProps = super.getConfig()
198         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
199         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
200         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
201             "username=\"${scramUsername}\" " +
202             "password=\"${scramPassword}\";"
203         return configProps
204     }
205 }
206
207 /** Message Consumer */
208 /** Message Consumer Properties **/
209 /** Basic Auth */
210 open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
211
212     lateinit var groupId: String
213     lateinit var clientId: String
214     var autoCommit: Boolean = true
215     var autoOffsetReset: String = "latest"
216     var pollMillSec: Long = 1000
217     var pollRecords: Int = -1
218
219     override fun getConfig(): HashMap<String, Any> {
220         val configProperties = super.getConfig()
221         configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
222         configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
223         /**
224          * earliest: automatically reset the offset to the earliest offset
225          * latest: automatically reset the offset to the latest offset
226          */
227         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
228         configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
229         configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
230         // adjust the worker name with the hostname suffix because we'll have several workers, running in
231         // different pods, using the same worker name otherwise.
232         configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
233
234         /** To handle Back pressure, Get only configured record for processing */
235         if (pollRecords > 0) {
236             configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords
237         }
238
239         return configProperties
240     }
241 }
242
243 /** SSL Auth */
244 open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
245
246     lateinit var truststore: String
247     lateinit var truststorePassword: String
248     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
249     var keystore: String? = null
250     var keystorePassword: String? = null
251     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
252     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
253
254     override fun getConfig(): HashMap<String, Any> {
255         val configProps = super.getConfig()
256         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
257         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
258         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
259         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
260         if (keystore != null) {
261             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
262             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
263             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
264         }
265         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
266         return configProps
267     }
268 }
269
270 /** (SASL) SCRAM SSL Auth */
271 class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
272
273     var saslMechanism: String = "SCRAM-SHA-512"
274     lateinit var scramUsername: String
275     lateinit var scramPassword: String
276
277     override fun getConfig(): HashMap<String, Any> {
278         val configProps = super.getConfig()
279         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
280         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
281         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
282             "username=\"${scramUsername}\" " +
283             "password=\"${scramPassword}\";"
284         return configProps
285     }
286 }
287
288 /** (SASL) SCRAM Plaintext Auth */
289 class KafkaScramPlaintextAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
290
291     var saslMechanism: String = "SCRAM-SHA-512"
292     lateinit var scramUsername: String
293     lateinit var scramPassword: String
294
295     override fun getConfig(): HashMap<String, Any> {
296         val configProps = super.getConfig()
297         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString()
298         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
299         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
300             "username=\"${scramUsername}\" " +
301             "password=\"${scramPassword}\";"
302         return configProps
303     }
304 }