2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.consumer.*
27 import org.apache.kafka.common.TopicPartition
29 import org.junit.runner.RunWith
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
34 import org.onap.ccsdk.cds.controllerblueprints.core.logger
35 import org.springframework.beans.factory.annotation.Autowired
36 import org.springframework.test.annotation.DirtiesContext
37 import org.springframework.test.context.ContextConfiguration
38 import org.springframework.test.context.TestPropertySource
39 import org.springframework.test.context.junit4.SpringRunner
40 import kotlin.test.assertNotNull
41 import kotlin.test.assertTrue
44 @RunWith(SpringRunner::class)
46 @ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
47 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class])
48 @TestPropertySource(properties =
49 ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
50 "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
51 "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
52 "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
53 "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
54 "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
55 "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
57 "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
58 "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
59 "blueprintsprocessor.messageproducer.sample.topic=default-topic",
60 "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
62 open class BlueprintMessageConsumerServiceTest {
63 val log = logger(BlueprintMessageConsumerServiceTest::class)
66 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
69 fun testKafkaBasicAuthConsumerService() {
71 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
72 .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
73 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
75 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
77 val topic = "default-topic"
78 val partitions: MutableList<TopicPartition> = arrayListOf()
79 val topicsCollection: MutableList<String> = arrayListOf()
80 partitions.add(TopicPartition(topic, 1))
81 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
82 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
84 val records: Long = 10
85 partitions.forEach { partition ->
86 partitionsBeginningMap[partition] = 0L
87 partitionsEndMap[partition] = records
88 topicsCollection.add(partition.topic())
90 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
91 mockKafkaConsumer.subscribe(topicsCollection)
92 mockKafkaConsumer.rebalance(partitions)
93 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
94 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
96 val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
97 "I am message $i".toByteArray())
98 mockKafkaConsumer.addRecord(record)
101 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
102 val channel = spyBlueprintMessageConsumerService.subscribe(null)
104 channel.consumeEach {
105 assertTrue(it.startsWith("I am message"), "failed to get the actual message")
109 spyBlueprintMessageConsumerService.shutDown()
114 fun testKafkaBasicAuthConsumerWithDynamicFunction() {
116 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
117 .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
118 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
120 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
122 val topic = "default-topic"
123 val partitions: MutableList<TopicPartition> = arrayListOf()
124 val topicsCollection: MutableList<String> = arrayListOf()
125 partitions.add(TopicPartition(topic, 1))
126 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
127 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
129 val records: Long = 10
130 partitions.forEach { partition ->
131 partitionsBeginningMap[partition] = 0L
132 partitionsEndMap[partition] = records
133 topicsCollection.add(partition.topic())
135 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
136 mockKafkaConsumer.subscribe(topicsCollection)
137 mockKafkaConsumer.rebalance(partitions)
138 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
139 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
141 val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
142 "I am message $i".toByteArray())
143 mockKafkaConsumer.addRecord(record)
146 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
147 /** Test Consumer Function implementation */
148 val consumerFunction = object : KafkaConsumerRecordsFunction {
149 override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties,
150 consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) {
151 val count = consumerRecords.count()
152 log.trace("Received Message count($count)")
155 spyBlueprintMessageConsumerService.consume(consumerFunction)
157 spyBlueprintMessageConsumerService.shutDown()
161 /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
163 fun testKafkaIntegration() {
165 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
166 .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
167 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
169 val channel = blueprintMessageConsumerService.subscribe(null)
171 channel.consumeEach {
172 log.info("Consumed Message : $it")
176 /** Send message with every 1 sec */
177 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
178 .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
182 val headers: MutableMap<String, String> = hashMapOf()
183 headers["id"] = it.toString()
184 blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
189 blueprintMessageConsumerService.shutDown()