2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message
20 import org.apache.kafka.clients.CommonClientConfigs
21 import org.apache.kafka.clients.consumer.ConsumerConfig
22 import org.apache.kafka.clients.producer.ProducerConfig
23 import org.apache.kafka.common.config.SaslConfigs
24 import org.apache.kafka.common.config.SslConfigs
25 import org.apache.kafka.common.security.auth.SecurityProtocol
26 import org.apache.kafka.common.security.scram.ScramLoginModule
27 import org.apache.kafka.common.serialization.ByteArrayDeserializer
28 import org.apache.kafka.common.serialization.ByteArraySerializer
29 import org.apache.kafka.common.serialization.StringDeserializer
30 import org.apache.kafka.common.serialization.StringSerializer
31 import org.apache.kafka.streams.StreamsConfig
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
/**
 * Common properties inherited by every producer, consumer and streams
 * properties class declared in this file.
 */
abstract class CommonProperties {

    // None of these three are given a default; reading one before it has been
    // assigned fails, as with any lateinit property.
    lateinit var type: String
    lateinit var topic: String
    lateinit var bootstrapServers: String

    /**
     * Builds the base Kafka client configuration.
     *
     * The client-agnostic [CommonClientConfigs] key is used because this class is
     * the parent of producer, consumer and streams properties alike.
     *
     * @return a new mutable map containing only the bootstrap servers entry;
     * subclasses add their own keys to it.
     */
    open fun getConfig(): HashMap<String, Any> =
        hashMapOf<String, Any>(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers)
}
48 /** Message Producer */
/**
 * Message producer properties.
 *
 * Base type of the producer properties classes; it declares no members of its
 * own beyond what it inherits from [CommonProperties].
 */
