d76621c26afbbadc756f9de7a4acfb95f5aac6f1
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message
19
20 import org.apache.kafka.clients.CommonClientConfigs
21 import org.apache.kafka.clients.consumer.ConsumerConfig
22 import org.apache.kafka.clients.producer.ProducerConfig
23 import org.apache.kafka.common.config.SaslConfigs
24 import org.apache.kafka.common.config.SslConfigs
25 import org.apache.kafka.common.security.auth.SecurityProtocol
26 import org.apache.kafka.common.security.scram.ScramLoginModule
27 import org.apache.kafka.common.serialization.ByteArrayDeserializer
28 import org.apache.kafka.common.serialization.ByteArraySerializer
29 import org.apache.kafka.common.serialization.StringDeserializer
30 import org.apache.kafka.common.serialization.StringSerializer
31 import org.apache.kafka.streams.StreamsConfig
32
33 /** Common Properties **/
34 abstract class CommonProperties {
35     lateinit var type: String
36     lateinit var topic: String
37     lateinit var bootstrapServers: String
38
39     open fun getConfig(): HashMap<String, Any> {
40         val configProps = hashMapOf<String, Any>()
41         configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
42         return configProps
43     }
44 }
45
46 /** Message Producer */
47 /** Message Producer Properties **/
48 abstract class MessageProducerProperties : CommonProperties()
49
50 /** Basic Auth */
51 open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
52
53     var clientId: String? = null
54     var acks: String = "all" // strongest producing guarantee
55     var maxBlockMs: Int = 250 // max blocking time in ms to send a message
56     var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
57     var enableIdempotence: Boolean = true // ensure we don't push duplicates
58
59     override fun getConfig(): HashMap<String, Any> {
60         val configProps = super.getConfig()
61         configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
62         configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
63         configProps[ProducerConfig.ACKS_CONFIG] = acks
64         configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
65         configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
66         configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
67         if (clientId != null) {
68             configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
69         }
70         return configProps
71     }
72 }
73
74 /** SSL Auth */
75 open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
76     lateinit var truststore: String
77     lateinit var truststorePassword: String
78     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
79     var keystore: String? = null
80     var keystorePassword: String? = null
81     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
82     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
83
84     override fun getConfig(): HashMap<String, Any> {
85         val configProps = super.getConfig()
86         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
87         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
88         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
89         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
90         if (keystore != null) {
91             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
92             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
93             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
94         }
95         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
96
97         return configProps
98     }
99 }
100
101 /** (SASL) SCRAM SSL Auth */
102 class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
103     var saslMechanism: String = "SCRAM-SHA-512"
104     lateinit var scramUsername: String
105     lateinit var scramPassword: String
106
107     override fun getConfig(): HashMap<String, Any> {
108         val configProps = super.getConfig()
109         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
110         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
111         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
112                 "username=\"${scramUsername}\" " +
113                 "password=\"${scramPassword}\";"
114         return configProps
115     }
116 }
117
118 /** Consumer */
119 abstract class MessageConsumerProperties : CommonProperties()
120 /** Kafka Streams */
121 /** Streams properties */
122
123 /** Basic Auth */
124 open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
125     lateinit var applicationId: String
126     var autoOffsetReset: String = "latest"
127     var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
128
129     override fun getConfig(): HashMap<String, Any> {
130         val configProperties = super.getConfig()
131         configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
132         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
133         configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
134         return configProperties
135     }
136 }
137
138 /** SSL Auth */
139 open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
140     lateinit var truststore: String
141     lateinit var truststorePassword: String
142     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
143     var keystore: String? = null
144     var keystorePassword: String? = null
145     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
146     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
147
148     override fun getConfig(): HashMap<String, Any> {
149         val configProps = super.getConfig()
150         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
151         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
152         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
153         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
154         if (keystore != null) {
155             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
156             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
157             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
158         }
159         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
160         return configProps
161     }
162 }
163
164 /** (SASL) SCRAM SSL Auth */
165 class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
166     var saslMechanism: String = "SCRAM-SHA-512"
167     lateinit var scramUsername: String
168     lateinit var scramPassword: String
169
170     override fun getConfig(): HashMap<String, Any> {
171         val configProps = super.getConfig()
172         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
173         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
174         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
175                 "username=\"${scramUsername}\" " +
176                 "password=\"${scramPassword}\";"
177         return configProps
178     }
179 }
180
181 /** Message Consumer */
182 /** Message Consumer Properties **/
183 /** Basic Auth */
184 open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
185     lateinit var groupId: String
186     lateinit var clientId: String
187     var autoCommit: Boolean = true
188     var autoOffsetReset: String = "latest"
189     var pollMillSec: Long = 1000
190     var pollRecords: Int = -1
191
192     override fun getConfig(): HashMap<String, Any> {
193         val configProperties = super.getConfig()
194         configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
195         configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
196         /**
197          * earliest: automatically reset the offset to the earliest offset
198          * latest: automatically reset the offset to the latest offset
199          */
200         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
201         configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
202         configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
203         configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId
204
205         /** To handle Back pressure, Get only configured record for processing */
206         if (pollRecords > 0) {
207             configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords
208         }
209
210         return configProperties
211     }
212 }
213
214 /** SSL Auth */
215 open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
216     lateinit var truststore: String
217     lateinit var truststorePassword: String
218     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
219     var keystore: String? = null
220     var keystorePassword: String? = null
221     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
222     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
223
224     override fun getConfig(): HashMap<String, Any> {
225         val configProps = super.getConfig()
226         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
227         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
228         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
229         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
230         if (keystore != null) {
231             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
232             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
233             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
234         }
235         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
236         return configProps
237     }
238 }
239
240 /** (SASL) SCRAM SSL Auth */
241 class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
242     var saslMechanism: String = "SCRAM-SHA-512"
243     lateinit var scramUsername: String
244     lateinit var scramPassword: String
245
246     override fun getConfig(): HashMap<String, Any> {
247         val configProps = super.getConfig()
248         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
249         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
250         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
251                 "username=\"${scramUsername}\" " +
252                 "password=\"${scramPassword}\";"
253         return configProps
254     }
255 }