823ba7deeb733c6fb3031848a56d8d80fdb69d99
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import io.mockk.every
21 import io.mockk.spyk
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.consumer.Consumer
27 import org.apache.kafka.clients.consumer.ConsumerRecord
28 import org.apache.kafka.clients.consumer.ConsumerRecords
29 import org.apache.kafka.clients.consumer.MockConsumer
30 import org.apache.kafka.clients.consumer.OffsetResetStrategy
31 import org.apache.kafka.common.TopicPartition
32 import org.junit.Test
33 import org.junit.runner.RunWith
34 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
36 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
37 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
38 import org.onap.ccsdk.cds.controllerblueprints.core.logger
39 import org.springframework.beans.factory.annotation.Autowired
40 import org.springframework.test.annotation.DirtiesContext
41 import org.springframework.test.context.ContextConfiguration
42 import org.springframework.test.context.TestPropertySource
43 import org.springframework.test.context.junit4.SpringRunner
44 import kotlin.test.assertNotNull
45 import kotlin.test.assertTrue
46
47 @RunWith(SpringRunner::class)
48 @DirtiesContext
49 @ContextConfiguration(
50     classes = [BluePrintMessageLibConfiguration::class,
51         BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
52 )
53 @TestPropertySource(
54     properties =
55     ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
56         "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
57         "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
58         "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
59         "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
60         "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
61         "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
62
63         "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
64         "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
65         "blueprintsprocessor.messageproducer.sample.topic=default-topic",
66         "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
67     ]
68 )
69 open class BlueprintMessageConsumerServiceTest {
70
71     val log = logger(BlueprintMessageConsumerServiceTest::class)
72
73     @Autowired
74     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
75
76     @Test
77     fun testKafkaBasicAuthConsumerService() {
78         runBlocking {
79             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
80                 .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
81             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
82
83             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
84
85             val topic = "default-topic"
86             val partitions: MutableList<TopicPartition> = arrayListOf()
87             val topicsCollection: MutableList<String> = arrayListOf()
88             partitions.add(TopicPartition(topic, 1))
89             val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
90             val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
91
92             val records: Long = 10
93             partitions.forEach { partition ->
94                 partitionsBeginningMap[partition] = 0L
95                 partitionsEndMap[partition] = records
96                 topicsCollection.add(partition.topic())
97             }
98             val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
99             mockKafkaConsumer.subscribe(topicsCollection)
100             mockKafkaConsumer.rebalance(partitions)
101             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
102             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
103             for (i in 1..10) {
104                 val record = ConsumerRecord<String, ByteArray>(
105                     topic, 1, i.toLong(), "key_$i",
106                     "I am message $i".toByteArray()
107                 )
108                 mockKafkaConsumer.addRecord(record)
109             }
110
111             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
112             val channel = spyBlueprintMessageConsumerService.subscribe(null)
113             launch {
114                 channel.consumeEach {
115                     assertTrue(it.startsWith("I am message"), "failed to get the actual message")
116                 }
117             }
118             delay(10)
119             spyBlueprintMessageConsumerService.shutDown()
120         }
121     }
122
123     @Test
124     fun testKafkaBasicAuthConsumerWithDynamicFunction() {
125         runBlocking {
126             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
127                 .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
128             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
129
130             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
131
132             val topic = "default-topic"
133             val partitions: MutableList<TopicPartition> = arrayListOf()
134             val topicsCollection: MutableList<String> = arrayListOf()
135             partitions.add(TopicPartition(topic, 1))
136             val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
137             val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
138
139             val records: Long = 10
140             partitions.forEach { partition ->
141                 partitionsBeginningMap[partition] = 0L
142                 partitionsEndMap[partition] = records
143                 topicsCollection.add(partition.topic())
144             }
145             val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
146             mockKafkaConsumer.subscribe(topicsCollection)
147             mockKafkaConsumer.rebalance(partitions)
148             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
149             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
150             for (i in 1..10) {
151                 val record = ConsumerRecord<String, ByteArray>(
152                     topic, 1, i.toLong(), "key_$i",
153                     "I am message $i".toByteArray()
154                 )
155                 mockKafkaConsumer.addRecord(record)
156             }
157
158             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
159             /** Test Consumer Function implementation */
160             val consumerFunction = object : KafkaConsumerRecordsFunction {
161                 override suspend fun invoke(
162                     messageConsumerProperties: MessageConsumerProperties,
163                     consumer: Consumer<*, *>,
164                     consumerRecords: ConsumerRecords<*, *>
165                 ) {
166                     val count = consumerRecords.count()
167                     log.trace("Received Message count($count)")
168                 }
169             }
170             spyBlueprintMessageConsumerService.consume(consumerFunction)
171             delay(10)
172             spyBlueprintMessageConsumerService.shutDown()
173         }
174     }
175
176     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
177     // @Test
178     fun testKafkaIntegration() {
179         runBlocking {
180             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
181                 .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
182             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
183
184             val channel = blueprintMessageConsumerService.subscribe(null)
185             launch {
186                 channel.consumeEach {
187                     log.info("Consumed Message : $it")
188                 }
189             }
190
191             /** Send message with every 1 sec */
192             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
193                 .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
194             launch {
195                 repeat(5) {
196                     delay(100)
197                     val headers: MutableMap<String, String> = hashMapOf()
198                     headers["id"] = it.toString()
199                     blueprintMessageProducerService.sendMessageNB(
200                         message = "this is my message($it)",
201                         headers = headers
202                     )
203                 }
204             }
205             delay(5000)
206             blueprintMessageConsumerService.shutDown()
207         }
208     }
209 }