2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.CommonClientConfigs
27 import org.apache.kafka.clients.consumer.Consumer
28 import org.apache.kafka.clients.consumer.ConsumerConfig
29 import org.apache.kafka.clients.consumer.ConsumerRecord
30 import org.apache.kafka.clients.consumer.ConsumerRecords
31 import org.apache.kafka.clients.consumer.MockConsumer
32 import org.apache.kafka.clients.consumer.OffsetResetStrategy
33 import org.apache.kafka.clients.producer.ProducerConfig
34 import org.apache.kafka.common.TopicPartition
35 import org.apache.kafka.common.config.SaslConfigs
36 import org.apache.kafka.common.config.SslConfigs
37 import org.apache.kafka.common.security.auth.SecurityProtocol
38 import org.apache.kafka.common.security.scram.ScramLoginModule
39 import org.apache.kafka.common.serialization.ByteArrayDeserializer
40 import org.apache.kafka.common.serialization.StringDeserializer
42 import org.junit.runner.RunWith
43 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
44 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
45 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
46 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
47 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.springframework.beans.factory.annotation.Autowired
50 import org.springframework.test.annotation.DirtiesContext
51 import org.springframework.test.context.ContextConfiguration
52 import org.springframework.test.context.TestPropertySource
53 import org.springframework.test.context.junit4.SpringRunner
54 import kotlin.test.assertEquals
55 import kotlin.test.assertNotNull
56 import kotlin.test.assertTrue
/**
 * Spring-based tests for the Kafka message consumer service.
 *
 * The test context is assembled from [BluePrintMessageLibConfiguration], [BluePrintPropertyConfiguration] and
 * [BluePrintPropertiesService]. The inline properties below define one message consumer and one message
 * producer, both named "sample" and of type "kafka-scram-ssl-auth", pointing at 127.0.0.1:9092 with
 * placeholder truststore/keystore paths and SCRAM credentials.
 */
