2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.consumer.Consumer
27 import org.apache.kafka.clients.consumer.ConsumerRecord
28 import org.apache.kafka.clients.consumer.ConsumerRecords
29 import org.apache.kafka.clients.consumer.MockConsumer
30 import org.apache.kafka.clients.consumer.OffsetResetStrategy
31 import org.apache.kafka.common.TopicPartition
33 import org.junit.runner.RunWith
34 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
36 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
37 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
38 import org.onap.ccsdk.cds.controllerblueprints.core.logger
39 import org.springframework.beans.factory.annotation.Autowired
40 import org.springframework.test.annotation.DirtiesContext
41 import org.springframework.test.context.ContextConfiguration
42 import org.springframework.test.context.TestPropertySource
43 import org.springframework.test.context.junit4.SpringRunner
44 import kotlin.test.assertNotNull
45 import kotlin.test.assertTrue
47 @RunWith(SpringRunner::class)
49 @ContextConfiguration(
50 classes = [BluePrintMessageLibConfiguration::class,
51 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
55 ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
56 "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
57 "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
58 "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
59 "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
60 "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
61 "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
63 "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
64 "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
65 "blueprintsprocessor.messageproducer.sample.topic=default-topic",
66 "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
69 open class BlueprintMessageConsumerServiceTest {
71 val log = logger(BlueprintMessageConsumerServiceTest::class)
74 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
77 fun testKafkaBasicAuthConsumerService() {
79 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
80 .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
81 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
83 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
85 val topic = "default-topic"
86 val partitions: MutableList<TopicPartition> = arrayListOf()
87 val topicsCollection: MutableList<String> = arrayListOf()
88 partitions.add(TopicPartition(topic, 1))
89 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
90 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
92 val records: Long = 10
93 partitions.forEach { partition ->
94 partitionsBeginningMap[partition] = 0L
95 partitionsEndMap[partition] = records
96 topicsCollection.add(partition.topic())
98 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
99 mockKafkaConsumer.subscribe(topicsCollection)
100 mockKafkaConsumer.rebalance(partitions)
101 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
102 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
104 val record = ConsumerRecord<String, ByteArray>(
105 topic, 1, i.toLong(), "key_$i",
106 "I am message $i".toByteArray()
108 mockKafkaConsumer.addRecord(record)
111 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
112 val channel = spyBlueprintMessageConsumerService.subscribe(null)
114 channel.consumeEach {
115 assertTrue(it.startsWith("I am message"), "failed to get the actual message")
119 spyBlueprintMessageConsumerService.shutDown()
124 fun testKafkaBasicAuthConsumerWithDynamicFunction() {
126 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
127 .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
128 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
130 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
132 val topic = "default-topic"
133 val partitions: MutableList<TopicPartition> = arrayListOf()
134 val topicsCollection: MutableList<String> = arrayListOf()
135 partitions.add(TopicPartition(topic, 1))
136 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
137 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
139 val records: Long = 10
140 partitions.forEach { partition ->
141 partitionsBeginningMap[partition] = 0L
142 partitionsEndMap[partition] = records
143 topicsCollection.add(partition.topic())
145 val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
146 mockKafkaConsumer.subscribe(topicsCollection)
147 mockKafkaConsumer.rebalance(partitions)
148 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
149 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
151 val record = ConsumerRecord<String, ByteArray>(
152 topic, 1, i.toLong(), "key_$i",
153 "I am message $i".toByteArray()
155 mockKafkaConsumer.addRecord(record)
158 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
159 /** Test Consumer Function implementation */
160 val consumerFunction = object : KafkaConsumerRecordsFunction {
161 override suspend fun invoke(
162 messageConsumerProperties: MessageConsumerProperties,
163 consumer: Consumer<*, *>,
164 consumerRecords: ConsumerRecords<*, *>
166 val count = consumerRecords.count()
167 log.trace("Received Message count($count)")
170 spyBlueprintMessageConsumerService.consume(consumerFunction)
172 spyBlueprintMessageConsumerService.shutDown()
176 /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
178 fun testKafkaIntegration() {
180 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
181 .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
182 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
184 val channel = blueprintMessageConsumerService.subscribe(null)
186 channel.consumeEach {
187 log.info("Consumed Message : $it")
191 /** Send message with every 1 sec */
192 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
193 .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
197 val headers: MutableMap<String, String> = hashMapOf()
198 headers["id"] = it.toString()
199 blueprintMessageProducerService.sendMessageNB(
200 message = "this is my message($it)",
206 blueprintMessageConsumerService.shutDown()