2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import io.micrometer.core.instrument.MeterRegistry
23 import kotlinx.coroutines.channels.consumeEach
24 import kotlinx.coroutines.delay
25 import kotlinx.coroutines.launch
26 import kotlinx.coroutines.runBlocking
27 import org.apache.kafka.clients.CommonClientConfigs
28 import org.apache.kafka.clients.consumer.Consumer
29 import org.apache.kafka.clients.consumer.ConsumerConfig
30 import org.apache.kafka.clients.consumer.ConsumerRecord
31 import org.apache.kafka.clients.consumer.ConsumerRecords
32 import org.apache.kafka.clients.consumer.MockConsumer
33 import org.apache.kafka.clients.consumer.OffsetResetStrategy
34 import org.apache.kafka.clients.producer.ProducerConfig
35 import org.apache.kafka.common.TopicPartition
36 import org.apache.kafka.common.config.SaslConfigs
37 import org.apache.kafka.common.config.SslConfigs
38 import org.apache.kafka.common.security.auth.SecurityProtocol
39 import org.apache.kafka.common.security.scram.ScramLoginModule
40 import org.apache.kafka.common.serialization.ByteArrayDeserializer
41 import org.apache.kafka.common.serialization.StringDeserializer
43 import org.junit.runner.RunWith
44 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
45 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
46 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
47 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
48 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
49 import org.onap.ccsdk.cds.controllerblueprints.core.logger
50 import org.springframework.beans.factory.annotation.Autowired
51 import org.springframework.boot.test.mock.mockito.MockBean
52 import org.springframework.test.annotation.DirtiesContext
53 import org.springframework.test.context.ContextConfiguration
54 import org.springframework.test.context.TestPropertySource
55 import org.springframework.test.context.junit4.SpringRunner
56 import java.nio.charset.Charset
57 import kotlin.test.assertEquals
58 import kotlin.test.assertNotNull
59 import kotlin.test.assertTrue
/**
 * Tests for the Kafka message consumer service that is looked up through
 * [BluePrintMessageLibPropertyService]. The class runs with [SpringRunner], using the
 * configuration classes and the inline properties listed below.
 */
