/** Common Properties **/
abstract class CommonProperties {
+
lateinit var type: String
lateinit var topic: String
lateinit var bootstrapServers: String
/** SSL Auth */
open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
+
lateinit var truststore: String
lateinit var truststorePassword: String
var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
/** (SASL) SCRAM SSL Auth */
class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
+
var saslMechanism: String = "SCRAM-SHA-512"
lateinit var scramUsername: String
lateinit var scramPassword: String
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
- "username=\"${scramUsername}\" " +
- "password=\"${scramPassword}\";"
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
return configProps
}
}
/** Basic Auth */
open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
+
lateinit var applicationId: String
var autoOffsetReset: String = "latest"
var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
/** SSL Auth */
open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
+
lateinit var truststore: String
lateinit var truststorePassword: String
var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
/** (SASL) SCRAM SSL Auth */
class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
+
var saslMechanism: String = "SCRAM-SHA-512"
lateinit var scramUsername: String
lateinit var scramPassword: String
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
- "username=\"${scramUsername}\" " +
- "password=\"${scramPassword}\";"
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
return configProps
}
}
/** Message Consumer Properties **/
/** Basic Auth */
open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
+
lateinit var groupId: String
lateinit var clientId: String
var autoCommit: Boolean = true
/** SSL Auth */
open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
+
lateinit var truststore: String
lateinit var truststorePassword: String
var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
/** (SASL) SCRAM SSL Auth */
class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
+
var saslMechanism: String = "SCRAM-SHA-512"
lateinit var scramUsername: String
lateinit var scramPassword: String
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
- "username=\"${scramUsername}\" " +
- "password=\"${scramPassword}\";"
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
return configProps
}
}