2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message
20 import org.apache.kafka.clients.CommonClientConfigs
21 import org.apache.kafka.clients.consumer.ConsumerConfig
22 import org.apache.kafka.clients.producer.ProducerConfig
23 import org.apache.kafka.common.config.SaslConfigs
24 import org.apache.kafka.common.config.SslConfigs
25 import org.apache.kafka.common.security.auth.SecurityProtocol
26 import org.apache.kafka.common.security.scram.ScramLoginModule
27 import org.apache.kafka.common.serialization.ByteArrayDeserializer
28 import org.apache.kafka.common.serialization.ByteArraySerializer
29 import org.apache.kafka.common.serialization.StringDeserializer
30 import org.apache.kafka.common.serialization.StringSerializer
31 import org.apache.kafka.streams.StreamsConfig
33 /** Common Properties **/
34 abstract class CommonProperties {
35 lateinit var type: String
36 lateinit var topic: String
37 lateinit var bootstrapServers: String
39 open fun getConfig(): HashMap<String, Any> {
40 val configProps = hashMapOf<String, Any>()
41 configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
46 /** Message Producer */
47 /** Message Producer Properties **/
48 abstract class MessageProducerProperties : CommonProperties()
51 open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
53 var clientId: String? = null
54 // strongest producing guarantee
55 var acks: String = "all"
57 // ensure we don't push duplicates
58 var enableIdempotence: Boolean = true
60 override fun getConfig(): HashMap<String, Any> {
61 val configProps = super.getConfig()
62 configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
63 configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
64 configProps[ProducerConfig.ACKS_CONFIG] = acks
65 configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
66 if (clientId != null) {
67 configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
74 open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
75 lateinit var truststore: String
76 lateinit var truststorePassword: String
77 var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
78 var keystore: String? = null
79 var keystorePassword: String? = null
80 var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
81 var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
83 override fun getConfig(): HashMap<String, Any> {
84 val configProps = super.getConfig()
85 configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
86 configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
87 configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
88 configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
89 if (keystore != null) {
90 configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
91 configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
92 configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
94 configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
100 /** (SASL) SCRAM SSL Auth */
101 class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
102 var saslMechanism: String = "SCRAM-SHA-512"
103 lateinit var scramUsername: String
104 lateinit var scramPassword: String
106 override fun getConfig(): HashMap<String, Any> {
107 val configProps = super.getConfig()
108 configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
109 configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
110 configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
111 "username=\"${scramUsername}\" " +
112 "password=\"${scramPassword}\";"
118 abstract class MessageConsumerProperties : CommonProperties()
120 /** Streams properties */
123 open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
124 lateinit var applicationId: String
125 var autoOffsetReset: String = "latest"
126 var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
128 override fun getConfig(): HashMap<String, Any> {
129 val configProperties = super.getConfig()
130 configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
131 configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
132 configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
133 return configProperties
138 open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
139 lateinit var truststore: String
140 lateinit var truststorePassword: String
141 var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
142 var keystore: String? = null
143 var keystorePassword: String? = null
144 var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
145 var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
147 override fun getConfig(): HashMap<String, Any> {
148 val configProps = super.getConfig()
149 configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
150 configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
151 configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
152 configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
153 if (keystore != null) {
154 configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
155 configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
156 configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
158 configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
163 /** (SASL) SCRAM SSL Auth */
164 class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
165 var saslMechanism: String = "SCRAM-SHA-512"
166 lateinit var scramUsername: String
167 lateinit var scramPassword: String
169 override fun getConfig(): HashMap<String, Any> {
170 val configProps = super.getConfig()
171 configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
172 configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
173 configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
174 "username=\"${scramUsername}\" " +
175 "password=\"${scramPassword}\";"
180 /** Message Consumer */
181 /** Message Consumer Properties **/
183 open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
184 lateinit var groupId: String
185 lateinit var clientId: String
186 var autoCommit: Boolean = true
187 var autoOffsetReset: String = "latest"
188 var pollMillSec: Long = 1000
189 var pollRecords: Int = -1
191 override fun getConfig(): HashMap<String, Any> {
192 val configProperties = super.getConfig()
193 configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
194 configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
196 * earliest: automatically reset the offset to the earliest offset
197 * latest: automatically reset the offset to the latest offset
199 configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
200 configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
201 configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
202 configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId
204 /** To handle Back pressure, Get only configured record for processing */
205 if (pollRecords > 0) {
206 configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords
209 return configProperties
214 open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
215 lateinit var truststore: String
216 lateinit var truststorePassword: String
217 var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
218 var keystore: String? = null
219 var keystorePassword: String? = null
220 var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
221 var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
223 override fun getConfig(): HashMap<String, Any> {
224 val configProps = super.getConfig()
225 configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
226 configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
227 configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
228 configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
229 if (keystore != null) {
230 configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
231 configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
232 configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
234 configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
239 /** (SASL) SCRAM SSL Auth */
240 class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
241 var saslMechanism: String = "SCRAM-SHA-512"
242 lateinit var scramUsername: String
243 lateinit var scramPassword: String
245 override fun getConfig(): HashMap<String, Any> {
246 val configProps = super.getConfig()
247 configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
248 configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
249 configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
250 "username=\"${scramUsername}\" " +
251 "password=\"${scramPassword}\";"