2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.CommonClientConfigs
27 import org.apache.kafka.clients.consumer.Consumer
28 import org.apache.kafka.clients.consumer.ConsumerConfig
29 import org.apache.kafka.clients.consumer.ConsumerRecord
30 import org.apache.kafka.clients.consumer.ConsumerRecords
31 import org.apache.kafka.clients.consumer.MockConsumer
32 import org.apache.kafka.clients.consumer.OffsetResetStrategy
33 import org.apache.kafka.clients.producer.ProducerConfig
34 import org.apache.kafka.common.TopicPartition
35 import org.apache.kafka.common.config.SaslConfigs
36 import org.apache.kafka.common.config.SslConfigs
37 import org.apache.kafka.common.security.auth.SecurityProtocol
38 import org.apache.kafka.common.security.scram.ScramLoginModule
39 import org.apache.kafka.common.serialization.ByteArrayDeserializer
40 import org.apache.kafka.common.serialization.StringDeserializer
42 import org.junit.runner.RunWith
43 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
44 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
45 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
46 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
47 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.springframework.beans.factory.annotation.Autowired
50 import org.springframework.test.annotation.DirtiesContext
51 import org.springframework.test.context.ContextConfiguration
52 import org.springframework.test.context.TestPropertySource
53 import org.springframework.test.context.junit4.SpringRunner
54 import java.nio.charset.Charset
55 import kotlin.test.assertEquals
56 import kotlin.test.assertNotNull
57 import kotlin.test.assertTrue
/**
 * Spring-backed tests for the Kafka message consumer service.
 *
 * The class runs with [SpringRunner] and lists [BluePrintMessageLibConfiguration],
 * [BluePrintPropertyConfiguration] and [BluePrintPropertiesService] as context classes.
 * The property strings below describe one consumer and one producer, both registered
 * under the selector `sample` with type `kafka-scram-ssl-auth` and bootstrap server
 * `127.0.0.1:9092`.
 *
 * NOTE(review): the annotation that owns the property array, and the closing delimiters
 * of both annotations, are not among the visible lines. Presumably this is
 * `@TestPropertySource(properties = [...])`, which is imported. Confirm against the full file.
 */
