2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message
20 import org.apache.kafka.clients.CommonClientConfigs
21 import org.apache.kafka.clients.consumer.ConsumerConfig
22 import org.apache.kafka.clients.producer.ProducerConfig
23 import org.apache.kafka.common.config.SaslConfigs
24 import org.apache.kafka.common.config.SslConfigs
25 import org.apache.kafka.common.security.auth.SecurityProtocol
26 import org.apache.kafka.common.security.scram.ScramLoginModule
27 import org.apache.kafka.common.serialization.ByteArrayDeserializer
28 import org.apache.kafka.common.serialization.ByteArraySerializer
29 import org.apache.kafka.common.serialization.StringDeserializer
30 import org.apache.kafka.common.serialization.StringSerializer
31 import org.apache.kafka.streams.StreamsConfig
/**
 * Properties shared by every Kafka producer and consumer configuration declared in this file.
 *
 * The `lateinit` properties have no defaults; they must be assigned before [getConfig] is called,
 * otherwise reading them throws [UninitializedPropertyAccessException].
 */
abstract class CommonProperties {
    lateinit var type: String
    lateinit var topic: String
    lateinit var bootstrapServers: String

    /**
     * Builds the base Kafka client configuration.
     *
     * @return a new mutable map containing only the bootstrap servers entry; subclasses add their
     * own keys on top of it.
     */
    open fun getConfig(): HashMap<String, Any> {
        // This class is the parent of consumer and streams properties as well, so the
        // client-neutral constant is used instead of the producer-specific alias of the same key.
        return hashMapOf<String, Any>(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers)
    }
}
// ---- Message Producer ----

/**
 * Base type for message producer properties.
 *
 * Declares nothing beyond what [CommonProperties] provides; the concrete producer
 * configurations extend it.
 */
