2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import io.micrometer.core.instrument.MeterRegistry
23 import kotlinx.coroutines.channels.consumeEach
24 import kotlinx.coroutines.delay
25 import kotlinx.coroutines.launch
26 import kotlinx.coroutines.runBlocking
27 import org.apache.kafka.clients.CommonClientConfigs
28 import org.apache.kafka.clients.consumer.Consumer
29 import org.apache.kafka.clients.consumer.ConsumerConfig
30 import org.apache.kafka.clients.consumer.ConsumerRecord
31 import org.apache.kafka.clients.consumer.ConsumerRecords
32 import org.apache.kafka.clients.consumer.MockConsumer
33 import org.apache.kafka.clients.consumer.OffsetResetStrategy
34 import org.apache.kafka.clients.producer.ProducerConfig
35 import org.apache.kafka.common.TopicPartition
36 import org.apache.kafka.common.config.SaslConfigs
37 import org.apache.kafka.common.config.SslConfigs
38 import org.apache.kafka.common.security.auth.SecurityProtocol
39 import org.apache.kafka.common.security.scram.ScramLoginModule
40 import org.apache.kafka.common.serialization.ByteArrayDeserializer
41 import org.apache.kafka.common.serialization.StringDeserializer
43 import org.junit.runner.RunWith
44 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService
45 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
46 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfiguration
47 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
48 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
49 import org.onap.ccsdk.cds.controllerblueprints.core.logger
50 import org.springframework.beans.factory.annotation.Autowired
51 import org.springframework.boot.test.mock.mockito.MockBean
52 import org.springframework.test.annotation.DirtiesContext
53 import org.springframework.test.context.ContextConfiguration
54 import org.springframework.test.context.TestPropertySource
55 import org.springframework.test.context.junit4.SpringRunner
56 import java.nio.charset.Charset
57 import kotlin.test.assertEquals
58 import kotlin.test.assertNotNull
59 import kotlin.test.assertTrue
/**
 * Tests for the Kafka message consumer service, executed with the Spring JUnit4 runner.
 *
 * The visible test properties define one consumer and one producer, both registered under the
 * name "sample" and both using the type kafka-scram-ssl-auth.
 *
 * NOTE(review): this listing appears to have lines elided (annotation headers, loop headers,
 * assertion call lines and closing braces are not visible). The comments below describe only
 * what the visible lines show; anything else is marked as an assumption to confirm.
 */
