2b84eaa78c66dac1cd345dd0fb265b5f052580e2
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageConsumerServiceTest.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
18
19 import io.mockk.every
20 import io.mockk.spyk
21 import kotlinx.coroutines.channels.consumeEach
22 import kotlinx.coroutines.delay
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.apache.kafka.clients.consumer.ConsumerRecord
26 import org.apache.kafka.clients.consumer.MockConsumer
27 import org.apache.kafka.clients.consumer.OffsetResetStrategy
28 import org.apache.kafka.common.TopicPartition
29 import org.junit.Test
30 import org.junit.runner.RunWith
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
32 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
34 import org.onap.ccsdk.cds.controllerblueprints.core.logger
35 import org.springframework.beans.factory.annotation.Autowired
36 import org.springframework.test.annotation.DirtiesContext
37 import org.springframework.test.context.ContextConfiguration
38 import org.springframework.test.context.TestPropertySource
39 import org.springframework.test.context.junit4.SpringRunner
40 import kotlin.test.assertNotNull
41 import kotlin.test.assertTrue
42
43
44 @RunWith(SpringRunner::class)
45 @DirtiesContext
46 @ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
47     BlueprintPropertyConfiguration::class, BluePrintProperties::class])
48 @TestPropertySource(properties =
49 ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
50     "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
51     "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
52     "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
53     "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
54     "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
55
56     "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
57     "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
58     "blueprintsprocessor.messageproducer.sample.topic=default-topic",
59     "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
60 ])
61 open class BlueprintMessageConsumerServiceTest {
62     val log = logger(BlueprintMessageConsumerServiceTest::class)
63
64     @Autowired
65     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
66
67     @Test
68     fun testKafkaBasicAuthConsumerService() {
69         runBlocking {
70             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
71                     .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
72             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
73
74             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
75
76             val topic = "default-topic"
77             val partitions: MutableList<TopicPartition> = arrayListOf()
78             val topicsCollection: MutableList<String> = arrayListOf()
79             partitions.add(TopicPartition(topic, 1))
80             val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
81             val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
82
83             val records: Long = 10
84             partitions.forEach { partition ->
85                 partitionsBeginningMap[partition] = 0L
86                 partitionsEndMap[partition] = records
87                 topicsCollection.add(partition.topic())
88             }
89             val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
90             mockKafkaConsumer.subscribe(topicsCollection)
91             mockKafkaConsumer.rebalance(partitions)
92             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
93             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
94             for (i in 1..10) {
95                 val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i",
96                         "I am message $i")
97                 mockKafkaConsumer.addRecord(record)
98             }
99
100             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
101             val channel = spyBlueprintMessageConsumerService.subscribe(null)
102             launch {
103                 channel.consumeEach {
104                     assertTrue(it.startsWith("I am message"), "failed to get the actual message")
105                 }
106             }
107             delay(10)
108             spyBlueprintMessageConsumerService.shutDown()
109         }
110     }
111
112     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
113     //@Test
114     fun testKafkaIntegration() {
115         runBlocking {
116             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
117                     .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
118             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
119
120             val channel = blueprintMessageConsumerService.subscribe(null)
121             launch {
122                 channel.consumeEach {
123                     log.info("Consumed Message : $it")
124                 }
125             }
126
127             /** Send message with every 1 sec */
128             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
129                     .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
130             launch {
131                 repeat(5) {
132                     delay(1000)
133                     blueprintMessageProducerService.sendMessage("this is my message($it)")
134                 }
135             }
136             delay(10000)
137             blueprintMessageConsumerService.shutDown()
138         }
139     }
140 }