/*
 *  Copyright © 2019 IBM.
 *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
 *  Modification Copyright (C) 2022 Nordix Foundation.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
19 package org.onap.ccsdk.cds.blueprintsprocessor.message
21 import org.apache.kafka.clients.CommonClientConfigs
22 import org.apache.kafka.clients.consumer.ConsumerConfig
23 import org.apache.kafka.clients.producer.ProducerConfig
24 import org.apache.kafka.common.config.SaslConfigs
25 import org.apache.kafka.common.config.SslConfigs
26 import org.apache.kafka.common.security.auth.SecurityProtocol
27 import org.apache.kafka.common.security.scram.ScramLoginModule
28 import org.apache.kafka.common.serialization.ByteArrayDeserializer
29 import org.apache.kafka.common.serialization.ByteArraySerializer
30 import org.apache.kafka.common.serialization.StringDeserializer
31 import org.apache.kafka.common.serialization.StringSerializer
32 import org.apache.kafka.streams.StreamsConfig
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
/**
 * Common Properties.
 *
 * Base class of the property holders declared in this file. All three properties are `lateinit`,
 * so they must be assigned before [getConfig] is called; reading an unassigned one throws
 * [UninitializedPropertyAccessException].
 */
abstract class CommonProperties {

    lateinit var type: String
    lateinit var topic: String
    lateinit var bootstrapServers: String

    /**
     * Builds the base client configuration.
     *
     * @return a freshly created mutable map holding only the bootstrap servers entry; subclasses
     * call this through `super.getConfig()` and add their own keys to the returned map.
     */
    open fun getConfig(): HashMap<String, Any> {
        val configProps = hashMapOf<String, Any>()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
        return configProps
    }
}
/** Message Producer */

/**
 * Message Producer Properties.
 *
 * Marker base type for producer-side property holders; it adds nothing to [CommonProperties].
 */
abstract class MessageProducerProperties : CommonProperties()
/**
 * Producer properties without any transport security settings.
 *
 * [clientId] is `lateinit` and must be assigned before [getConfig] is called.
 */
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {

    lateinit var clientId: String
    var acks: String = "all" // strongest producing guarantee
    var maxBlockMs: Int = 250 // max blocking time in ms to send a message
    var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
    var enableIdempotence: Boolean = true // ensure we don't push duplicates

    /**
     * Extends the base configuration with the producer settings.
     *
     * Keys are serialized as strings and values as byte arrays. The client id is [clientId]
     * followed by `-` and the value returned by `BlueprintMessageUtils.getHostnameSuffix()`.
     *
     * @return the map created by the parent, with the producer entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
        configProps[ProducerConfig.ACKS_CONFIG] = acks
        configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
        configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
        configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
        configProps[ProducerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
        return configProps
    }
}
/**
 * Producer properties for an SSL-secured connection.
 *
 * [truststore] and [truststorePassword] are `lateinit` and must be assigned before [getConfig]
 * is called. The keystore settings are optional and only emitted when [keystore] is set.
 */
open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {

    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Extends the parent configuration with the SSL security protocol, the truststore entries,
     * the optional keystore entries and the endpoint identification algorithm.
     *
     * @return the map created by the parent, with the SSL entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword

        val keystoreLocation = keystore
        if (keystoreLocation != null) {
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // The keystore password is optional: forward it only when configured instead of
            // failing with a NullPointerException when a keystore is given without one.
            keystorePassword?.let { configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = it }
        }

        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * (SASL) SCRAM SSL Auth.
 *
 * Producer properties for SCRAM authentication over SSL. [scramUsername] and [scramPassword]
 * are `lateinit` and must be assigned before [getConfig] is called.
 */
class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {

    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * Extends the SSL configuration: replaces the security protocol with SASL_SSL and adds the
     * SASL mechanism plus a `ScramLoginModule` JAAS entry carrying the credentials.
     *
     * @return the map created by the parent, with the SASL entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
        configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
            "username=\"${escapeJaasValue(scramUsername)}\" " +
            "password=\"${escapeJaasValue(scramPassword)}\";"
        return configProps
    }

    /**
     * Escapes backslashes and double quotes so [raw] cannot terminate or alter the double-quoted
     * JAAS option value it is embedded in. Values without those characters are returned unchanged.
     */
    private fun escapeJaasValue(raw: String): String =
        raw.replace("\\", "\\\\").replace("\"", "\\\"")
}
/**
 * (SASL) SCRAM Plaintext Auth.
 *
 * Producer properties for SCRAM authentication over a plaintext connection. [scramUsername] and
 * [scramPassword] are `lateinit` and must be assigned before [getConfig] is called.
 */
class KafkaScramPlainTextAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {

    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * Extends the basic producer configuration with the SASL_PLAINTEXT security protocol, the
     * SASL mechanism and a `ScramLoginModule` JAAS entry carrying the credentials.
     *
     * @return the map created by the parent, with the SASL entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString()
        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
        configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
            "username=\"${escapeJaasValue(scramUsername)}\" " +
            "password=\"${escapeJaasValue(scramPassword)}\";"
        return configProps
    }

    /**
     * Escapes backslashes and double quotes so [raw] cannot terminate or alter the double-quoted
     * JAAS option value it is embedded in. Values without those characters are returned unchanged.
     */
    private fun escapeJaasValue(raw: String): String =
        raw.replace("\\", "\\\\").replace("\"", "\\\"")
}
/**
 * Marker base type for consumer-side property holders; it adds nothing to [CommonProperties].
 */
