2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.CommonClientConfigs
27 import org.apache.kafka.clients.consumer.Consumer
28 import org.apache.kafka.clients.consumer.ConsumerConfig
29 import org.apache.kafka.clients.consumer.ConsumerRecord
30 import org.apache.kafka.clients.consumer.ConsumerRecords
31 import org.apache.kafka.clients.consumer.MockConsumer
32 import org.apache.kafka.clients.consumer.OffsetResetStrategy
33 import org.apache.kafka.clients.producer.ProducerConfig
34 import org.apache.kafka.common.TopicPartition
35 import org.apache.kafka.common.config.SaslConfigs
36 import org.apache.kafka.common.config.SslConfigs
37 import org.apache.kafka.common.security.auth.SecurityProtocol
38 import org.apache.kafka.common.security.scram.ScramLoginModule
39 import org.apache.kafka.common.serialization.ByteArrayDeserializer
40 import org.apache.kafka.common.serialization.StringDeserializer
42 import org.junit.runner.RunWith
43 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
44 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
45 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
46 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
47 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.springframework.beans.factory.annotation.Autowired
50 import org.springframework.test.annotation.DirtiesContext
51 import org.springframework.test.context.ContextConfiguration
52 import org.springframework.test.context.TestPropertySource
53 import org.springframework.test.context.junit4.SpringRunner
54 import java.nio.charset.Charset
55 import kotlin.test.assertEquals
56 import kotlin.test.assertNotNull
57 import kotlin.test.assertTrue
59 @RunWith(SpringRunner::class)
61 @ContextConfiguration(
63 BluePrintMessageLibConfiguration::class,
64 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
70 "blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
71 "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
72 "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
73 "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
74 "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
75 "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
76 "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
77 "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
78 "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
79 "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
80 "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
81 "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
82 "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
84 "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
85 "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
86 "blueprintsprocessor.messageproducer.sample.topic=default-topic",
87 "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
88 "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
89 "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
90 "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
91 "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
92 "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
93 "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
96 open class BlueprintMessageConsumerServiceTest {
98 val log = logger(BlueprintMessageConsumerServiceTest::class)
101 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
// Drives KafkaMessageConsumerService.subscribe(null) for the "sample" selector with the
// Kafka client replaced by a MockConsumer, then checks the key and payload of each
// message read from the returned channel.
// NOTE(review): the "sample" consumer is configured with type kafka-scram-ssl-auth, while
// this test name says BasicAuth - confirm the name is intentional.
// NOTE(review): `i` and `key` are used below but their declarations are not visible in
// this excerpt - confirm against the complete file before changing this test.
104 fun testKafkaBasicAuthConsumerService() {
// Look up the consumer service bound to the "sample" selector and narrow it to the Kafka type.
106 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
107 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
// The non-null `as` cast above already fails on null, so this assertion is only a readable guard.
108 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Spy on the real service so kafkaConsumer(...) can be stubbed further down.
110 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
// Fixture: a single partition (number 1) of "default-topic".
112 val topic = "default-topic"
113 val partitions: MutableList<TopicPartition> = arrayListOf()
114 val topicsCollection: MutableList<String> = arrayListOf()
115 partitions.add(TopicPartition(topic, 1))
116 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
117 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
// `records` is used as the end offset of every partition; the beginning offset is 0.
119 val records: Long = 10
120 partitions.forEach { partition ->
121 partitionsBeginningMap[partition] = 0L
122 partitionsEndMap[partition] = records
123 topicsCollection.add(partition.topic())
// Build the mock consumer: subscribe to the topic, hand it the partition through a simulated
// rebalance, and register the beginning/end offsets prepared above.
125 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
126 mockKafkaConsumer.subscribe(topicsCollection)
127 mockKafkaConsumer.rebalance(partitions)
128 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
129 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
// Each record goes to partition 1 at offset `i`, with key "key_<i>" and payload "I am message <i>".
131 val record = ConsumerRecord<String, ByteArray>(
132 topic, 1, i.toLong(), "key_$i",
133 "I am message $i".toByteArray()
135 mockKafkaConsumer.addRecord(record)
// Make the spied service return the mock instead of creating a real Kafka client.
138 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
139 val channel = spyBlueprintMessageConsumerService.subscribe(null)
// For every message taken from the channel: decode the payload and check its prefix and key.
142 channel.consumeEach {
145 val value = String(it.value(), Charset.defaultCharset())
146 assertTrue(value.startsWith("I am message"), "failed to get the actual message")
147 assertEquals("key_$i", key)
// Stop the consumer once the checks are done.
151 spyBlueprintMessageConsumerService.shutDown()
// Drives KafkaMessageConsumerService.consume(...) for the "sample" selector with a
// caller-supplied KafkaConsumerRecordsFunction, with the Kafka client replaced by a MockConsumer.
// No assertion on the consumed records is visible here; the supplied function only logs the count.
// NOTE(review): `i` is used below but its declaration is not visible in this excerpt -
// confirm against the complete file before changing this test.
156 fun testKafkaBasicAuthConsumerWithDynamicFunction() {
// Look up the consumer service bound to the "sample" selector and narrow it to the Kafka type.
158 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
159 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
// The non-null `as` cast above already fails on null, so this assertion is only a readable guard.
160 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Spy on the real service so kafkaConsumer(...) can be stubbed further down.
162 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
// Fixture: a single partition (number 1) of "default-topic".
164 val topic = "default-topic"
165 val partitions: MutableList<TopicPartition> = arrayListOf()
166 val topicsCollection: MutableList<String> = arrayListOf()
167 partitions.add(TopicPartition(topic, 1))
168 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
169 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
// `records` is used as the end offset of every partition; the beginning offset is 0.
171 val records: Long = 10
172 partitions.forEach { partition ->
173 partitionsBeginningMap[partition] = 0L
174 partitionsEndMap[partition] = records
175 topicsCollection.add(partition.topic())
// Build the mock consumer: subscribe to the topic, hand it the partition through a simulated
// rebalance, and register the beginning/end offsets prepared above.
177 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
178 mockKafkaConsumer.subscribe(topicsCollection)
179 mockKafkaConsumer.rebalance(partitions)
180 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
181 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
// Each record goes to partition 1 at offset `i`, with key "key_<i>" and payload "I am message <i>".
183 val record = ConsumerRecord<String, ByteArray>(
184 topic, 1, i.toLong(), "key_$i",
185 "I am message $i".toByteArray()
187 mockKafkaConsumer.addRecord(record)
// Make the spied service return the mock instead of creating a real Kafka client.
190 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
191 /** Test Consumer Function implementation */
// Anonymous implementation handed to consume(...); it counts the records it is given
// and writes that count to the trace log.
192 val consumerFunction = object : KafkaConsumerRecordsFunction {
193 override suspend fun invoke(
194 messageConsumerProperties: MessageConsumerProperties,
195 consumer: Consumer<*, *>,
196 consumerRecords: ConsumerRecords<*, *>
198 val count = consumerRecords.count()
199 log.trace("Received Message count($count)")
// Start consuming with the custom function, then stop the consumer.
202 spyBlueprintMessageConsumerService.consume(consumerFunction)
204 spyBlueprintMessageConsumerService.shutDown()
// Checks the consumer properties resolved for the "sample" selector: the topic, the
// authentication type, and every entry of the Kafka client configuration returned by getConfig().
// NOTE(review): the assertion call lines themselves are not visible in this excerpt; only
// their arguments are - confirm against the complete file.
209 fun testKafkaScramSslAuthConfig() {
// Expected Kafka client settings; the values mirror the
// blueprintsprocessor.messageconsumer.sample.* test properties declared on the class.
// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG is used for a consumer config here; the key
// string is the common "bootstrap.servers" client setting.
211 val expectedConfig = mapOf<String, Any>(
212 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
213 ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
214 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
215 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
216 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
217 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
218 ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
219 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
220 SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
221 SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
222 SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
223 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
224 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
225 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
226 SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
227 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
228 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
229 "username=\"sample-user\" " +
230 "password=\"secretpassword\";"
// Load the consumer properties through the property service, addressed by consumer prefix + "sample".
233 val messageConsumerProperties = bluePrintMessageLibPropertyService
234 .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
// Kafka client configuration derived from those properties.
236 val configProps = messageConsumerProperties.getConfig()
// Topic comparison (the expected literal is not visible in this excerpt).
239 messageConsumerProperties.topic,
241 "Topic doesn't match the expected value"
// Authentication type comparison.
// NOTE(review): if the enclosing call is kotlin.test assertEquals, the actual value is passed
// first and the expected literal second, so a failure message would swap them - confirm.
244 messageConsumerProperties.type,
245 "kafka-scram-ssl-auth",
246 "Authentication type doesn't match the expected value"
// For each expected entry: the key must be present in the generated config, and the failure
// message of the value check reports both the actual and the expected value.
249 expectedConfig.forEach {
251 configProps.containsKey(it.key),
252 "Missing expected kafka config key : ${it.key}"
257 "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
262 /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// Round trip through the real "sample" consumer and producer services: subscribe, log every
// consumed message, publish messages through the producer, then shut the consumer down.
// Nothing is mocked in the visible code, so the configured bootstrap server must be reachable.
// NOTE(review): the loop/coroutine wrappers and some sendMessageNB arguments are not visible
// in this excerpt - confirm against the complete file.
264 fun testKafkaIntegration() {
// Look up the consumer service bound to the "sample" selector and narrow it to the Kafka type.
266 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
267 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
// The non-null `as` cast above already fails on null, so this assertion is only a readable guard.
268 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
// Subscribe with no additional topics argument and log each message taken from the channel.
270 val channel = blueprintMessageConsumerService.subscribe(null)
272 channel.consumeEach {
273 log.info("Consumed Message : $it")
277 /** Send message with every 1 sec */
// Producer side: resolve the "sample" producer service and narrow it to the Kafka type.
278 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
279 .blueprintMessageProducerService("sample") as KafkaMessageProducerService
// Each message gets an "id" header holding the current iteration value as a string.
283 val headers: MutableMap<String, String> = hashMapOf()
284 headers["id"] = it.toString()
285 blueprintMessageProducerService.sendMessageNB(
287 message = "this is my message($it)",
// Stop the consumer once the messages have been sent.
293 blueprintMessageConsumerService.shutDown()