58 @RunWith(SpringRunner::class)
60 @ContextConfiguration(
61 classes = [BluePrintMessageLibConfiguration::class,
62 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
// Consumer-side settings for the "sample" selector.
66 ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
67 "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
68 "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
69 "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
70 "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
71 "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
72 "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
73 "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
74 "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
75 "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
76 "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
77 "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
78 "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
// Producer-side settings for the "sample" selector.
80 "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
81 "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
82 "blueprintsprocessor.messageproducer.sample.topic=default-topic",
83 "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
84 "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
85 "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
86 "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
87 "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
88 "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
89 "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
92 open class BlueprintMessageConsumerServiceTest {
// Logger used by the test cases of this class.
94 val log = logger(BlueprintMessageConsumerServiceTest::class)
// Service the tests use to look up the "sample" consumer/producer services and the consumer properties.
// NOTE(review): declared lateinit, so it is presumably injected by Spring before the tests run;
// the injection annotation is not visible in this listing - confirm.
97 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
/**
 * Exercises the channel-based `subscribe` path of [KafkaMessageConsumerService] against an in-memory
 * Kafka [MockConsumer].
 *
 * The "sample" consumer service is wrapped in a spy whose `kafkaConsumer(...)` call is stubbed to return the
 * mock consumer. Each message read from the returned channel must start with "I am message".
 *
 * NOTE(review): the method name says "BasicAuth", while the test properties configure the "sample" consumer
 * with type "kafka-scram-ssl-auth" - consider renaming.
 */
100 fun testKafkaBasicAuthConsumerService() {
102 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
103 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
104 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Spy on the real service so that the Kafka consumer it uses can be replaced below.
106 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
// One partition (index 1) of the topic; its offsets are declared as 0 (beginning) and `records` (end).
108 val topic = "default-topic"
109 val partitions: MutableList<TopicPartition> = arrayListOf()
110 val topicsCollection: MutableList<String> = arrayListOf()
111 partitions.add(TopicPartition(topic, 1))
112 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
113 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
115 val records: Long = 10
116 partitions.forEach { partition ->
117 partitionsBeginningMap[partition] = 0L
118 partitionsEndMap[partition] = records
119 topicsCollection.add(partition.topic())
// In-memory consumer: subscribed to the topic, assigned the partition, and given the offsets above.
121 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
122 mockKafkaConsumer.subscribe(topicsCollection)
123 mockKafkaConsumer.rebalance(partitions)
124 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
125 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
// Record on partition 1 with offset `i`, key "key_<i>" and payload "I am message <i>".
// NOTE(review): `i` is not declared in the visible lines - presumably supplied by an enclosing loop; confirm.
127 val record = ConsumerRecord<String, ByteArray>(
128 topic, 1, i.toLong(), "key_$i",
129 "I am message $i".toByteArray()
131 mockKafkaConsumer.addRecord(record)
// Make the spied service hand out the mock consumer for any argument.
134 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
135 val channel = spyBlueprintMessageConsumerService.subscribe(null)
// Every message delivered on the channel must carry the payload prefix used above.
137 channel.consumeEach {
138 assertTrue(it.startsWith("I am message"), "failed to get the actual message")
142 spyBlueprintMessageConsumerService.shutDown()
/**
 * Exercises the `consume` path of [KafkaMessageConsumerService], which takes a caller-supplied
 * [KafkaConsumerRecordsFunction], against an in-memory Kafka [MockConsumer].
 *
 * The setup mirrors the channel-based test: the "sample" consumer service is spied and its
 * `kafkaConsumer(...)` call is stubbed to return the mock consumer. The supplied function only logs the
 * number of records it receives; the visible lines contain no assertion on the consumed records.
 */
147 fun testKafkaBasicAuthConsumerWithDynamicFunction() {
149 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
150 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
151 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Spy on the real service so that the Kafka consumer it uses can be replaced below.
153 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
// One partition (index 1) of the topic; its offsets are declared as 0 (beginning) and `records` (end).
155 val topic = "default-topic"
156 val partitions: MutableList<TopicPartition> = arrayListOf()
157 val topicsCollection: MutableList<String> = arrayListOf()
158 partitions.add(TopicPartition(topic, 1))
159 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
160 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
162 val records: Long = 10
163 partitions.forEach { partition ->
164 partitionsBeginningMap[partition] = 0L
165 partitionsEndMap[partition] = records
166 topicsCollection.add(partition.topic())
// In-memory consumer: subscribed to the topic, assigned the partition, and given the offsets above.
168 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
169 mockKafkaConsumer.subscribe(topicsCollection)
170 mockKafkaConsumer.rebalance(partitions)
171 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
172 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
// Record on partition 1 with offset `i`, key "key_<i>" and payload "I am message <i>".
// NOTE(review): `i` is not declared in the visible lines - presumably supplied by an enclosing loop; confirm.
174 val record = ConsumerRecord<String, ByteArray>(
175 topic, 1, i.toLong(), "key_$i",
176 "I am message $i".toByteArray()
178 mockKafkaConsumer.addRecord(record)
// Make the spied service hand out the mock consumer for any argument.
181 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
182 /** Test Consumer Function implementation */
183 val consumerFunction = object : KafkaConsumerRecordsFunction {
184 override suspend fun invoke(
185 messageConsumerProperties: MessageConsumerProperties,
186 consumer: Consumer<*, *>,
187 consumerRecords: ConsumerRecords<*, *>
// Only the size of the received batch is inspected and logged at trace level.
189 val count = consumerRecords.count()
190 log.trace("Received Message count($count)")
// Hand the function to the service, then shut the service down.
193 spyBlueprintMessageConsumerService.consume(consumerFunction)
195 spyBlueprintMessageConsumerService.shutDown()
/**
 * Checks that the "sample" consumer properties are translated into the expected Kafka client configuration.
 *
 * The properties are resolved under [MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX] + "sample", their
 * `topic` and `type` are asserted, and every key of the expected map must be present in the result of
 * `getConfig()`.
 */
200 fun testKafkaScramSslAuthConfig() {
// Expected Kafka settings: SASL_SSL security protocol, JKS truststore/keystore, SCRAM-SHA-512 mechanism
// and a JAAS entry built from the configured SCRAM username and password.
202 val expectedConfig = mapOf<String, Any>(
203 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
204 ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
205 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
206 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
207 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
208 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
209 ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
210 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
211 SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
212 SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
213 SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
214 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
215 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
216 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
217 SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
218 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
219 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
220 "username=\"sample-user\" " +
221 "password=\"secretpassword\";"
// Resolve the consumer properties for the "sample" selector and derive the Kafka config from them.
224 val messageConsumerProperties = bluePrintMessageLibPropertyService
225 .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
227 val configProps = messageConsumerProperties.getConfig()
229 assertEquals(messageConsumerProperties.topic,
231 "Topic doesn't match the expected value"
// NOTE(review): kotlin.test's assertEquals is declared as (expected, actual, message); the actual value is
// passed first here. The check still holds, but a failure report would label the two values the wrong way round.
233 assertEquals(messageConsumerProperties.type,
234 "kafka-scram-ssl-auth",
235 "Authentication type doesn't match the expected value")
// Every expected key must be present in the generated config before its value is compared.
237 expectedConfig.forEach {
238 assertTrue(configProps.containsKey(it.key),
239 "Missing expected kafka config key : ${it.key}")
240 assertEquals(configProps[it.key],
242 "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
247 /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// Unlike the other tests in this class, no spy or MockConsumer is installed here: the "sample" consumer and
// producer services obtained from the property service are used directly.
249 fun testKafkaIntegration() {
251 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
252 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
253 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Subscribe and log every message that arrives on the channel.
255 val channel = blueprintMessageConsumerService.subscribe(null)
257 channel.consumeEach {
258 log.info("Consumed Message : $it")
262 /** Send message with every 1 sec */
263 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
264 .blueprintMessageProducerService("sample") as KafkaMessageProducerService
// Each message carries an "id" header and a payload built from `it`.
// NOTE(review): `it` is not bound by any lambda visible in this listing - presumably the index of an
// enclosing repetition block; confirm. The remaining sendMessageNB arguments are not visible either.
268 val headers: MutableMap<String, String> = hashMapOf()
269 headers["id"] = it.toString()
270 blueprintMessageProducerService.sendMessageNB(
271 message = "this is my message($it)",
277 blueprintMessageConsumerService.shutDown()