59 @RunWith(SpringRunner::class)
61 @ContextConfiguration(
62 classes = [BluePrintMessageLibConfiguration::class,
63 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
// Consumer-side settings for the "sample" selector (values are placeholders, not real credentials or paths).
67 ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
68 "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
69 "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
70 "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
71 "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
72 "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
73 "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
74 "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
75 "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
76 "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
77 "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
78 "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
79 "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
// Producer-side settings for the "sample" selector; read by testKafkaIntegration.
81 "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
82 "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
83 "blueprintsprocessor.messageproducer.sample.topic=default-topic",
84 "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
85 "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
86 "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
87 "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
88 "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
89 "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
90 "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
93 open class BlueprintMessageConsumerServiceTest {
// Logger bound to this test class; used for trace/info output in the tests.
95 val log = logger(BlueprintMessageConsumerServiceTest::class)
// Entry point used by every test to look up consumer/producer services and properties.
// NOTE(review): declared lateinit and never assigned in the visible code. Presumably
// injected via @Autowired (imported), whose annotation line is not visible here. Confirm.
98 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
/**
 * Checks that records served by a Kafka [MockConsumer] reach the channel returned by
 * `subscribe(null)` with the expected key and payload.
 *
 * Flow visible in this method:
 *  - look up the consumer service for selector `sample` and wrap it with `spyk`;
 *  - prepare a [MockConsumer] subscribed to `default-topic`, partition 1, with beginning
 *    offset 0 and end offset 10;
 *  - add records whose key is `key_$i` and whose payload is `I am message $i`;
 *  - stub `kafkaConsumer(any())` on the spy to return the mock consumer;
 *  - consume the channel, asserting the payload prefix and the key, then shut down.
 *
 * NOTE(review): several lines of this method are missing from the extract (the `@Test`
 * annotation, any coroutine wrapper, loop headers and closing delimiters). `i` and `key`
 * are used below but not declared in the visible lines.
 * NOTE(review): the name says "BasicAuth", but the configured type for `sample` is
 * `kafka-scram-ssl-auth`. Confirm whether the name is stale.
 */
101 fun testKafkaBasicAuthConsumerService() {
// The `as` cast already fails for null or a different implementation; assertNotNull adds a readable message.
103 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
104 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
105 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Spy on the real service so that kafkaConsumer(...) can be stubbed further down.
107 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
109 val topic = "default-topic"
110 val partitions: MutableList<TopicPartition> = arrayListOf()
111 val topicsCollection: MutableList<String> = arrayListOf()
// A single partition (index 1) of the test topic.
112 partitions.add(TopicPartition(topic, 1))
113 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
114 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
// Used as the end offset registered for each partition.
116 val records: Long = 10
// For every partition: beginning offset 0, end offset `records`, and remember its topic name.
117 partitions.forEach { partition ->
118 partitionsBeginningMap[partition] = 0L
119 partitionsEndMap[partition] = records
120 topicsCollection.add(partition.topic())
// Kafka's MockConsumer stands in for a real consumer; it is subscribed, assigned the
// partition via rebalance, and told the offset range prepared above.
122 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
123 mockKafkaConsumer.subscribe(topicsCollection)
124 mockKafkaConsumer.rebalance(partitions)
125 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
126 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
// Record for partition 1 at offset i, keyed "key_$i".
// NOTE(review): `i` is not declared in the visible lines. Presumably the variable of an
// enclosing loop whose header is missing from this extract. Confirm the range.
128 val record = ConsumerRecord<String, ByteArray>(
129 topic, 1, i.toLong(), "key_$i",
130 "I am message $i".toByteArray()
132 mockKafkaConsumer.addRecord(record)
// Make kafkaConsumer(...) on the spy return the mock consumer for any argument.
135 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
136 val channel = spyBlueprintMessageConsumerService.subscribe(null)
139 channel.consumeEach {
// The payload is a ByteArray; decode it with the platform default charset before checking it.
142 val value = String(it.value(), Charset.defaultCharset())
143 assertTrue(value.startsWith("I am message"), "failed to get the actual message")
// NOTE(review): `key` and `i` are not declared in the visible lines. Presumably `key` is
// read from the consumed record and `i` is a counter kept alongside. Confirm.
144 assertEquals("key_$i", key)
148 spyBlueprintMessageConsumerService.shutDown()
/**
 * Exercises `consume(...)` on the spied consumer service with a caller-supplied
 * [KafkaConsumerRecordsFunction] instead of the channel returned by `subscribe`.
 *
 * The mock-consumer setup is the same as in testKafkaBasicAuthConsumerService:
 * `default-topic`, partition 1, beginning offset 0, end offset 10, records keyed `key_$i`.
 * The supplied function only counts the received records and logs the count at trace
 * level; no assertion on the count is visible in this method.
 *
 * NOTE(review): lines are missing from the extract (the `@Test` annotation, any coroutine
 * wrapper, the loop header that declares `i`, and closing delimiters).
 * NOTE(review): the setup duplicates the sibling test almost line for line; a shared
 * private helper would remove the repetition.
 */
153 fun testKafkaBasicAuthConsumerWithDynamicFunction() {
155 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
156 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
157 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Spy on the real service so that kafkaConsumer(...) can be stubbed further down.
159 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
161 val topic = "default-topic"
162 val partitions: MutableList<TopicPartition> = arrayListOf()
163 val topicsCollection: MutableList<String> = arrayListOf()
// A single partition (index 1) of the test topic.
164 partitions.add(TopicPartition(topic, 1))
165 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
166 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
// Used as the end offset registered for each partition.
168 val records: Long = 10
// For every partition: beginning offset 0, end offset `records`, and remember its topic name.
169 partitions.forEach { partition ->
170 partitionsBeginningMap[partition] = 0L
171 partitionsEndMap[partition] = records
172 topicsCollection.add(partition.topic())
// Kafka's MockConsumer stands in for a real consumer.
174 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
175 mockKafkaConsumer.subscribe(topicsCollection)
176 mockKafkaConsumer.rebalance(partitions)
177 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
178 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
// NOTE(review): `i` is not declared in the visible lines. Presumably the variable of an
// enclosing loop whose header is missing from this extract. Confirm the range.
180 val record = ConsumerRecord<String, ByteArray>(
181 topic, 1, i.toLong(), "key_$i",
182 "I am message $i".toByteArray()
184 mockKafkaConsumer.addRecord(record)
// Make kafkaConsumer(...) on the spy return the mock consumer for any argument.
187 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
188 /** Test Consumer Function implementation */
189 val consumerFunction = object : KafkaConsumerRecordsFunction {
// Receives the consumer properties, the consumer and one batch of records; only the batch is used.
190 override suspend fun invoke(
191 messageConsumerProperties: MessageConsumerProperties,
192 consumer: Consumer<*, *>,
193 consumerRecords: ConsumerRecords<*, *>
195 val count = consumerRecords.count()
196 log.trace("Received Message count($count)")
// Hand the function to the service, then stop the consumer.
199 spyBlueprintMessageConsumerService.consume(consumerFunction)
201 spyBlueprintMessageConsumerService.shutDown()
/**
 * Verifies the Kafka client configuration produced for the `sample` consumer.
 *
 * Builds a map of expected Kafka settings (bootstrap servers, group id, deserializers,
 * SASL_SSL protocol, JKS trust/key stores, SCRAM-SHA-512 mechanism and the JAAS line for
 * `sample-user`), loads the consumer properties under
 * `MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX + "sample"`, and checks that every
 * expected key is present in `getConfig()` with the expected value. Keys that exist only
 * in the produced config are not checked.
 *
 * NOTE(review): some lines are missing from the extract (the `@Test` annotation, the
 * closing delimiters, and one argument line in two of the `assertEquals` calls below).
 */
206 fun testKafkaScramSslAuthConfig() {
// Expected settings; string values mirror the test properties declared on the class.
208 val expectedConfig = mapOf<String, Any>(
209 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
210 ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
211 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
212 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
213 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
214 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
215 ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
216 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
217 SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
218 SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
219 SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
220 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
221 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
222 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
223 SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
224 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
// JAAS entry: login module class name, the "required" flag, then the quoted credentials.
225 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
226 "username=\"sample-user\" " +
227 "password=\"secretpassword\";"
230 val messageConsumerProperties = bluePrintMessageLibPropertyService
231 .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
233 val configProps = messageConsumerProperties.getConfig()
// NOTE(review): the expected-value argument of this call is not among the visible lines.
// Presumably "default-topic", matching the configured topic. Confirm.
235 assertEquals(messageConsumerProperties.topic,
237 "Topic doesn't match the expected value"
// NOTE(review): kotlin.test.assertEquals takes (expected, actual, message). The actual value
// is passed first here, so a failure report would label the two values the wrong way round.
239 assertEquals(messageConsumerProperties.type,
240 "kafka-scram-ssl-auth",
241 "Authentication type doesn't match the expected value")
// Every expected key must exist in the produced config and carry the expected value.
243 expectedConfig.forEach {
244 assertTrue(configProps.containsKey(it.key),
245 "Missing expected kafka config key : ${it.key}")
// NOTE(review): the second argument of this call is not visible. Presumably `it.value`, as
// the message suggests. Confirm.
246 assertEquals(configProps[it.key],
248 "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
253 /** Kafka integration test. Enable and run it only for local desktop testing against a real Kafka broker. */
// Subscribes the "sample" consumer, logs every consumed message at info level, publishes
// messages through the "sample" producer with an "id" header, then shuts the consumer down.
// NOTE(review): lines are missing from this extract (annotations, any coroutine wrapper, the
// loop that supplies `it` to the producer section, most sendMessageNB arguments, and the
// closing delimiters). Whether the test is currently enabled cannot be told from the visible lines.
255 fun testKafkaIntegration() {
// Unlike the mock-based tests, the service is used directly here with no spy or stubbed consumer.
257 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
258 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
259 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
261 val channel = blueprintMessageConsumerService.subscribe(null)
263 channel.consumeEach {
264 log.info("Consumed Message : $it")
268 /** Send a message every 1 sec */
269 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
270 .blueprintMessageProducerService("sample") as KafkaMessageProducerService
// NOTE(review): `it` below is presumably the index of an enclosing repeat/loop whose header
// is not visible. Confirm the iteration count and the delay between sends.
274 val headers: MutableMap<String, String> = hashMapOf()
275 headers["id"] = it.toString()
276 blueprintMessageProducerService.sendMessageNB(
278 message = "this is my message($it)",
284 blueprintMessageConsumerService.shutDown()