2 * Copyright © 2019 IBM.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
21 import kotlinx.coroutines.channels.consumeEach
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.runBlocking
24 import org.apache.kafka.clients.consumer.ConsumerRecord
25 import org.apache.kafka.clients.consumer.MockConsumer
26 import org.apache.kafka.clients.consumer.OffsetResetStrategy
27 import org.apache.kafka.common.TopicPartition
29 import org.junit.runner.RunWith
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
33 import org.springframework.beans.factory.annotation.Autowired
34 import org.springframework.test.annotation.DirtiesContext
35 import org.springframework.test.context.ContextConfiguration
36 import org.springframework.test.context.TestPropertySource
37 import org.springframework.test.context.junit4.SpringRunner
38 import kotlin.test.assertNotNull
41 @RunWith(SpringRunner::class)
43 @ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
44 BlueprintPropertyConfiguration::class, BluePrintProperties::class])
45 @TestPropertySource(properties =
46 ["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth",
47 "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092",
48 "blueprintsprocessor.messageclient.sample.groupId=sample-group",
49 "blueprintsprocessor.messageclient.sample.consumerTopic=default-topic"
51 open class BlueprintMessageConsumerServiceTest {
54 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
57 fun testKafkaBasicAuthConsumerService() {
59 val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
60 .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
61 assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
63 val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
65 val topic = "default-topic"
66 val partitions: MutableList<TopicPartition> = arrayListOf()
67 val topicsCollection: MutableList<String> = arrayListOf()
68 partitions.add(TopicPartition(topic, 1))
69 val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
70 val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
72 val records: Long = 10
73 partitions.forEach { partition ->
74 partitionsBeginningMap[partition] = 0L
75 partitionsEndMap[partition] = records
76 topicsCollection.add(partition.topic())
78 val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
79 mockKafkaConsumer.subscribe(topicsCollection)
80 mockKafkaConsumer.rebalance(partitions)
81 mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
82 mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
84 val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i",
86 mockKafkaConsumer.addRecord(record)
89 every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
90 val channel = spyBlueprintMessageConsumerService.subscribe(null)
93 println("Received message : $it")
97 spyBlueprintMessageConsumerService.shutDown()