2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2021 Bell Canada.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import io.micrometer.core.instrument.MeterRegistry
24 import kotlinx.coroutines.runBlocking
25 import org.apache.kafka.clients.CommonClientConfigs
26 import org.apache.kafka.clients.consumer.ConsumerConfig
27 import org.apache.kafka.clients.producer.KafkaProducer
28 import org.apache.kafka.clients.producer.ProducerConfig
29 import org.apache.kafka.clients.producer.RecordMetadata
30 import org.apache.kafka.common.config.SaslConfigs
31 import org.apache.kafka.common.config.SslConfigs
32 import org.apache.kafka.common.security.auth.SecurityProtocol
33 import org.apache.kafka.common.security.scram.ScramLoginModule
34 import org.apache.kafka.common.serialization.ByteArraySerializer
35 import org.apache.kafka.common.serialization.StringSerializer
36 import org.junit.runner.RunWith
37 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
38 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
39 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
40 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
41 import org.springframework.beans.factory.annotation.Autowired
42 import org.springframework.boot.test.mock.mockito.MockBean
43 import org.springframework.test.annotation.DirtiesContext
44 import org.springframework.test.context.ContextConfiguration
45 import org.springframework.test.context.TestPropertySource
46 import org.springframework.test.context.junit4.SpringRunner
47 import java.util.concurrent.Future
48 import kotlin.test.Test
49 import kotlin.test.assertEquals
50 import kotlin.test.assertTrue
52 @RunWith(SpringRunner::class)
54 @ContextConfiguration(
56 BluePrintMessageLibConfiguration::class,
57 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
63 "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
64 "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
65 "blueprintsprocessor.messageproducer.sample.topic=default-topic",
66 "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
67 "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
68 "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
69 "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
70 "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
71 "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
72 "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
75 open class BlueprintMessageProducerServiceTest {
78 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
81 lateinit var meterRegistry: MeterRegistry
84 fun testKafkaScramSslAuthProducerService() {
86 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
87 .blueprintMessageProducerService("sample") as KafkaMessageProducerService
89 val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>()
91 val responseMock = mockk<Future<RecordMetadata>>()
92 every { responseMock.get() } returns mockk()
94 every { mockKafkaTemplate.send(any(), any()) } returns responseMock
96 val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true)
98 every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate
100 val response = spyBluePrintMessageProducerService.sendMessage("mykey", "Testing message")
101 assertTrue(response, "failed to get command response")
106 fun testKafkaScramSslAuthConfig() {
107 val expectedConfig = mapOf<String, Any>(
108 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
109 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
110 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
111 ProducerConfig.ACKS_CONFIG to "all",
112 ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
113 ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
114 ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
115 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
116 SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
117 SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
118 SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
119 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
120 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
121 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
122 SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
123 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
124 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
125 "username=\"sample-user\" " +
126 "password=\"secretpassword\";"
129 val messageProducerProperties = bluePrintMessageLibPropertyService
130 .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample")
132 val configProps = messageProducerProperties.getConfig()
135 messageProducerProperties.topic,
137 "Topic doesn't match the expected value"
140 messageProducerProperties.type,
141 "kafka-scram-ssl-auth",
142 "Authentication type doesn't match the expected value"
146 configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
147 "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
150 configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
151 "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
154 expectedConfig.forEach {
156 configProps.containsKey(it.key),
157 "Missing expected kafka config key : ${it.key}"
162 "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"