Merge "Changing groupId for message-lib"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageConsumerServiceTest.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import io.mockk.every
21 import io.mockk.spyk
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.consumer.*
27 import org.apache.kafka.common.TopicPartition
28 import org.junit.Test
29 import org.junit.runner.RunWith
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
34 import org.onap.ccsdk.cds.controllerblueprints.core.logger
35 import org.springframework.beans.factory.annotation.Autowired
36 import org.springframework.test.annotation.DirtiesContext
37 import org.springframework.test.context.ContextConfiguration
38 import org.springframework.test.context.TestPropertySource
39 import org.springframework.test.context.junit4.SpringRunner
40 import kotlin.test.assertNotNull
41 import kotlin.test.assertTrue
42
43
/**
 * Tests for [KafkaBasicAuthMessageConsumerService] obtained through
 * [BluePrintMessageLibPropertyService] using the "sample" consumer/producer
 * properties declared in [TestPropertySource] below.
 *
 * The two active tests replace the real Kafka consumer with a [MockConsumer]
 * by stubbing `kafkaConsumer(...)` on a MockK spy, so no broker is required.
 */
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
    BlueprintPropertyConfiguration::class, BluePrintProperties::class])
@TestPropertySource(properties =
["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
    "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
    "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
    "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
    "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
    "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
    "blueprintsprocessor.messageconsumer.sample.pollRecords=1",

    "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
    "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
    "blueprintsprocessor.messageproducer.sample.topic=default-topic",
    "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
])
open class BlueprintMessageConsumerServiceTest {
    val log = logger(BlueprintMessageConsumerServiceTest::class)

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    /**
     * Builds a [MockConsumer] that is subscribed to [topic], assigned a single
     * partition and pre-loaded with [recordCount] records.
     *
     * Records carry offsets `1..recordCount`, keys `key_<i>` and values
     * `"I am message <i>"` encoded as bytes. The partition's beginning offset is
     * set to 0 and its end offset to [recordCount].
     *
     * @param topic topic name used for the subscription and for every record
     * @param partition partition number the records are added to
     * @param recordCount number of records to pre-load
     * @return the prepared mock consumer
     */
    private fun buildMockKafkaConsumer(
        topic: String,
        partition: Int = 1,
        recordCount: Int = 10
    ): MockConsumer<String, ByteArray> {
        val topicPartition = TopicPartition(topic, partition)
        val consumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)

        // Same call order as the original inline setup:
        // subscribe, rebalance, offsets, then records.
        consumer.subscribe(listOf(topic))
        consumer.rebalance(listOf(topicPartition))
        consumer.updateBeginningOffsets(mapOf(topicPartition to 0L))
        consumer.updateEndOffsets(mapOf(topicPartition to recordCount.toLong()))

        for (i in 1..recordCount) {
            val record = ConsumerRecord<String, ByteArray>(topic, partition, i.toLong(), "key_$i",
                    "I am message $i".toByteArray())
            consumer.addRecord(record)
        }
        return consumer
    }

    /**
     * Subscribes through the spied consumer service (backed by a [MockConsumer])
     * and asserts that every message received on the channel starts with
     * "I am message".
     */
    @Test
    fun testKafkaBasicAuthConsumerService() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                    .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

            val mockKafkaConsumer = buildMockKafkaConsumer(topic = "default-topic")

            // Route the service to the mock instead of creating a real Kafka consumer.
            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
            val channel = spyBlueprintMessageConsumerService.subscribe(null)
            launch {
                channel.consumeEach {
                    assertTrue(it.startsWith("I am message"), "failed to get the actual message")
                }
            }
            // Give the consumer a brief window to poll before shutting it down.
            delay(10)
            spyBlueprintMessageConsumerService.shutDown()
        }
    }

    /**
     * Consumes through the spied consumer service (backed by a [MockConsumer])
     * using a custom [KafkaConsumerRecordsFunction] that only logs the number of
     * records it is handed.
     */
    @Test
    fun testKafkaBasicAuthConsumerWithDynamicFunction() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                    .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

            val mockKafkaConsumer = buildMockKafkaConsumer(topic = "default-topic")

            // Route the service to the mock instead of creating a real Kafka consumer.
            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
            /** Test Consumer Function implementation */
            val consumerFunction = object : KafkaConsumerRecordsFunction {
                override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties,
                                            consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) {
                    val count = consumerRecords.count()
                    log.trace("Received Message count($count)")
                }
            }
            spyBlueprintMessageConsumerService.consume(consumerFunction)
            // Give the consumer a brief window to poll before shutting it down.
            delay(10)
            spyBlueprintMessageConsumerService.shutDown()
        }
    }

    /**
     * Integration Kafka testing. Enable and use this test case only for local
     * desktop testing with a real Kafka broker.
     *
     * Subscribes with the real consumer service, logs every consumed message,
     * publishes five messages through the producer service, then shuts the
     * consumer down after five seconds.
     */
    //@Test
    fun testKafkaIntegration() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                    .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val channel = blueprintMessageConsumerService.subscribe(null)
            launch {
                channel.consumeEach {
                    log.info("Consumed Message : $it")
                }
            }

            /** Send five messages, one every 100 ms */
            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                    .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
            launch {
                repeat(5) {
                    delay(100)
                    val headers: MutableMap<String, String> = hashMapOf()
                    headers["id"] = it.toString()
                    blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
                            headers = headers)
                }
            }
            delay(5000)
            blueprintMessageConsumerService.shutDown()
        }
    }
}