* dcaegen2-collectors-veshv
* ================================================================================
* Copyright (C) 2019 NOKIA
+ * Copyright (C) 2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.plain.internals.PlainSaslServer
-import org.jetbrains.annotations.Nullable
+import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.applyIf
import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.SenderOptions
+import java.lang.Boolean.parseBoolean
+
+
+
internal object KafkaSenderOptionsFactory {
private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
+ private val USE_SCRAM = parseBoolean(System.getenv().getOrDefault("USE_SCRAM", "false"))
fun createSenderOptions(kafkaSink: KafkaSink): SenderOptions<CommonEventHeader, VesMessage> =
SenderOptions.create<CommonEventHeader, VesMessage>()
.producerProperty(ProducerConfig.RETRIES_CONFIG, 1)
.producerProperty(ProducerConfig.ACKS_CONFIG, "1")
.stopOnError(false)
- .applyIf(kafkaSink.aafCredentials() != null) {
- producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
- .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM)
- .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig(kafkaSink.aafCredentials()!!))
- }
+ .applyIf(kafkaSink.aafCredentials() != null) {
+ producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
+ .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM)
+ .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig(kafkaSink.aafCredentials()!!))
+ }
+ .applyIf(USE_SCRAM) {
+ producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
+ .producerProperty(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName())
+ .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"))
+ }
private fun jaasConfig(aafCredentials: AafCredentials) =
- """$LOGIN_MODULE_CLASS required username="${aafCredentials.username().jaasEscape()}" password="${aafCredentials.password().jaasEscape()}";"""
+ """$LOGIN_MODULE_CLASS required username="${aafCredentials.username().jaasEscape()}" password="${aafCredentials.password().jaasEscape()}";"""
private fun String?.jaasEscape() = this?.replace("\"", "\\\"")