abstract class MessageConsumerProperties : CommonProperties()
/** Streams properties */

/**
 * Kafka Streams consumer properties without any transport security settings.
 *
 * [applicationId] is `lateinit` and must be assigned before [getConfig] is called.
 */
open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {

    lateinit var applicationId: String
    var autoOffsetReset: String = "latest"
    var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE

    /**
     * Extends the base configuration with the application id, the offset reset policy and the
     * processing guarantee.
     *
     * @return the map created by the parent, with the streams entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProperties = super.getConfig()
        // adjust the worker name with the hostname suffix because we'll have several workers, running in
        // different pods, using the same worker name otherwise.
        // NOTE(review): Kafka Streams also derives the consumer group id from application.id, so a
        // per-host suffix presumably puts every instance in its own group - confirm this is intended.
        configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = "$applicationId-${BlueprintMessageUtils.getHostnameSuffix()}"
        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
        configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
        return configProperties
    }
}
/**
 * Kafka Streams consumer properties for an SSL-secured connection.
 *
 * [truststore] and [truststorePassword] are `lateinit` and must be assigned before [getConfig]
 * is called. The keystore settings are optional and only emitted when [keystore] is set.
 */
open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {

    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Extends the parent configuration with the SSL security protocol, the truststore entries,
     * the optional keystore entries and the endpoint identification algorithm.
     *
     * @return the map created by the parent, with the SSL entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword

        val keystoreLocation = keystore
        if (keystoreLocation != null) {
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // The keystore password is optional: forward it only when configured instead of
            // failing with a NullPointerException when a keystore is given without one.
            keystorePassword?.let { configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = it }
        }

        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * (SASL) SCRAM SSL Auth.
 *
 * Kafka Streams consumer properties for SCRAM authentication over SSL. [scramUsername] and
 * [scramPassword] are `lateinit` and must be assigned before [getConfig] is called.
 */
class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {

    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * Extends the SSL configuration: replaces the security protocol with SASL_SSL and adds the
     * SASL mechanism plus a `ScramLoginModule` JAAS entry carrying the credentials.
     *
     * @return the map created by the parent, with the SASL entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
        configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
            "username=\"${escapeJaasValue(scramUsername)}\" " +
            "password=\"${escapeJaasValue(scramPassword)}\";"
        return configProps
    }

    /**
     * Escapes backslashes and double quotes so [raw] cannot terminate or alter the double-quoted
     * JAAS option value it is embedded in. Values without those characters are returned unchanged.
     */
    private fun escapeJaasValue(raw: String): String =
        raw.replace("\\", "\\\\").replace("\"", "\\\"")
}
/** Message Consumer */

/**
 * Message Consumer Properties.
 *
 * Consumer properties without any transport security settings. [groupId] and [clientId] are
 * `lateinit` and must be assigned before [getConfig] is called.
 */
open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {

    lateinit var groupId: String
    lateinit var clientId: String
    var autoCommit: Boolean = true
    var autoOffsetReset: String = "latest"
    var pollMillSec: Long = 1000
    var pollRecords: Int = -1

    /**
     * Extends the base configuration with the consumer settings.
     *
     * Keys are deserialized as strings and values as byte arrays. The max-poll-records entry is
     * only added when [pollRecords] is greater than zero. [pollMillSec] is not written to the map.
     *
     * @return the map created by the parent, with the consumer entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProperties = super.getConfig()
        configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
        configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
        /**
         * earliest: automatically reset the offset to the earliest offset
         * latest: automatically reset the offset to the latest offset
         */
        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
        configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
        // adjust the worker name with the hostname suffix because we'll have several workers, running in
        // different pods, using the same worker name otherwise.
        configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"

        /** To handle Back pressure, Get only configured record for processing */
        if (pollRecords > 0) {
            configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords
        }

        return configProperties
    }
}
/**
 * Consumer properties for an SSL-secured connection.
 *
 * [truststore] and [truststorePassword] are `lateinit` and must be assigned before [getConfig]
 * is called. The keystore settings are optional and only emitted when [keystore] is set.
 */
