18b86b8d8fe18c3d84d665b23bbebad8fae16072
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
18
19 import io.mockk.every
20 import io.mockk.spyk
21 import kotlinx.coroutines.channels.consumeEach
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.runBlocking
24 import org.apache.kafka.clients.consumer.ConsumerRecord
25 import org.apache.kafka.clients.consumer.MockConsumer
26 import org.apache.kafka.clients.consumer.OffsetResetStrategy
27 import org.apache.kafka.common.TopicPartition
28 import org.junit.Test
29 import org.junit.runner.RunWith
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
33 import org.springframework.beans.factory.annotation.Autowired
34 import org.springframework.test.annotation.DirtiesContext
35 import org.springframework.test.context.ContextConfiguration
36 import org.springframework.test.context.TestPropertySource
37 import org.springframework.test.context.junit4.SpringRunner
38 import kotlin.test.assertNotNull
39
40
/**
 * Spring-context test for the "kafka-basic-auth" message consumer service.
 *
 * The message client named "sample" is configured through the test properties below, and the
 * Kafka consumer factory method of the service is stubbed to return a [MockConsumer] that has
 * been pre-loaded with records.
 */
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
    BlueprintPropertyConfiguration::class, BluePrintProperties::class])
@TestPropertySource(properties =
["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth",
    "blueprintsprocessor.messageclient.sample.bootstrapServers=127.0.0.1:9092",
    "blueprintsprocessor.messageclient.sample.groupId=sample-group",
    "blueprintsprocessor.messageclient.sample.consumerTopic=default-topic"
])
open class BlueprintMessageConsumerServiceTest {

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    /**
     * Resolves the "sample" consumer service, swaps its Kafka consumer for a [MockConsumer]
     * holding ten records, subscribes, starts a coroutine that prints every received message,
     * and then shuts the service down.
     */
    @Test
    fun testKafkaBasicAuthConsumerService() {
        runBlocking {
            // Safe cast + assertNotNull: a missing or wrongly-typed service fails with the
            // assertion message instead of a cast exception raised before the assertion runs.
            val blueprintMessageConsumerService = assertNotNull(
                    bluePrintMessageLibPropertyService.blueprintMessageConsumerService("sample")
                            as? KafkaBasicAuthMessageConsumerService,
                    "failed to get blueprintMessageConsumerService")

            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

            val topic = "default-topic"
            val partition = 1
            // Single source of truth for both the declared end offset and the number of records.
            val records: Long = 10

            val partitions = listOf(TopicPartition(topic, partition))
            val topicsCollection = partitions.map { it.topic() }
            val partitionsBeginningMap = partitions.associate { it to 0L }
            val partitionsEndMap = partitions.associate { it to records }

            val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
            mockKafkaConsumer.subscribe(topicsCollection)
            mockKafkaConsumer.rebalance(partitions)
            mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
            mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
            for (offset in 1..records) {
                mockKafkaConsumer.addRecord(
                        ConsumerRecord<String, String>(topic, partition, offset, "key_$offset",
                                "I am message $offset"))
            }

            // Stub the consumer factory on the spy so it hands back the mock consumer.
            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
            val channel = spyBlueprintMessageConsumerService.subscribe(null)
            launch {
                channel.consumeEach {
                    println("Received message : $it")
                }
            }
            // NOTE(review): there is no wait before shutdown (a delay(100) was previously commented
            // out here), so the printing coroutine may not receive anything; this test asserts
            // nothing about delivered messages - confirm whether that is intended.
            spyBlueprintMessageConsumerService.shutDown()
        }
    }
}