2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message
20 import org.apache.kafka.clients.CommonClientConfigs
21 import org.apache.kafka.clients.consumer.ConsumerConfig
22 import org.apache.kafka.clients.producer.ProducerConfig
23 import org.apache.kafka.common.config.SaslConfigs
24 import org.apache.kafka.common.config.SslConfigs
25 import org.apache.kafka.common.security.auth.SecurityProtocol
26 import org.apache.kafka.common.security.scram.ScramLoginModule
27 import org.apache.kafka.common.serialization.ByteArrayDeserializer
28 import org.apache.kafka.common.serialization.ByteArraySerializer
29 import org.apache.kafka.common.serialization.StringDeserializer
30 import org.apache.kafka.common.serialization.StringSerializer
31 import org.apache.kafka.streams.StreamsConfig
/**
 * Common Properties.
 *
 * Base class for the Kafka property holders declared in this file. It carries
 * the [type], [topic] and [bootstrapServers] values; only [bootstrapServers]
 * is written into the map produced by [getConfig].
 */
abstract class CommonProperties {

    lateinit var type: String
    lateinit var topic: String
    lateinit var bootstrapServers: String

    /**
     * Creates a fresh, mutable map seeded with the bootstrap servers entry.
     * Subclasses call this first and then add their own keys to the result.
     *
     * @return a new [HashMap] holding the bootstrap servers setting.
     */
    open fun getConfig(): HashMap<String, Any> =
        hashMapOf<String, Any>(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers)
}
47 /** Message Producer */
/**
 * Message Producer Properties.
 *
 * Marker base type for producer-side property holders; it adds nothing on top
 * of [CommonProperties].
 */