61 @RunWith(SpringRunner::class)
63 @ContextConfiguration(
65 BluePrintMessageLibConfiguration::class,
66 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
// Consumer settings for the "sample" selector, declared with type kafka-scram-ssl-auth.
// The truststore/keystore paths look like placeholders rather than real files — TODO confirm.
72 "blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
73 "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
74 "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
75 "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
76 "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
77 "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
78 "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
79 "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
80 "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
81 "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
82 "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
83 "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
84 "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
// Producer settings for the same "sample" selector; they mirror the consumer's
// server, topic, client id, store and SCRAM credential values.
86 "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
87 "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
88 "blueprintsprocessor.messageproducer.sample.topic=default-topic",
89 "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
90 "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
91 "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
92 "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
93 "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
94 "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
95 "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
98 open class BlueprintMessageConsumerServiceTest {
// Logger for this test class; used for trace/info output inside the tests.
100 val log = logger(BlueprintMessageConsumerServiceTest::class)
// Entry point used by the tests to obtain consumer/producer services and consumer
// properties for a selector. Presumably injected by Spring — TODO confirm the annotation.
103 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
// Not referenced by the test methods themselves.
// NOTE(review): presumably a mock registered only to satisfy the Spring context — confirm.
106 lateinit var meterRegistry: MeterRegistry
/**
 * Exercises the consumer service registered under the "sample" selector: its Kafka
 * consumer is replaced with an in-memory [MockConsumer], the service is subscribed,
 * and the payload and key of the consumed records are checked.
 *
 * NOTE(review): the name says "BasicAuth", but the "sample" consumer in this class's
 * test properties has type `kafka-scram-ssl-auth` — confirm the intended name.
 */
109 fun testKafkaBasicAuthConsumerService() {
// Look up the consumer service for selector "sample" and narrow it to the Kafka implementation.
111 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
112 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
// NOTE(review): the non-null `as` cast above already throws on null or on a wrong type,
// so this assertion cannot fail on its own.
113 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Spy on the real service so that kafkaConsumer(...) can be stubbed further down.
115 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
// Fixture: a single partition (number 1) of "default-topic".
117 val topic = "default-topic"
118 val partitions: MutableList<TopicPartition> = arrayListOf()
119 val topicsCollection: MutableList<String> = arrayListOf()
120 partitions.add(TopicPartition(topic, 1))
121 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
122 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
// Every partition gets beginning offset 0 and end offset 10; its topic name is collected
// for the subscribe(...) call.
124 val records: Long = 10
125 partitions.forEach { partition ->
126 partitionsBeginningMap[partition] = 0L
127 partitionsEndMap[partition] = records
128 topicsCollection.add(partition.topic())
// In-memory consumer: subscribe to the topic, simulate a rebalance onto the partition,
// then seed the beginning and end offsets prepared above.
130 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
131 mockKafkaConsumer.subscribe(topicsCollection)
132 mockKafkaConsumer.rebalance(partitions)
133 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
134 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
// Queue a record on partition 1 with offset i, key "key_<i>" and payload "I am message <i>".
// NOTE(review): `i` is bound by an enclosing loop whose header is not shown here — confirm its range.
136 val record = ConsumerRecord<String, ByteArray>(
137 topic, 1, i.toLong(), "key_$i",
138 "I am message $i".toByteArray()
140 mockKafkaConsumer.addRecord(record)
// Stub kafkaConsumer(...) on the spy so that it returns the mock consumer.
143 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
// Subscribe with a null argument; the returned channel is drained below.
144 val channel = spyBlueprintMessageConsumerService.subscribe(null)
// Each consumed record must carry the expected payload prefix and the key for index i.
// NOTE(review): `key` and the running index `i` are assigned on lines not shown here — confirm.
147 channel.consumeEach {
// Decoded with the platform default charset, while toByteArray() above encodes as UTF-8
// by default; the payload is ASCII, so the two agree here.
150 val value = String(it.value(), Charset.defaultCharset())
151 assertTrue(value.startsWith("I am message"), "failed to get the actual message")
152 assertEquals("key_$i", key)
// Ask the spied service to shut down at the end of the test.
156 spyBlueprintMessageConsumerService.shutDown()
/**
 * Exercises the consume(...) entry point of the consumer service registered under the
 * "sample" selector, passing a custom [KafkaConsumerRecordsFunction] and backing the
 * service with an in-memory [MockConsumer].
 *
 * NOTE(review): no assertion on the consumed records is visible in this test; it appears
 * to check only that consume(...) and shutDown() complete — confirm.
 */
161 fun testKafkaBasicAuthConsumerWithDynamicFunction() {
// Look up the consumer service for selector "sample" and narrow it to the Kafka implementation.
163 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
164 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
// NOTE(review): the non-null `as` cast above already throws on null or on a wrong type,
// so this assertion cannot fail on its own.
165 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Spy on the real service so that kafkaConsumer(...) can be stubbed further down.
167 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
// Fixture: a single partition (number 1) of "default-topic".
169 val topic = "default-topic"
170 val partitions: MutableList<TopicPartition> = arrayListOf()
171 val topicsCollection: MutableList<String> = arrayListOf()
172 partitions.add(TopicPartition(topic, 1))
173 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
174 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
// Every partition gets beginning offset 0 and end offset 10; its topic name is collected
// for the subscribe(...) call.
176 val records: Long = 10
177 partitions.forEach { partition ->
178 partitionsBeginningMap[partition] = 0L
179 partitionsEndMap[partition] = records
180 topicsCollection.add(partition.topic())
// In-memory consumer: subscribe to the topic, simulate a rebalance onto the partition,
// then seed the beginning and end offsets prepared above.
182 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
183 mockKafkaConsumer.subscribe(topicsCollection)
184 mockKafkaConsumer.rebalance(partitions)
185 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
186 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
// Queue a record on partition 1 with offset i, key "key_<i>" and payload "I am message <i>".
// NOTE(review): `i` is bound by an enclosing loop whose header is not shown here — confirm its range.
188 val record = ConsumerRecord<String, ByteArray>(
189 topic, 1, i.toLong(), "key_$i",
190 "I am message $i".toByteArray()
192 mockKafkaConsumer.addRecord(record)
// Stub kafkaConsumer(...) on the spy so that it returns the mock consumer.
195 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
196 /** Test Consumer Function implementation */
// Anonymous callback: it receives the consumer properties, the consumer and a batch of
// records, and only logs the batch size at trace level.
197 val consumerFunction = object : KafkaConsumerRecordsFunction {
198 override suspend fun invoke(
199 messageConsumerProperties: MessageConsumerProperties,
200 consumer: Consumer<*, *>,
201 consumerRecords: ConsumerRecords<*, *>
203 val count = consumerRecords.count()
204 log.trace("Received Message count($count)")
// Hand the callback to the spied service, then ask the service to shut down.
207 spyBlueprintMessageConsumerService.consume(consumerFunction)
209 spyBlueprintMessageConsumerService.shutDown()
/**
 * Checks the Kafka client configuration produced for the "sample" consumer: topic,
 * authentication type, client id prefix, and every key/value pair in the expected
 * SCRAM + SSL configuration map.
 */
214 fun testKafkaScramSslAuthConfig() {
// Expected Kafka client settings. The server, group, store paths, passwords and SCRAM
// credentials repeat the values from this class's test properties.
215 val expectedConfig = mapOf<String, Any>(
// ProducerConfig and ConsumerConfig share the same "bootstrap.servers" key, so the
// producer constant is valid for a consumer configuration as well.
216 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
217 ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
218 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
219 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
220 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
221 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
222 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
223 SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
224 SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
225 SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
226 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
227 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
228 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
229 SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
230 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
// JAAS entry: the SCRAM login module class name followed by the configured username and password.
231 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
232 "username=\"sample-user\" " +
233 "password=\"secretpassword\";"
// Resolve the consumer properties for the "sample" selector under the consumer property
// prefix, then derive the Kafka client configuration map from them.
236 val messageConsumerProperties = bluePrintMessageLibPropertyService
237 .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
239 val configProps = messageConsumerProperties.getConfig()
// Topic check; the expected-value argument is on a line not shown here.
242 messageConsumerProperties.topic,
244 "Topic doesn't match the expected value"
// The authentication type must be the one declared in the test properties.
247 messageConsumerProperties.type,
248 "kafka-scram-ssl-auth",
249 "Authentication type doesn't match the expected value"
// client.id must be present and is only checked by prefix.
// NOTE(review): presumably the service appends a suffix to the configured client id — confirm.
253 configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
254 "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
257 configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
258 "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
// Every expected key must exist in the generated config; the failure message below reports
// the value found against the value expected.
261 expectedConfig.forEach {
263 configProps.containsKey(it.key),
264 "Missing expected kafka config key : ${it.key}"
269 "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
274 /** Kafka integration test: enable and use this test case only for local desktop testing against a real Kafka broker. */
// Flow: subscribe through the "sample" consumer service and log whatever arrives, publish
// messages through the "sample" producer service, then shut the consumer down.
276 fun testKafkaIntegration() {
// Look up the consumer service for selector "sample" and narrow it to the Kafka implementation.
278 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
279 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
// NOTE(review): the non-null `as` cast above already throws on null or on a wrong type,
// so this assertion cannot fail on its own.
280 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Subscribe with a null argument and log each message received on the returned channel.
282 val channel = blueprintMessageConsumerService.subscribe(null)
284 channel.consumeEach {
285 log.info("Consumed Message : $it")
289 /** Send a message every 1 sec. */
// NOTE(review): the loop and delay that pace the sends are on lines not shown here — confirm the interval.
290 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
291 .blueprintMessageProducerService("sample") as KafkaMessageProducerService
// Each message carries an "id" header with the current iteration value and a body that embeds it.
295 val headers: MutableMap<String, String> = hashMapOf()
296 headers["id"] = it.toString()
297 blueprintMessageProducerService.sendMessageNB(
299 message = "this is my message($it)",
// Ask the consumer service to shut down once the messages have been sent.
305 blueprintMessageConsumerService.shutDown()