2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
22 import org.apache.kafka.clients.CommonClientConfigs
23 import org.apache.kafka.clients.producer.ProducerConfig
24 import org.apache.kafka.common.KafkaException
25 import org.apache.kafka.common.config.SaslConfigs
26 import org.assertj.core.api.Assertions.assertThat
27 import org.jetbrains.spek.api.Spek
28 import org.jetbrains.spek.api.dsl.TestContainer
29 import org.jetbrains.spek.api.dsl.describe
30 import org.jetbrains.spek.api.dsl.given
31 import org.jetbrains.spek.api.dsl.it
32 import org.jetbrains.spek.api.dsl.on
33 import org.onap.dcae.collectors.veshv.domain.VesMessage
34 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
35 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
36 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
37 import org.onap.ves.VesEventOuterClass
38 import reactor.kafka.sender.SenderOptions
39 import java.io.IOException
40 import java.io.StreamTokenizer
41 import java.io.StringReader
43 import javax.security.auth.login.AppConfigurationEntry
44 import javax.security.auth.login.Configuration
47 * @author [Piotr Jaszczyk](mailto:piotr.jaszczyk@nokia.com)
50 internal class KafkaSenderOptionsFactoryTest : Spek({
51 describe("creation of Kafka Sender options") {
53 given("unauthenticated KafkaSink") {
54 val sink = ImmutableKafkaSink.builder()
55 .bootstrapServers("dmaap1,dmaap2")
56 .topicName("PERF_DATA")
59 on("calling the CUT method") {
60 val result = KafkaSenderOptionsFactory.createSenderOptions(sink)
61 val itShouldHavePropertySet = propertyChecker(result)
63 itShouldHavePropertySet(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sink.bootstrapServers())
64 itShouldHavePropertySet(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1_258_291)
65 itShouldHavePropertySet(ProducerConfig.BUFFER_MEMORY_CONFIG, 33_554_432)
66 itShouldHavePropertySet(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
67 itShouldHavePropertySet(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
68 itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
69 itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
70 itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
72 itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, null)
73 itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, null)
74 itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG, null)
78 given("authenticated KafkaSink") {
79 val aafCredentials = ImmutableAafCredentials.builder()
80 .username("user \" with quote")
81 .password("password \" with quote")
83 val sink = ImmutableKafkaSink.builder()
84 .bootstrapServers("dmaap-service")
85 .topicName("OTHER_TOPIC")
86 .aafCredentials(aafCredentials)
89 on("calling the CUT method") {
90 val result = KafkaSenderOptionsFactory.createSenderOptions(sink)
91 val itShouldHavePropertySet = propertyChecker(result)
93 itShouldHavePropertySet(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sink.bootstrapServers())
94 itShouldHavePropertySet(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1_258_291)
95 itShouldHavePropertySet(ProducerConfig.BUFFER_MEMORY_CONFIG, 33_554_432)
96 itShouldHavePropertySet(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
97 itShouldHavePropertySet(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
98 itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
99 itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
100 itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
102 itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
103 itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "PLAIN")
104 itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG,
105 "org.apache.kafka.common.security.plain.PlainLoginModule required " +
106 """username="user \" with quote" password="password \" with quote";""")
114 private fun TestContainer.propertyChecker(actual: SenderOptions<VesEventOuterClass.CommonEventHeader, VesMessage>) =
115 { property: String, expectedValue: Any? ->
116 it("should have '$property' property set to '$expectedValue'") {
117 assertThat(actual.producerProperty(property)).isEqualTo(expectedValue)