abstract class MessageProducerProperties : CommonProperties()
/**
 * Producer properties without any transport security settings.
 *
 * Keys are serialized as strings and values as byte arrays. [clientId] is
 * optional and is only written to the configuration when it has been set.
 */
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {

    var clientId: String? = null
    var acks: String = "all" // strongest producing guarantee
    var maxBlockMs: Int = 250 // max blocking time in ms to send a message
    var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
    var enableIdempotence: Boolean = true // ensure we don't push duplicates

    /**
     * Extends the parent configuration with the serializers and the producer
     * tuning values held by this class.
     *
     * @return the mutable configuration map created by the parent class.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
        configProps[ProducerConfig.ACKS_CONFIG] = acks
        configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
        configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
        configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
        // clientId is a mutable property, so read it exactly once instead of
        // null-checking it and then forcing it with `!!`.
        clientId?.let { configProps[ProducerConfig.CLIENT_ID_CONFIG] = it }
        return configProps
    }
}
/**
 * Producer properties for an SSL-secured connection.
 *
 * The truststore location and password are mandatory. The keystore entries
 * are written only when [keystore] is set, and in that case
 * [keystorePassword] must be set as well.
 */
open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {

    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Extends the parent configuration with the SSL security protocol and the
     * truststore / keystore settings.
     *
     * @return the mutable configuration map created by the parent class.
     * @throws IllegalStateException if [keystore] is set but [keystorePassword] is not.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        // Both values are non-null lateinit strings, so no `!!` is needed.
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
        keystore?.let { keystoreLocation ->
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // Fail with an explicit message instead of a bare NullPointerException.
            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = checkNotNull(keystorePassword) {
                "keystorePassword must be set when keystore is configured"
            }
        }
        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * (SASL) SCRAM SSL Auth.
 *
 * Producer properties that add SASL credentials on top of the SSL settings
 * inherited from the parent class; the security protocol set by the parent
 * is replaced with SASL_SSL.
 */
class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {

    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * Extends the parent configuration with the SASL mechanism and a JAAS
     * configuration entry built from [scramUsername] and [scramPassword].
     *
     * @return the mutable configuration map created by the parent class.
     */
    override fun getConfig(): HashMap<String, Any> {
        val props = super.getConfig()
        val loginModule = ScramLoginModule::class.java.canonicalName
        // NOTE(review): the credentials are inserted verbatim between double quotes;
        // a value containing `"` or `\` presumably needs escaping — confirm.
        val jaasConfig = """$loginModule required username="$scramUsername" password="$scramPassword";"""

        props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        props[SaslConfigs.SASL_MECHANISM] = saslMechanism
        props[SaslConfigs.SASL_JAAS_CONFIG] = jaasConfig
        return props
    }
}
/**
 * Marker base type for consumer-side property holders; it adds nothing on top
 * of [CommonProperties].
 */
abstract class MessageConsumerProperties : CommonProperties()
124 /** Streams properties */
/**
 * Kafka Streams consumer properties without any transport security settings.
 *
 * [applicationId] must be assigned before [getConfig] is called.
 */
open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {

    lateinit var applicationId: String
    var autoOffsetReset: String = "latest"
    var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE

    /**
     * Extends the parent configuration with the application id, the offset
     * reset policy and the processing guarantee.
     *
     * @return the mutable configuration map created by the parent class.
     */
    override fun getConfig(): HashMap<String, Any> = super.getConfig().apply {
        this[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
        this[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
        this[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
    }
}
/**
 * Kafka Streams consumer properties for an SSL-secured connection.
 *
 * The truststore location and password are mandatory. The keystore entries
 * are written only when [keystore] is set, and in that case
 * [keystorePassword] must be set as well.
 */
open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {

    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Extends the parent configuration with the SSL security protocol and the
     * truststore / keystore settings.
     *
     * @return the mutable configuration map created by the parent class.
     * @throws IllegalStateException if [keystore] is set but [keystorePassword] is not.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        // Both values are non-null lateinit strings, so no `!!` is needed.
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
        keystore?.let { keystoreLocation ->
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // Fail with an explicit message instead of a bare NullPointerException.
            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = checkNotNull(keystorePassword) {
                "keystorePassword must be set when keystore is configured"
            }
        }
        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * (SASL) SCRAM SSL Auth.
 *
 * Kafka Streams consumer properties that add SASL credentials on top of the
 * SSL settings inherited from the parent class; the security protocol set by
 * the parent is replaced with SASL_SSL.
 */
class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {

    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * Extends the parent configuration with the SASL mechanism and a JAAS
     * configuration entry built from [scramUsername] and [scramPassword].
     *
     * @return the mutable configuration map created by the parent class.
     */
    override fun getConfig(): HashMap<String, Any> {
        val settings = super.getConfig()
        // NOTE(review): the credentials are inserted verbatim between double quotes;
        // a value containing `"` or `\` presumably needs escaping — confirm.
        val jaasConfig = listOf(
            "${ScramLoginModule::class.java.canonicalName} required",
            "username=\"$scramUsername\"",
            "password=\"$scramPassword\";"
        ).joinToString(" ")

        settings[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        settings[SaslConfigs.SASL_MECHANISM] = saslMechanism
        settings[SaslConfigs.SASL_JAAS_CONFIG] = jaasConfig
        return settings
    }
}
187 /** Message Consumer */
188 /** Message Consumer Properties **/
/**
 * Consumer properties without any transport security settings.
 *
 * [groupId] and [clientId] must be assigned before [getConfig] is called.
 * [pollMillSec] is held here but is not written into the returned map.
 */
open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {

    lateinit var groupId: String
    lateinit var clientId: String
    var autoCommit: Boolean = true
    var autoOffsetReset: String = "latest"
    var pollMillSec: Long = 1000
    var pollRecords: Int = -1

    /**
     * Extends the parent configuration with the group, commit, offset reset,
     * deserializer and client id settings.
     *
     * @return the mutable configuration map created by the parent class.
     */
    override fun getConfig(): HashMap<String, Any> = super.getConfig().also { props ->
        props[ConsumerConfig.GROUP_ID_CONFIG] = groupId
        props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
        /*
         * earliest: automatically reset the offset to the earliest offset
         * latest: automatically reset the offset to the latest offset
         */
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
        props[ConsumerConfig.CLIENT_ID_CONFIG] = clientId

        /** To handle back pressure, cap the records per poll only when a positive limit is configured */
        pollRecords.takeIf { it > 0 }?.let { limit ->
            props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = limit
        }
    }
}
/**
 * Consumer properties for an SSL-secured connection.
 *
 * The truststore location and password are mandatory. The keystore entries
 * are written only when [keystore] is set, and in that case
 * [keystorePassword] must be set as well.
 */
open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {

    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Extends the parent configuration with the SSL security protocol and the
     * truststore / keystore settings.
     *
     * @return the mutable configuration map created by the parent class.
     * @throws IllegalStateException if [keystore] is set but [keystorePassword] is not.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        // Both values are non-null lateinit strings, so no `!!` is needed.
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
        keystore?.let { keystoreLocation ->
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // Fail with an explicit message instead of a bare NullPointerException.
            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = checkNotNull(keystorePassword) {
                "keystorePassword must be set when keystore is configured"
            }
        }
        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * (SASL) SCRAM SSL Auth.
 *
 * Consumer properties that add SASL credentials on top of the SSL settings
 * inherited from the parent class; the security protocol set by the parent
 * is replaced with SASL_SSL.
 */
class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {

    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * Extends the parent configuration with the SASL mechanism and a JAAS
     * configuration entry built from [scramUsername] and [scramPassword].
     *
     * @return the mutable configuration map created by the parent class.
     */
    override fun getConfig(): HashMap<String, Any> {
        val consumerConfig = super.getConfig()
        // NOTE(review): the credentials are inserted verbatim between double quotes;
        // a value containing `"` or `\` presumably needs escaping — confirm.
        val jaasConfig = buildString {
            append(ScramLoginModule::class.java.canonicalName)
            append(" required ")
            append("username=\"").append(scramUsername).append("\" ")
            append("password=\"").append(scramPassword).append("\";")
        }

        consumerConfig[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        consumerConfig[SaslConfigs.SASL_MECHANISM] = saslMechanism
        consumerConfig[SaslConfigs.SASL_JAAS_CONFIG] = jaasConfig
        return consumerConfig
    }
}