open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {

    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Extends the parent configuration with the SSL security protocol, the truststore entries,
     * the optional keystore entries and the endpoint identification algorithm.
     *
     * @return the map created by the parent, with the SSL entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword

        val keystoreLocation = keystore
        if (keystoreLocation != null) {
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // The keystore password is optional: forward it only when configured instead of
            // failing with a NullPointerException when a keystore is given without one.
            keystorePassword?.let { configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = it }
        }

        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * (SASL) SCRAM SSL Auth.
 *
 * Consumer properties for SCRAM authentication over SSL. [scramUsername] and [scramPassword]
 * are `lateinit` and must be assigned before [getConfig] is called.
 */
class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {

    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * Extends the SSL configuration: replaces the security protocol with SASL_SSL and adds the
     * SASL mechanism plus a `ScramLoginModule` JAAS entry carrying the credentials.
     *
     * @return the map created by the parent, with the SASL entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
        configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
            "username=\"${escapeJaasValue(scramUsername)}\" " +
            "password=\"${escapeJaasValue(scramPassword)}\";"
        return configProps
    }

    /**
     * Escapes backslashes and double quotes so [raw] cannot terminate or alter the double-quoted
     * JAAS option value it is embedded in. Values without those characters are returned unchanged.
     */
    private fun escapeJaasValue(raw: String): String =
        raw.replace("\\", "\\\\").replace("\"", "\\\"")
}
/**
 * (SASL) SCRAM Plaintext Auth.
 *
 * Consumer properties for SCRAM authentication over a plaintext connection. [scramUsername] and
 * [scramPassword] are `lateinit` and must be assigned before [getConfig] is called.
 */
class KafkaScramPlaintextAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {

    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * Extends the basic consumer configuration with the SASL_PLAINTEXT security protocol, the
     * SASL mechanism and a `ScramLoginModule` JAAS entry carrying the credentials.
     *
     * @return the map created by the parent, with the SASL entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString()
        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
        configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
            "username=\"${escapeJaasValue(scramUsername)}\" " +
            "password=\"${escapeJaasValue(scramPassword)}\";"
        return configProps
    }

    /**
     * Escapes backslashes and double quotes so [raw] cannot terminate or alter the double-quoted
     * JAAS option value it is embedded in. Values without those characters are returned unchanged.
     */
    private fun escapeJaasValue(raw: String): String =
        raw.replace("\\", "\\\\").replace("\"", "\\\"")
}