abstract class MessageProducerProperties : CommonProperties()
/**
 * Kafka producer properties: serializers, delivery guarantees and client id,
 * layered on top of the inherited base configuration.
 */
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {

    lateinit var clientId: String

    // strongest producing guarantee
    var acks: String = "all"

    // max blocking time in ms to send a message
    var maxBlockMs: Int = 250

    // time in ms before retrying connection (1 hour)
    var reconnectBackOffMs: Int = 60 * 60 * 1000

    // ensure we don't push duplicates
    var enableIdempotence: Boolean = true

    /**
     * Extends the inherited configuration with the producer settings.
     *
     * Keys are String-serialized and values are byte-array-serialized. The client
     * id is [clientId] followed by `-` and the value returned by
     * `BlueprintMessageUtils.getHostnameSuffix()`.
     *
     * @return the map obtained from the parent class, with the producer entries added.
     */
    override fun getConfig(): HashMap<String, Any> =
        super.getConfig().also { producerSettings ->
            producerSettings[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            producerSettings[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
            producerSettings[ProducerConfig.ACKS_CONFIG] = acks
            producerSettings[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
            producerSettings[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
            producerSettings[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
            producerSettings[ProducerConfig.CLIENT_ID_CONFIG] =
                "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
        }
}
/**
 * Kafka producer properties with SSL: a mandatory truststore and an optional
 * client keystore.
 */
open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {

    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE

    // The keystore is optional; when it is set, keystorePassword must be set too.
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Extends the inherited producer configuration with the SSL settings.
     *
     * The keystore location, type and password are added only when [keystore]
     * is non-null.
     *
     * @return the map obtained from the parent class, with the SSL entries added.
     * @throws IllegalStateException if [keystore] is set but [keystorePassword] is not.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword

        // Read the mutable nullable property once so the null check and the use
        // cannot disagree.
        val keystoreLocation = keystore
        if (keystoreLocation != null) {
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // Fail with an explicit message instead of a bare null-pointer error.
            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = checkNotNull(keystorePassword) {
                "keystorePassword must be set when keystore is configured"
            }
        }

        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * (SASL) SCRAM SSL Auth.
 *
 * Kafka producer properties that switch the security protocol to SASL_SSL and
 * supply SCRAM credentials through an inline JAAS configuration.
 */
class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {

    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * Extends the inherited SSL configuration with the SASL/SCRAM settings.
     *
     * The security protocol written by the parent class is overwritten with
     * SASL_SSL. Username and password are escaped before being embedded in the
     * double-quoted JAAS options.
     *
     * @return the map obtained from the parent class, with the SASL entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
        configProps[SaslConfigs.SASL_JAAS_CONFIG] =
            "${ScramLoginModule::class.java.canonicalName} required " +
            "username=\"${escapeJaasValue(scramUsername)}\" " +
            "password=\"${escapeJaasValue(scramPassword)}\";"
        return configProps
    }

    /**
     * Escapes backslashes and double quotes so [value] can sit inside a
     * double-quoted JAAS option without ending the quoted string early.
     *
     * NOTE(review): relies on the JAAS option parser honouring backslash escapes
     * inside quoted values — confirm against the Kafka client version in use.
     */
    private fun escapeJaasValue(value: String): String =
        value.replace("\\", "\\\\").replace("\"", "\\\"")
}
/**
 * Base type of the consumer and streams properties classes; it declares no
 * members of its own beyond what it inherits from [CommonProperties].
 */
abstract class MessageConsumerProperties : CommonProperties()
123 /** Streams properties */
/**
 * Kafka Streams consumer properties: application id, offset reset policy and
 * processing guarantee, layered on top of the inherited base configuration.
 */
open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {

    lateinit var applicationId: String
    var autoOffsetReset: String = "latest"
    var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE

    /**
     * Extends the inherited configuration with the streams settings.
     *
     * @return the map obtained from the parent class, with the streams entries added.
     */
    override fun getConfig(): HashMap<String, Any> =
        super.getConfig().also { streamsSettings ->
            // Several workers run in different pods; without the hostname suffix
            // they would all share the same worker name.
            streamsSettings[StreamsConfig.APPLICATION_ID_CONFIG] =
                "$applicationId-${BlueprintMessageUtils.getHostnameSuffix()}"
            streamsSettings[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
            streamsSettings[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
        }
}
/**
 * Kafka Streams consumer properties with SSL: a mandatory truststore and an
 * optional client keystore.
 */
open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {

    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE

    // The keystore is optional; when it is set, keystorePassword must be set too.
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Extends the inherited streams configuration with the SSL settings.
     *
     * The keystore location, type and password are added only when [keystore]
     * is non-null.
     *
     * @return the map obtained from the parent class, with the SSL entries added.
     * @throws IllegalStateException if [keystore] is set but [keystorePassword] is not.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword

        // Read the mutable nullable property once so the null check and the use
        // cannot disagree.
        val keystoreLocation = keystore
        if (keystoreLocation != null) {
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // Fail with an explicit message instead of a bare null-pointer error.
            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = checkNotNull(keystorePassword) {
                "keystorePassword must be set when keystore is configured"
            }
        }

        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * (SASL) SCRAM SSL Auth.
 *
 * Kafka Streams consumer properties that switch the security protocol to
 * SASL_SSL and supply SCRAM credentials through an inline JAAS configuration.
 */
class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {

    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * Extends the inherited SSL configuration with the SASL/SCRAM settings.
     *
     * The security protocol written by the parent class is overwritten with
     * SASL_SSL. Username and password are escaped before being embedded in the
     * double-quoted JAAS options.
     *
     * @return the map obtained from the parent class, with the SASL entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
        configProps[SaslConfigs.SASL_JAAS_CONFIG] =
            "${ScramLoginModule::class.java.canonicalName} required " +
            "username=\"${escapeJaasValue(scramUsername)}\" " +
            "password=\"${escapeJaasValue(scramPassword)}\";"
        return configProps
    }

    /**
     * Escapes backslashes and double quotes so [value] can sit inside a
     * double-quoted JAAS option without ending the quoted string early.
     *
     * NOTE(review): relies on the JAAS option parser honouring backslash escapes
     * inside quoted values — confirm against the Kafka client version in use.
     */
    private fun escapeJaasValue(value: String): String =
        value.replace("\\", "\\\\").replace("\"", "\\\"")
}
188 /** Message Consumer */
189 /** Message Consumer Properties **/
/**
 * Kafka consumer properties: group and client ids, commit and offset reset
 * policy, deserializers and an optional cap on records per poll.
 */
open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {

    lateinit var groupId: String
    lateinit var clientId: String
    var autoCommit: Boolean = true
    var autoOffsetReset: String = "latest"

    // Not read by getConfig(); exposed for whoever drives the consumer.
    var pollMillSec: Long = 1000

    // Any value <= 0 means no max-poll-records entry is written to the configuration.
    var pollRecords: Int = -1

    /**
     * Extends the inherited configuration with the consumer settings.
     *
     * Keys are String-deserialized and values are byte-array-deserialized.
     *
     * @return the map obtained from the parent class, with the consumer entries added.
     */
    override fun getConfig(): HashMap<String, Any> =
        super.getConfig().also { consumerSettings ->
            consumerSettings[ConsumerConfig.GROUP_ID_CONFIG] = groupId
            consumerSettings[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
            /*
             * earliest: automatically reset the offset to the earliest offset
             * latest: automatically reset the offset to the latest offset
             */
            consumerSettings[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
            consumerSettings[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            consumerSettings[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
            // Several workers run in different pods; without the hostname suffix
            // they would all share the same worker name.
            consumerSettings[ConsumerConfig.CLIENT_ID_CONFIG] =
                "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"

            // Back pressure: fetch only the configured number of records per poll.
            if (pollRecords > 0) {
                consumerSettings[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords
            }
        }
}
/**
 * Kafka consumer properties with SSL: a mandatory truststore and an optional
 * client keystore.
 */
open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {

    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE

    // The keystore is optional; when it is set, keystorePassword must be set too.
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Extends the inherited consumer configuration with the SSL settings.
     *
     * The keystore location, type and password are added only when [keystore]
     * is non-null.
     *
     * @return the map obtained from the parent class, with the SSL entries added.
     * @throws IllegalStateException if [keystore] is set but [keystorePassword] is not.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword

        // Read the mutable nullable property once so the null check and the use
        // cannot disagree.
        val keystoreLocation = keystore
        if (keystoreLocation != null) {
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // Fail with an explicit message instead of a bare null-pointer error.
            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = checkNotNull(keystorePassword) {
                "keystorePassword must be set when keystore is configured"
            }
        }

        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * (SASL) SCRAM SSL Auth.
 *
 * Kafka consumer properties that switch the security protocol to SASL_SSL and
 * supply SCRAM credentials through an inline JAAS configuration.
 */
class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {

    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * Extends the inherited SSL configuration with the SASL/SCRAM settings.
     *
     * The security protocol written by the parent class is overwritten with
     * SASL_SSL. Username and password are escaped before being embedded in the
     * double-quoted JAAS options.
     *
     * @return the map obtained from the parent class, with the SASL entries added.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
        configProps[SaslConfigs.SASL_JAAS_CONFIG] =
            "${ScramLoginModule::class.java.canonicalName} required " +
            "username=\"${escapeJaasValue(scramUsername)}\" " +
            "password=\"${escapeJaasValue(scramPassword)}\";"
        return configProps
    }

    /**
     * Escapes backslashes and double quotes so [value] can sit inside a
     * double-quoted JAAS option without ending the quoted string early.
     *
     * NOTE(review): relies on the JAAS option parser honouring backslash escapes
     * inside quoted values — confirm against the Kafka client version in use.
     */
    private fun escapeJaasValue(value: String): String =
        value.replace("\\", "\\\\").replace("\"", "\\\"")
}