fec17856f6c228df744009a2aa25210a35552e10
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / kafka / KafkaSenderOptionsFactoryTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
21
22 import org.apache.kafka.clients.CommonClientConfigs
23 import org.apache.kafka.clients.producer.ProducerConfig
24 import org.apache.kafka.common.KafkaException
25 import org.apache.kafka.common.config.SaslConfigs
26 import org.assertj.core.api.Assertions.assertThat
27 import org.jetbrains.spek.api.Spek
28 import org.jetbrains.spek.api.dsl.TestContainer
29 import org.jetbrains.spek.api.dsl.describe
30 import org.jetbrains.spek.api.dsl.given
31 import org.jetbrains.spek.api.dsl.it
32 import org.jetbrains.spek.api.dsl.on
33 import org.onap.dcae.collectors.veshv.domain.VesMessage
34 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
35 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
36 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
37 import org.onap.ves.VesEventOuterClass
38 import reactor.kafka.sender.SenderOptions
39 import java.io.IOException
40 import java.io.StreamTokenizer
41 import java.io.StringReader
42 import java.util.*
43 import javax.security.auth.login.AppConfigurationEntry
44 import javax.security.auth.login.Configuration
45
46 /**
47  * @author [Piotr Jaszczyk](mailto:piotr.jaszczyk@nokia.com)
48  * @since April 2019
49  */
50 internal class KafkaSenderOptionsFactoryTest : Spek({
51     describe("creation of Kafka Sender options") {
52
53         given("unauthenticated KafkaSink") {
54             val sink = ImmutableKafkaSink.builder()
55                     .bootstrapServers("dmaap1,dmaap2")
56                     .topicName("PERF_DATA")
57                     .build()
58
59             on("calling the CUT method") {
60                 val result = KafkaSenderOptionsFactory.createSenderOptions(sink)
61                 val itShouldHavePropertySet = propertyChecker(result)
62
63                 itShouldHavePropertySet(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sink.bootstrapServers())
64                 itShouldHavePropertySet(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1_258_291)
65                 itShouldHavePropertySet(ProducerConfig.BUFFER_MEMORY_CONFIG, 33_554_432)
66                 itShouldHavePropertySet(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
67                 itShouldHavePropertySet(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
68                 itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
69                 itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
70                 itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
71
72                 itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, null)
73                 itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, null)
74                 itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG, null)
75             }
76
77         }
78         given("authenticated KafkaSink") {
79             val aafCredentials = ImmutableAafCredentials.builder()
80                     .username("user \" with quote")
81                     .password("password \" with quote")
82                     .build()
83             val sink = ImmutableKafkaSink.builder()
84                     .bootstrapServers("dmaap-service")
85                     .topicName("OTHER_TOPIC")
86                     .aafCredentials(aafCredentials)
87                     .build()
88
89             on("calling the CUT method") {
90                 val result = KafkaSenderOptionsFactory.createSenderOptions(sink)
91                 val itShouldHavePropertySet = propertyChecker(result)
92
93                 itShouldHavePropertySet(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sink.bootstrapServers())
94                 itShouldHavePropertySet(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1_258_291)
95                 itShouldHavePropertySet(ProducerConfig.BUFFER_MEMORY_CONFIG, 33_554_432)
96                 itShouldHavePropertySet(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
97                 itShouldHavePropertySet(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
98                 itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
99                 itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
100                 itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
101
102                 itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
103                 itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "PLAIN")
104                 itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG,
105                         "org.apache.kafka.common.security.plain.PlainLoginModule required " +
106                                 """username="user \" with quote" password="password \" with quote";""")
107             }
108
109         }
110
111     }
112 })
113
114 private fun TestContainer.propertyChecker(actual: SenderOptions<VesEventOuterClass.CommonEventHeader, VesMessage>) =
115         { property: String, expectedValue: Any? ->
116             it("should have '$property' property set to '$expectedValue'") {
117                 assertThat(actual.producerProperty(property)).isEqualTo(expectedValue)
118             }
119         }