2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
4 * Modification Copyright (C) 2022 Nordix Foundation.
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
21 import io.micrometer.core.instrument.MeterRegistry
24 import kotlinx.coroutines.channels.consumeEach
25 import kotlinx.coroutines.delay
26 import kotlinx.coroutines.launch
27 import kotlinx.coroutines.runBlocking
28 import org.apache.kafka.clients.CommonClientConfigs
29 import org.apache.kafka.clients.consumer.Consumer
30 import org.apache.kafka.clients.consumer.ConsumerConfig
31 import org.apache.kafka.clients.consumer.ConsumerRecord
32 import org.apache.kafka.clients.consumer.ConsumerRecords
33 import org.apache.kafka.clients.consumer.MockConsumer
34 import org.apache.kafka.clients.consumer.OffsetResetStrategy
35 import org.apache.kafka.clients.producer.ProducerConfig
36 import org.apache.kafka.common.TopicPartition
37 import org.apache.kafka.common.config.SaslConfigs
38 import org.apache.kafka.common.config.SslConfigs
39 import org.apache.kafka.common.security.auth.SecurityProtocol
40 import org.apache.kafka.common.security.scram.ScramLoginModule
41 import org.apache.kafka.common.serialization.ByteArrayDeserializer
42 import org.apache.kafka.common.serialization.StringDeserializer
44 import org.junit.runner.RunWith
45 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
46 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
47 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
48 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
49 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
50 import org.onap.ccsdk.cds.controllerblueprints.core.logger
51 import org.springframework.beans.factory.annotation.Autowired
52 import org.springframework.boot.test.mock.mockito.MockBean
53 import org.springframework.test.annotation.DirtiesContext
54 import org.springframework.test.context.ContextConfiguration
55 import org.springframework.test.context.TestPropertySource
56 import org.springframework.test.context.junit4.SpringRunner
57 import java.nio.charset.Charset
58 import kotlin.test.assertEquals
59 import kotlin.test.assertNotNull
60 import kotlin.test.assertTrue
62 @RunWith(SpringRunner::class)
64 @ContextConfiguration(
66 BluePrintMessageLibConfiguration::class,
67 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
73 "blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
74 "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
75 "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
76 "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
77 "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
78 "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
79 "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
80 "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
81 "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
82 "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
83 "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
84 "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
85 "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
87 "blueprintsprocessor.messageconsumer.sample2.type=kafka-scram-plain-text-auth",
88 "blueprintsprocessor.messageconsumer.sample2.bootstrapServers=127.0.0.1:9092",
89 "blueprintsprocessor.messageconsumer.sample2.groupId=sample-group",
90 "blueprintsprocessor.messageconsumer.sample2.topic=default-topic",
91 "blueprintsprocessor.messageconsumer.sample2.clientId=default-client-id",
92 "blueprintsprocessor.messageconsumer.sample2.pollMillSec=10",
93 "blueprintsprocessor.messageconsumer.sample2.pollRecords=1",
94 "blueprintsprocessor.messageconsumer.sample2.scramUsername=sample-user",
95 "blueprintsprocessor.messageconsumer.sample2.scramPassword=secretpassword",
97 "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
98 "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
99 "blueprintsprocessor.messageproducer.sample.topic=default-topic",
100 "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
101 "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
102 "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
103 "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
104 "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
105 "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
106 "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
109 open class BlueprintMessageConsumerServiceTest {
111 val log = logger(BlueprintMessageConsumerServiceTest::class)
114 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
117 lateinit var meterRegistry: MeterRegistry
120 fun testKafkaBasicAuthConsumerService() {
122 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
123 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
124 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
126 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
128 val topic = "default-topic"
129 val partitions: MutableList<TopicPartition> = arrayListOf()
130 val topicsCollection: MutableList<String> = arrayListOf()
131 partitions.add(TopicPartition(topic, 1))
132 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
133 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
135 val records: Long = 10
136 partitions.forEach { partition ->
137 partitionsBeginningMap[partition] = 0L
138 partitionsEndMap[partition] = records
139 topicsCollection.add(partition.topic())
141 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
142 mockKafkaConsumer.subscribe(topicsCollection)
143 mockKafkaConsumer.rebalance(partitions)
144 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
145 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
147 val record = ConsumerRecord<String, ByteArray>(
148 topic, 1, i.toLong(), "key_$i",
149 "I am message $i".toByteArray()
151 mockKafkaConsumer.addRecord(record)
154 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
155 val channel = spyBlueprintMessageConsumerService.subscribe(null)
158 channel.consumeEach {
161 val value = String(it.value(), Charset.defaultCharset())
162 assertTrue(value.startsWith("I am message"), "failed to get the actual message")
163 assertEquals("key_$i", key)
167 spyBlueprintMessageConsumerService.shutDown()
172 fun testKafkaBasicAuthConsumerWithDynamicFunction() {
174 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
175 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
176 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
178 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
180 val topic = "default-topic"
181 val partitions: MutableList<TopicPartition> = arrayListOf()
182 val topicsCollection: MutableList<String> = arrayListOf()
183 partitions.add(TopicPartition(topic, 1))
184 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
185 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
187 val records: Long = 10
188 partitions.forEach { partition ->
189 partitionsBeginningMap[partition] = 0L
190 partitionsEndMap[partition] = records
191 topicsCollection.add(partition.topic())
193 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
194 mockKafkaConsumer.subscribe(topicsCollection)
195 mockKafkaConsumer.rebalance(partitions)
196 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
197 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
199 val record = ConsumerRecord<String, ByteArray>(
200 topic, 1, i.toLong(), "key_$i",
201 "I am message $i".toByteArray()
203 mockKafkaConsumer.addRecord(record)
206 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
207 /** Test Consumer Function implementation */
208 val consumerFunction = object : KafkaConsumerRecordsFunction {
209 override suspend fun invoke(
210 messageConsumerProperties: MessageConsumerProperties,
211 consumer: Consumer<*, *>,
212 consumerRecords: ConsumerRecords<*, *>
214 val count = consumerRecords.count()
215 log.trace("Received Message count($count)")
218 spyBlueprintMessageConsumerService.consume(consumerFunction)
220 spyBlueprintMessageConsumerService.shutDown()
225 fun testKafkaScramSslAuthConfig() {
226 val expectedConfig = mapOf<String, Any>(
227 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
228 ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
229 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
230 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
231 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
232 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
233 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
234 SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
235 SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
236 SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
237 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
238 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
239 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
240 SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
241 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
242 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
243 "username=\"sample-user\" " +
244 "password=\"secretpassword\";"
247 val messageConsumerProperties = bluePrintMessageLibPropertyService
248 .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
250 val configProps = messageConsumerProperties.getConfig()
253 messageConsumerProperties.topic,
255 "Topic doesn't match the expected value"
258 messageConsumerProperties.type,
259 "kafka-scram-ssl-auth",
260 "Authentication type doesn't match the expected value"
264 configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
265 "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
268 configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
269 "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
272 expectedConfig.forEach {
274 configProps.containsKey(it.key),
275 "Missing expected kafka config key : ${it.key}"
280 "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
287 fun testKafkaScramPlaintextAuthConfig() {
288 val expectedConfig = mapOf<String, Any>(
289 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
290 ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
291 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
292 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
293 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
294 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
295 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString()
298 val messageConsumerProperties = bluePrintMessageLibPropertyService
299 .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample2")
301 val configProps = messageConsumerProperties.getConfig()
304 messageConsumerProperties.topic,
306 "Topic doesn't match the expected value"
309 messageConsumerProperties.type,
310 "kafka-scram-plain-text-auth",
311 "Authentication type doesn't match the expected value"
315 configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
316 "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
319 configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
320 "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
323 expectedConfig.forEach {
325 configProps.containsKey(it.key),
326 "Missing expected kafka config key : ${it.key}"
331 "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
336 /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
338 fun testKafkaIntegration() {
340 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
341 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
342 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
344 val channel = blueprintMessageConsumerService.subscribe(null)
346 channel.consumeEach {
347 log.info("Consumed Message : $it")
351 /** Send message with every 1 sec */
352 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
353 .blueprintMessageProducerService("sample") as KafkaMessageProducerService
357 val headers: MutableMap<String, String> = hashMapOf()
358 headers["id"] = it.toString()
359 blueprintMessageProducerService.sendMessageNB(
361 message = "this is my message($it)",
367 blueprintMessageConsumerService.shutDown()