/*
 * Copyright © 2019 IBM.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onap.ccsdk.cds.blueprintsprocessor.message.service

import io.mockk.every
import io.mockk.spyk
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.MockConsumer
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.kafka.common.TopicPartition
import org.junit.Test
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
/**
 * Tests for the Kafka basic-auth message consumer resolved through
 * [BluePrintMessageLibPropertyService] using the `sample` selector configured
 * in the test properties below.
 */
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(
    classes = [
        BluePrintMessageLibConfiguration::class,
        BlueprintPropertyConfiguration::class,
        BluePrintProperties::class
    ]
)
@TestPropertySource(
    properties = [
        "blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
        "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
        "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
        "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
        "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
        "blueprintsprocessor.messageconsumer.sample.pollRecords=1",

        "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
        "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageproducer.sample.topic=default-topic",
        "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
    ]
)
open class BlueprintMessageConsumerServiceTest {

    val log = logger(BlueprintMessageConsumerServiceTest::class)

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    /**
     * Subscribes through a spied consumer service whose `kafkaConsumer(...)` is stubbed to
     * return a pre-loaded [MockConsumer], and asserts that every message delivered on the
     * returned channel starts with the expected prefix.
     */
    @Test
    fun testKafkaBasicAuthConsumerService() {
        runBlocking {
            // `as?` + assertNotNull: a wrong service type fails with the intended message
            // instead of a ClassCastException that would make the assertion unreachable.
            val consumerService = assertNotNull(
                bluePrintMessageLibPropertyService
                    .blueprintMessageConsumerService("sample") as? KafkaBasicAuthMessageConsumerService,
                "failed to get blueprintMessageConsumerService"
            )

            val spyConsumerService = spyk(consumerService, recordPrivateCalls = true)

            val topic = "default-topic"
            val partition = 1
            val records: Long = 10
            val partitions = listOf(TopicPartition(topic, partition))
            val topics = partitions.map { it.topic() }
            val beginningOffsets = partitions.associate { it to 0L }
            val endOffsets = partitions.associate { it to records }

            // The mock consumer must be subscribed and assigned its partitions before
            // records can be added to it.
            val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST).apply {
                subscribe(topics)
                rebalance(partitions)
                updateBeginningOffsets(beginningOffsets)
                updateEndOffsets(endOffsets)
            }
            for (i in 1..records) {
                mockKafkaConsumer.addRecord(
                    ConsumerRecord<String, String>(topic, partition, i, "key_$i", "I am message $i")
                )
            }

            every { spyConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
            val channel = spyConsumerService.subscribe(null)
            launch {
                channel.consumeEach {
                    assertTrue(it.startsWith("I am message"), "failed to get the actual message")
                }
            }
            // NOTE(review): the pause length was not recoverable from the damaged source;
            // 100 ms is roughly ten poll cycles at the configured pollMillSec=10 - confirm.
            delay(100)
            spyConsumerService.shutDown()
        }
    }

    /**
     * Kafka integration test against a real broker.
     *
     * Deliberately NOT annotated with `@Test`: enable it only for local desktop testing
     * with a real Kafka broker available at the configured bootstrap servers.
     */
    fun testKafkaIntegration() {
        runBlocking {
            val consumerService = assertNotNull(
                bluePrintMessageLibPropertyService
                    .blueprintMessageConsumerService("sample") as? KafkaBasicAuthMessageConsumerService,
                "failed to get blueprintMessageConsumerService"
            )

            val channel = consumerService.subscribe(null)
            launch {
                channel.consumeEach {
                    log.info("Consumed Message : $it")
                }
            }

            // Send one message every second.
            val producerService = bluePrintMessageLibPropertyService
                .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
            // NOTE(review): the number of messages was not recoverable from the damaged
            // source; 5 is an assumption - confirm.
            val messageCount = 5
            launch {
                repeat(messageCount) {
                    delay(1000)
                    producerService.sendMessage("this is my message($it)")
                }
            }
            // Wait until all messages had a chance to be produced and consumed before shutting down.
            delay((messageCount + 1) * 1000L)
            consumerService.shutDown()
        }
    }
}