2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2021 Bell Canada.
4 * Modification Copyright (C) 2022 Nordix Foundation.
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
21 import io.micrometer.core.instrument.MeterRegistry
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.CommonClientConfigs
27 import org.apache.kafka.clients.consumer.ConsumerConfig
28 import org.apache.kafka.clients.producer.KafkaProducer
29 import org.apache.kafka.clients.producer.ProducerConfig
30 import org.apache.kafka.clients.producer.RecordMetadata
31 import org.apache.kafka.common.config.SaslConfigs
32 import org.apache.kafka.common.config.SslConfigs
33 import org.apache.kafka.common.security.auth.SecurityProtocol
34 import org.apache.kafka.common.security.scram.ScramLoginModule
35 import org.apache.kafka.common.serialization.ByteArraySerializer
36 import org.apache.kafka.common.serialization.StringSerializer
37 import org.junit.runner.RunWith
38 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
39 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
40 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
41 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
42 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants.Companion.PROPERTY_MESSAGE_PRODUCER_PREFIX
43 import org.springframework.beans.factory.annotation.Autowired
44 import org.springframework.boot.test.mock.mockito.MockBean
45 import org.springframework.test.annotation.DirtiesContext
46 import org.springframework.test.context.ContextConfiguration
47 import org.springframework.test.context.TestPropertySource
48 import org.springframework.test.context.junit4.SpringRunner
49 import java.util.concurrent.Future
50 import kotlin.test.Test
51 import kotlin.test.assertEquals
52 import kotlin.test.assertTrue
/**
 * Spring-context tests for the Kafka message producer support exposed by
 * [BluePrintMessageLibPropertyService].
 *
 * Two producer selectors are declared through [TestPropertySource]:
 *  - `sample`  : type `kafka-scram-ssl-auth` (truststore, keystore and SCRAM credentials)
 *  - `sample2` : type `kafka-scram-plain-text-auth` (SCRAM credentials only)
 *
 * The tests check that a message can be sent through a mocked [KafkaProducer] and that the
 * properties resolved for each selector produce the expected Kafka client configuration.
 */
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(
    classes = [
        BluePrintMessageLibConfiguration::class,
        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
    ]
)
@TestPropertySource(
    properties =
        [
            "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
            "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
            "blueprintsprocessor.messageproducer.sample.topic=default-topic",
            "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
            "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
            "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
            "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
            "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
            "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
            "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword",

            "blueprintsprocessor.messageproducer.sample2.type=kafka-scram-plain-text-auth",
            "blueprintsprocessor.messageproducer.sample2.bootstrapServers=127.0.0.1:9092",
            "blueprintsprocessor.messageproducer.sample2.topic=default-topic",
            "blueprintsprocessor.messageproducer.sample2.clientId=default-client-id",
            "blueprintsprocessor.messageproducer.sample2.scramUsername=sample-user",
            "blueprintsprocessor.messageproducer.sample2.scramPassword=secretpassword"
        ]
)
open class BlueprintMessageProducerServiceTest {

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    @MockBean
    lateinit var meterRegistry: MeterRegistry

    /** Kafka producer settings that both the `sample` and `sample2` selectors are expected to share. */
    private val commonProducerConfig = mapOf<String, Any>(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
        ProducerConfig.ACKS_CONFIG to "all",
        ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
        ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true
    )

    /**
     * Sends a message through the `sample` producer service with the underlying
     * [KafkaProducer] replaced by a mock, and expects `sendMessage` to report success.
     */
    @Test
    fun testKafkaScramSslAuthProducerService() {
        runBlocking {
            val producerService = bluePrintMessageLibPropertyService
                .blueprintMessageProducerService("sample") as KafkaMessageProducerService

            // Future handed back by the mocked producer; get() yields a relaxed RecordMetadata mock.
            val sendResult = mockk<Future<RecordMetadata>>()
            every { sendResult.get() } returns mockk()

            val kafkaProducer = mockk<KafkaProducer<String, ByteArray>>()
            every { kafkaProducer.send(any(), any()) } returns sendResult

            // Spy on the real service so that messageTemplate() hands out the mocked producer.
            val producerServiceSpy = spyk(producerService, recordPrivateCalls = true)
            every { producerServiceSpy.messageTemplate(any()) } returns kafkaProducer

            val response = producerServiceSpy.sendMessage("mykey", "Testing message")
            assertTrue(response, "failed to get command response")
        }
    }

    /** Verifies the Kafka configuration resolved for the SCRAM-over-SSL selector `sample`. */
    @Test
    fun testKafkaScramSslAuthConfig() {
        val expectedConfig = commonProducerConfig + mapOf<String, Any>(
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
            SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
            SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
            SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to
                SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
            SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
            SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
                "username=\"sample-user\" " +
                "password=\"secretpassword\";"
        )

        assertProducerConfig(
            selector = "sample",
            expectedType = "kafka-scram-ssl-auth",
            expectedConfig = expectedConfig
        )
    }

    /** Verifies the Kafka configuration resolved for the SCRAM-over-plaintext selector `sample2`. */
    @Test
    fun testKafkaScramPlaintextAuthConfig() {
        val expectedConfig = commonProducerConfig + mapOf<String, Any>(
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString()
        )

        assertProducerConfig(
            selector = "sample2",
            expectedType = "kafka-scram-plain-text-auth",
            expectedConfig = expectedConfig
        )
    }

    /**
     * Resolves the producer properties registered under [selector] and asserts the topic, the
     * authentication type, the client-id prefix and every entry of [expectedConfig].
     *
     * All equality checks use the kotlin.test argument order (expected, actual, message) so
     * that a failure reports the two values with the correct labels.
     *
     * @param selector producer selector appended to [MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX]
     * @param expectedType expected value of the resolved properties' `type`
     * @param expectedConfig key/value pairs that must be present in the generated Kafka config
     * @param expectedTopic expected value of the resolved properties' `topic`
     * @param expectedClientIdPrefix prefix the generated client id must start with
     */
    private fun assertProducerConfig(
        selector: String,
        expectedType: String,
        expectedConfig: Map<String, Any>,
        expectedTopic: String = "default-topic",
        expectedClientIdPrefix: String = "default-client-id"
    ) {
        val messageProducerProperties = bluePrintMessageLibPropertyService
            .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector")

        val configProps = messageProducerProperties.getConfig()

        assertEquals(
            expectedTopic,
            messageProducerProperties.topic,
            "Topic doesn't match the expected value"
        )
        assertEquals(
            expectedType,
            messageProducerProperties.type,
            "Authentication type doesn't match the expected value"
        )

        // Only the prefix of the client id is checked; the remainder is not asserted here.
        assertTrue(
            configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
            "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
        )
        val clientId = configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString()
        assertTrue(
            clientId.startsWith(expectedClientIdPrefix),
            "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : $clientId " +
                "is supposed to start with $expectedClientIdPrefix"
        )

        expectedConfig.forEach { (key, expectedValue) ->
            assertTrue(
                configProps.containsKey(key),
                "Missing expected kafka config key : $key"
            )
            assertEquals(
                expectedValue,
                configProps[key],
                "Unexpected value for $key got ${configProps[key]} instead of $expectedValue"
            )
        }
    }
}