61 @RunWith(SpringRunner::class)
63 @ContextConfiguration(
// Configuration classes supplied to the test context.
65 BlueprintMessageLibConfiguration::class,
66 BlueprintPropertyConfiguration::class, BlueprintPropertiesService::class
// Consumer properties under blueprintsprocessor.messageconsumer.sample.
// The store paths and passwords look like placeholder values — TODO confirm no real files are required.
72 "blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
73 "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
74 "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
75 "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
76 "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
77 "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
78 "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
79 "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
80 "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
81 "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
82 "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
83 "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
84 "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
// Producer properties under blueprintsprocessor.messageproducer.sample (same broker, topic and credentials).
86 "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
87 "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
88 "blueprintsprocessor.messageproducer.sample.topic=default-topic",
89 "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
90 "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
91 "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
92 "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
93 "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
94 "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
95 "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
98 open class BlueprintMessageConsumerServiceTest {
// Logger for this test class.
100 val log = logger(BlueprintMessageConsumerServiceTest::class)
// Service the tests use to obtain the "sample" consumer/producer services and the consumer properties.
// NOTE(review): the injection annotation is not visible here; presumably @Autowired (imported above) — confirm.
103 lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService
// Not referenced by any test shown in this class.
// NOTE(review): presumably declared with @MockBean (imported above) so the context has a MeterRegistry — confirm.
106 lateinit var meterRegistry: MeterRegistry
/**
 * Feeds records into a Kafka MockConsumer, subscribes through the spied consumer service and
 * checks the value prefix and key of every record received on the returned channel.
 *
 * NOTE(review): the name says "BasicAuth", but the "sample" consumer in the visible test
 * properties has type kafka-scram-ssl-auth — confirm the intended naming.
 */
109 fun testKafkaBasicAuthConsumerService() {
// Look up the consumer registered as "sample". The non-null cast already fails on null,
// so the assertNotNull that follows is effectively a redundant guard.
111 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
112 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
113 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// MockK spy, so that kafkaConsumer(...) can be stubbed further down.
115 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
// One partition (topic "default-topic", partition 1) with beginning offset 0 and end offset 10.
117 val topic = "default-topic"
118 val partitions: MutableList<TopicPartition> = arrayListOf()
119 val topicsCollection: MutableList<String> = arrayListOf()
120 partitions.add(TopicPartition(topic, 1))
121 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
122 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
124 val records: Long = 10
125 partitions.forEach { partition ->
126 partitionsBeginningMap[partition] = 0L
127 partitionsEndMap[partition] = records
128 topicsCollection.add(partition.topic())
// Mock consumer: subscribe to the topic, assign the partition via rebalance, then set the offsets.
130 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
131 mockKafkaConsumer.subscribe(topicsCollection)
132 mockKafkaConsumer.rebalance(partitions)
133 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
134 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
// Build a record on partition 1 at offset i with key "key_$i" and add it to the mock consumer.
// NOTE(review): `i` is not declared in the visible lines; presumably an enclosing loop — confirm.
136 val record = ConsumerRecord<String, ByteArray>(
137 topic, 1, i.toLong(), "key_$i",
138 "I am message $i".toByteArray()
140 mockKafkaConsumer.addRecord(record)
// Stub kafkaConsumer(...) on the spy to return the mock consumer, then subscribe with a null argument.
143 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
144 val channel = spyBlueprintMessageConsumerService.subscribe(null)
// For each element received on the channel: the decoded value must start with "I am message"
// and the key must equal "key_$i".
// NOTE(review): `key` and `i` are not defined in the visible lines — confirm against the full file.
147 channel.consumeEach {
150 val value = String(it.value(), Charset.defaultCharset())
151 assertTrue(value.startsWith("I am message"), "failed to get the actual message")
152 assertEquals("key_$i", key)
// Shut the spied consumer service down at the end of the test.
156 spyBlueprintMessageConsumerService.shutDown()
/**
 * Same mock-consumer setup as the channel-based test, but the records are handed to a
 * KafkaConsumerRecordsFunction passed to consume(...) instead of being read from a channel.
 *
 * NOTE(review): the name says "BasicAuth", but the "sample" consumer in the visible test
 * properties has type kafka-scram-ssl-auth — confirm the intended naming.
 */
161 fun testKafkaBasicAuthConsumerWithDynamicFunction() {
// Look up the consumer registered as "sample"; the assertNotNull after a non-null cast is a redundant guard.
163 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
164 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
165 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// MockK spy, so that kafkaConsumer(...) can be stubbed further down.
167 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
// One partition (topic "default-topic", partition 1) with beginning offset 0 and end offset 10.
169 val topic = "default-topic"
170 val partitions: MutableList<TopicPartition> = arrayListOf()
171 val topicsCollection: MutableList<String> = arrayListOf()
172 partitions.add(TopicPartition(topic, 1))
173 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
174 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
176 val records: Long = 10
177 partitions.forEach { partition ->
178 partitionsBeginningMap[partition] = 0L
179 partitionsEndMap[partition] = records
180 topicsCollection.add(partition.topic())
// Mock consumer: subscribe to the topic, assign the partition via rebalance, then set the offsets.
182 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
183 mockKafkaConsumer.subscribe(topicsCollection)
184 mockKafkaConsumer.rebalance(partitions)
185 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
186 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
// Build a record on partition 1 at offset i with key "key_$i" and add it to the mock consumer.
// NOTE(review): `i` is not declared in the visible lines; presumably an enclosing loop — confirm.
188 val record = ConsumerRecord<String, ByteArray>(
189 topic, 1, i.toLong(), "key_$i",
190 "I am message $i".toByteArray()
192 mockKafkaConsumer.addRecord(record)
// Stub kafkaConsumer(...) on the spy to return the mock consumer.
195 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
196 /** Test Consumer Function implementation */
// Anonymous KafkaConsumerRecordsFunction whose visible body only counts the records and logs the count at trace level.
197 val consumerFunction = object : KafkaConsumerRecordsFunction {
198 override suspend fun invoke(
199 messageConsumerProperties: MessageConsumerProperties,
200 consumer: Consumer<*, *>,
201 consumerRecords: ConsumerRecords<*, *>
203 val count = consumerRecords.count()
204 log.trace("Received Message count($count)")
// Hand the function to the spied service, then shut the service down.
207 spyBlueprintMessageConsumerService.consume(consumerFunction)
209 spyBlueprintMessageConsumerService.shutDown()
/**
 * Compares the Kafka client configuration produced by the "sample" consumer properties
 * (via getConfig()) with the expected SCRAM + SSL settings, and checks topic and type.
 */
214 fun testKafkaScramSslAuthConfig() {
// Expected client configuration. Entries such as auto-commit, offset reset, deserializers and the
// JKS store types have no counterpart in the visible test properties, so they presumably come from
// defaults of the properties class — TODO confirm.
// NOTE(review): the bootstrap-servers key is taken from ProducerConfig while the other client keys
// use ConsumerConfig; presumably both constants name the same key — confirm, and consider
// ConsumerConfig here for consistency.
216 val expectedConfig = mapOf<String, Any>(
217 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
218 ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
219 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
220 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
221 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
222 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
223 ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
224 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
225 SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
226 SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
227 SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
228 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
229 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
230 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
231 SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
232 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
233 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
234 "username=\"sample-user\" " +
235 "password=\"secretpassword\";"
// Load the consumer properties for "sample" using the consumer property prefix constant.
238 val messageConsumerProperties = bluePrintMessageLibPropertyService
239 .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
// Configuration map that is compared against expectedConfig below.
241 val configProps = messageConsumerProperties.getConfig()
// Topic check; the expected value and the assertion call are not visible in this listing.
244 messageConsumerProperties.topic,
246 "Topic doesn't match the expected value"
// Type check against "kafka-scram-ssl-auth".
// NOTE(review): if the enclosing call is kotlin.test assertEquals(expected, actual, message), the
// arguments appear here in (actual, expected) order, which only affects the failure message — confirm.
249 messageConsumerProperties.type,
250 "kafka-scram-ssl-auth",
251 "Authentication type doesn't match the expected value"
// For every expected entry: the key must be present in configProps and, judging by the failure
// message, its value must equal the expected one (the comparison line itself is not visible).
254 expectedConfig.forEach {
256 configProps.containsKey(it.key),
257 "Missing expected kafka config key : ${it.key}"
262 "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
267 /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// Uses the real (un-spied) consumer and producer services registered as "sample".
// NOTE(review): no enabling/disabling annotation is visible in this listing — confirm how this test is switched off by default.
269 fun testKafkaIntegration() {
// Look up the consumer registered as "sample"; the assertNotNull after a non-null cast is a redundant guard.
271 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
272 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
273 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Subscribe with a null argument and log every element received on the channel.
275 val channel = blueprintMessageConsumerService.subscribe(null)
277 channel.consumeEach {
278 log.info("Consumed Message : $it")
282 /** Send message with every 1 sec */
283 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
284 .blueprintMessageProducerService("sample") as KafkaMessageProducerService
// Each message carries an "id" header and a body built from the current `it` value.
// NOTE(review): the construct providing `it` and the remaining sendMessageNB arguments are not visible — confirm.
288 val headers: MutableMap<String, String> = hashMapOf()
289 headers["id"] = it.toString()
290 blueprintMessageProducerService.sendMessageNB(
292 message = "this is my message($it)",
// Shut the consumer service down once the test is done.
298 blueprintMessageConsumerService.shutDown()