[DCAE-HV-VES] Add jaas config for kafka connect
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / kafka / KafkaSenderOptionsFactory.kt
index 1c4acf6..2fcc99e 100644 (file)
@@ -3,6 +3,7 @@
  * dcaegen2-collectors-veshv
  * ================================================================================
  * Copyright (C) 2019 NOKIA
+ * Copyright (C) 2022 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,13 +25,17 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.plain.internals.PlainSaslServer
-import org.jetbrains.annotations.Nullable
+import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.utils.applyIf
 import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.kafka.sender.SenderOptions
+import java.lang.Boolean.parseBoolean
+
+
+
 
 internal object KafkaSenderOptionsFactory {
 
@@ -40,6 +45,7 @@ internal object KafkaSenderOptionsFactory {
 
     private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
     private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
+    private val USE_SCRAM = parseBoolean(System.getenv().getOrDefault("USE_SCRAM", "false"))
 
     fun createSenderOptions(kafkaSink: KafkaSink): SenderOptions<CommonEventHeader, VesMessage> =
             SenderOptions.create<CommonEventHeader, VesMessage>()
@@ -52,14 +58,19 @@ internal object KafkaSenderOptionsFactory {
                     .producerProperty(ProducerConfig.RETRIES_CONFIG, 1)
                     .producerProperty(ProducerConfig.ACKS_CONFIG, "1")
                     .stopOnError(false)
-                    .applyIf(kafkaSink.aafCredentials() != null) {
-                        producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
-                                .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM)
-                                .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig(kafkaSink.aafCredentials()!!))
-                    }
+                .applyIf(kafkaSink.aafCredentials() != null) {
+                    producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
+                        .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM)
+                        .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig(kafkaSink.aafCredentials()!!))
+                }
+                .applyIf(USE_SCRAM) {
+                    producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
+                    .producerProperty(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName())
+                    .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"))
+                }
 
     private fun jaasConfig(aafCredentials: AafCredentials) =
-            """$LOGIN_MODULE_CLASS required username="${aafCredentials.username().jaasEscape()}" password="${aafCredentials.password().jaasEscape()}";"""
+        """$LOGIN_MODULE_CLASS required username="${aafCredentials.username().jaasEscape()}" password="${aafCredentials.password().jaasEscape()}";"""
 
     private fun String?.jaasEscape() = this?.replace("\"", "\\\"")