[DCAE-HV-VES] Add jaas config for kafka connect
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / kafka / KafkaSenderOptionsFactoryTest.kt
index fec1785..37d6783 100644 (file)
@@ -3,6 +3,7 @@
  * dcaegen2-collectors-veshv
  * ================================================================================
  * Copyright (C) 2019 NOKIA
+ * Copyright (C) 2022 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
+import io.kotest.extensions.system.OverrideMode
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.config.SaslConfigs
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.TestContainer
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 import org.onap.ves.VesEventOuterClass
 import reactor.kafka.sender.SenderOptions
-import java.io.IOException
-import java.io.StreamTokenizer
-import java.io.StringReader
-import java.util.*
-import javax.security.auth.login.AppConfigurationEntry
-import javax.security.auth.login.Configuration
+import io.kotest.extensions.system.withEnvironment
+import io.kotest.matchers.shouldBe
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.on
+import org.jetbrains.spek.api.dsl.TestContainer
+import org.jetbrains.spek.api.dsl.it
 
 /**
  * @author [Piotr Jaszczyk](mailto:piotr.jaszczyk@nokia.com)
@@ -55,7 +51,6 @@ internal class KafkaSenderOptionsFactoryTest : Spek({
                     .bootstrapServers("dmaap1,dmaap2")
                     .topicName("PERF_DATA")
                     .build()
-
             on("calling the CUT method") {
                 val result = KafkaSenderOptionsFactory.createSenderOptions(sink)
                 val itShouldHavePropertySet = propertyChecker(result)
@@ -68,22 +63,54 @@ internal class KafkaSenderOptionsFactoryTest : Spek({
                 itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
                 itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
                 itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
-
                 itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, null)
                 itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, null)
                 itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG, null)
             }
 
         }
-        given("authenticated KafkaSink") {
+        given("authenticated AAF KafkaSink") {
             val aafCredentials = ImmutableAafCredentials.builder()
-                    .username("user \" with quote")
-                    .password("password \" with quote")
-                    .build()
+                .username("user \" with quote")
+                .password("password \" with quote")
+                .build()
+
+            val sink = ImmutableKafkaSink.builder()
+                .bootstrapServers("dmaap-service")
+                .topicName("OTHER_TOPIC")
+                .aafCredentials(aafCredentials)
+                .build()
+
+            on("calling the CUT method") {
+                val result = KafkaSenderOptionsFactory.createSenderOptions(sink)
+                val itShouldHavePropertySet = propertyChecker(result)
+
+                itShouldHavePropertySet(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sink.bootstrapServers())
+                itShouldHavePropertySet(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1_258_291)
+                itShouldHavePropertySet(ProducerConfig.BUFFER_MEMORY_CONFIG, 33_554_432)
+                itShouldHavePropertySet(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+                itShouldHavePropertySet(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+                itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+                itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
+                itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
+                itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
+                itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "PLAIN")
+                itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG,
+                    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+                            """username="user \" with quote" password="password \" with quote";""")
+            }
+
+        }
+        given("authenticated SCRAM KafkaSink") {
+            withEnvironment("USE_SCRAM", "true", OverrideMode.SetOrOverride) {
+                System.getenv("USE_SCRAM") shouldBe "true"
+            }
+            withEnvironment("JAAS_CONFIG", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"onap-dcae-hv-ves-kafka-user\" password=\"oJumEmQAH6kN\";", OverrideMode.SetOrOverride) {
+                System.getenv("JAAS_CONFIG") shouldBe "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"onap-dcae-hv-ves-kafka-user\" password=\"oJumEmQAH6kN\";"
+            }
             val sink = ImmutableKafkaSink.builder()
                     .bootstrapServers("dmaap-service")
                     .topicName("OTHER_TOPIC")
-                    .aafCredentials(aafCredentials)
                     .build()
 
             on("calling the CUT method") {
@@ -98,12 +125,10 @@ internal class KafkaSenderOptionsFactoryTest : Spek({
                 itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
                 itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
                 itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
-
                 itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
-                itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "PLAIN")
+                itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512")
                 itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG,
-                        "org.apache.kafka.common.security.plain.PlainLoginModule required " +
-                                """username="user \" with quote" password="password \" with quote";""")
+                        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"onap-dcae-hv-ves-kafka-user\" password=\"oJumEmQAH6kN\";")
             }
 
         }