abstract class MessageProducerProperties : CommonProperties()
/**
 * Kafka producer properties without any transport security settings.
 *
 * [getConfig] adds String-key / byte-array-value serializers and the delivery tuning options
 * below to the base configuration. No security protocol entry is written by this class.
 */
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {

    /** Optional client id; the [ProducerConfig.CLIENT_ID_CONFIG] entry is only written when set. */
    var clientId: String? = null
    var acks: String = "all" // strongest producing guarantee
    var maxBlockMs: Int = 250 // max blocking time in ms to send a message
    var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
    var enableIdempotence: Boolean = true // ensure we don't push duplicates

    /**
     * Builds the producer configuration on top of the parent's map.
     *
     * @return the map returned by the parent, extended with the producer entries.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
        configProps[ProducerConfig.ACKS_CONFIG] = acks
        configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
        configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
        configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
        // Safe call instead of a null check followed by `!!`: the value is read exactly once.
        clientId?.let { configProps[ProducerConfig.CLIENT_ID_CONFIG] = it }
        return configProps
    }
}
/**
 * Kafka producer properties for SSL connections.
 *
 * A truststore location and password are mandatory (`lateinit`). The keystore is optional:
 * when [keystore] is null no keystore entries are written at all.
 */
open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Adds the SSL entries to the parent's producer configuration.
     *
     * @return the parent's map extended with the security protocol, truststore, optional
     * keystore and endpoint identification entries.
     * @throws IllegalStateException if [keystore] is set but [keystorePassword] is not.
     * @throws UninitializedPropertyAccessException if [truststore] or [truststorePassword]
     * was never assigned.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        // Both are non-null lateinit Strings, so no `!!` is needed.
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
        keystore?.let { keystoreLocation ->
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // Fail with an explicit message instead of a bare NullPointerException.
            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = checkNotNull(keystorePassword) {
                "keystorePassword must be set when keystore is configured"
            }
        }
        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * Kafka producer properties for SASL/SCRAM authentication over SSL.
 *
 * Replaces the security protocol written by the SSL parent with `SASL_SSL` and adds the SASL
 * mechanism plus a JAAS configuration line built from [scramUsername] and [scramPassword].
 */
class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * @return the parent's SSL configuration with the SASL entries applied on top.
     */
    override fun getConfig(): HashMap<String, Any> = super.getConfig().also { saslConfig ->
        saslConfig[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        saslConfig[SaslConfigs.SASL_MECHANISM] = saslMechanism

        val loginModule = ScramLoginModule::class.java.canonicalName
        // NOTE(review): credentials are inserted verbatim between double quotes; a value that
        // itself contains a double quote presumably needs escaping by the caller - confirm.
        saslConfig[SaslConfigs.SASL_JAAS_CONFIG] =
            "$loginModule required username=\"$scramUsername\" password=\"$scramPassword\";"
    }
}
/**
 * Base type for message consumer properties.
 *
 * Declares nothing beyond what [CommonProperties] provides; the concrete consumer and
 * streams configurations extend it.
 */
abstract class MessageConsumerProperties : CommonProperties()
// ---- Streams properties ----

/**
 * Kafka Streams consumer properties without any transport security settings.
 *
 * [applicationId] is mandatory (`lateinit`); the offset reset policy and processing guarantee
 * have defaults that can be overridden.
 */
open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
    lateinit var applicationId: String
    var autoOffsetReset: String = "latest"
    var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE

    /**
     * @return the parent's map extended with the application id, offset reset policy and
     * processing guarantee.
     */
    override fun getConfig(): HashMap<String, Any> = super.getConfig().also { streamsConfig ->
        streamsConfig[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
        streamsConfig[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
        streamsConfig[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
    }
}
/**
 * Kafka Streams consumer properties for SSL connections.
 *
 * The truststore location and password must be assigned (`lateinit`). Keystore entries are
 * only written when [keystore] is non-null.
 */
open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Adds the SSL entries to the parent's streams configuration.
     *
     * @return the parent's map extended with the security protocol, truststore, optional
     * keystore and endpoint identification entries.
     * @throws IllegalStateException if [keystore] is set but [keystorePassword] is not.
     * @throws UninitializedPropertyAccessException if [truststore] or [truststorePassword]
     * was never assigned.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        // Non-null lateinit Strings: the former `!!` assertions were no-ops.
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
        keystore?.let { keystoreLocation ->
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // A keystore without a password previously surfaced as a bare NullPointerException.
            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = checkNotNull(keystorePassword) {
                "keystorePassword must be set when keystore is configured"
            }
        }
        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * Kafka Streams consumer properties for SASL/SCRAM authentication over SSL.
 *
 * Replaces the security protocol written by the SSL parent with `SASL_SSL` and adds the SASL
 * mechanism plus a JAAS configuration line built from [scramUsername] and [scramPassword].
 */
class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * @return the parent's SSL configuration with the SASL entries applied on top.
     */
    override fun getConfig(): HashMap<String, Any> = super.getConfig().also { saslConfig ->
        saslConfig[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        saslConfig[SaslConfigs.SASL_MECHANISM] = saslMechanism

        val loginModule = ScramLoginModule::class.java.canonicalName
        // NOTE(review): credentials are inserted verbatim between double quotes; a value that
        // itself contains a double quote presumably needs escaping by the caller - confirm.
        saslConfig[SaslConfigs.SASL_JAAS_CONFIG] =
            "$loginModule required username=\"$scramUsername\" password=\"$scramPassword\";"
    }
}
// ---- Message Consumer ----

/**
 * Kafka consumer properties without any transport security settings.
 *
 * [groupId] and [clientId] are mandatory (`lateinit`). [pollMillSec] is held here but is not
 * part of the map produced by [getConfig].
 */
open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
    lateinit var groupId: String
    lateinit var clientId: String
    var autoCommit: Boolean = true
    var autoOffsetReset: String = "latest"
    var pollMillSec: Long = 1000
    var pollRecords: Int = -1

    /**
     * @return the parent's map extended with the consumer group, commit/offset behaviour,
     * String-key / byte-array-value deserializers, client id and, when positive, the poll
     * record limit.
     */
    override fun getConfig(): HashMap<String, Any> = super.getConfig().also { consumerConfig ->
        consumerConfig[ConsumerConfig.GROUP_ID_CONFIG] = groupId
        consumerConfig[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
        // earliest: automatically reset the offset to the earliest offset
        // latest: automatically reset the offset to the latest offset
        consumerConfig[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
        consumerConfig[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        consumerConfig[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
        consumerConfig[ConsumerConfig.CLIENT_ID_CONFIG] = clientId

        // Back pressure: cap the records per poll only when a positive limit is configured.
        val recordLimit = pollRecords
        if (recordLimit > 0) {
            consumerConfig[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = recordLimit
        }
    }
}
/**
 * Kafka consumer properties for SSL connections.
 *
 * The truststore location and password must be assigned (`lateinit`). Keystore entries are
 * only written when [keystore] is non-null.
 */
open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
    lateinit var truststore: String
    lateinit var truststorePassword: String
    var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
    var keystore: String? = null
    var keystorePassword: String? = null
    var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

    /**
     * Adds the SSL entries to the parent's consumer configuration.
     *
     * @return the parent's map extended with the security protocol, truststore, optional
     * keystore and endpoint identification entries.
     * @throws IllegalStateException if [keystore] is set but [keystorePassword] is not.
     * @throws UninitializedPropertyAccessException if [truststore] or [truststorePassword]
     * was never assigned.
     */
    override fun getConfig(): HashMap<String, Any> {
        val configProps = super.getConfig()
        configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
        configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
        // Non-null lateinit Strings: asserting them with `!!` added nothing.
        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
        keystore?.let { keystoreLocation ->
            configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystoreLocation
            configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
            // Report the missing password explicitly rather than via a bare NullPointerException.
            configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = checkNotNull(keystorePassword) {
                "keystorePassword must be set when keystore is configured"
            }
        }
        configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
        return configProps
    }
}
/**
 * Kafka consumer properties for SASL/SCRAM authentication over SSL.
 *
 * Replaces the security protocol written by the SSL parent with `SASL_SSL` and adds the SASL
 * mechanism plus a JAAS configuration line built from [scramUsername] and [scramPassword].
 */
class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
    var saslMechanism: String = "SCRAM-SHA-512"
    lateinit var scramUsername: String
    lateinit var scramPassword: String

    /**
     * @return the parent's SSL configuration with the SASL entries applied on top.
     */
    override fun getConfig(): HashMap<String, Any> = super.getConfig().also { saslConfig ->
        saslConfig[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
        saslConfig[SaslConfigs.SASL_MECHANISM] = saslMechanism

        val loginModule = ScramLoginModule::class.java.canonicalName
        // NOTE(review): credentials are inserted verbatim between double quotes; a value that
        // itself contains a double quote presumably needs escaping by the caller - confirm.
        saslConfig[SaslConfigs.SASL_JAAS_CONFIG] =
            "$loginModule required username=\"$scramUsername\" password=\"$scramPassword\";"
    }
}