Merge "Component Script Executor Fix"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageConsumerServiceTest.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
18
19 import io.mockk.every
20 import io.mockk.spyk
21 import kotlinx.coroutines.channels.consumeEach
22 import kotlinx.coroutines.delay
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.apache.kafka.clients.consumer.ConsumerRecord
26 import org.apache.kafka.clients.consumer.MockConsumer
27 import org.apache.kafka.clients.consumer.OffsetResetStrategy
28 import org.apache.kafka.common.TopicPartition
29 import org.junit.Test
30 import org.junit.runner.RunWith
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
32 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
34 import org.onap.ccsdk.cds.controllerblueprints.core.logger
35 import org.springframework.beans.factory.annotation.Autowired
36 import org.springframework.test.annotation.DirtiesContext
37 import org.springframework.test.context.ContextConfiguration
38 import org.springframework.test.context.TestPropertySource
39 import org.springframework.test.context.junit4.SpringRunner
40 import kotlin.test.assertNotNull
41 import kotlin.test.assertTrue
42
43
/**
 * Tests for the Kafka basic-auth message consumer obtained through
 * [BluePrintMessageLibPropertyService] under the "sample" selector.
 *
 * The Spring context is built from the message-lib and property configurations and
 * is fed the "sample" consumer/producer properties declared in [TestPropertySource].
 */
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
    BlueprintPropertyConfiguration::class, BluePrintProperties::class])
@TestPropertySource(properties =
["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
    "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
    "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
    "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
    "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
    "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
    "blueprintsprocessor.messageconsumer.sample.pollRecords=1",

    "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
    "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
    "blueprintsprocessor.messageproducer.sample.topic=default-topic",
    "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
])
open class BlueprintMessageConsumerServiceTest {
    val log = logger(BlueprintMessageConsumerServiceTest::class)

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    /**
     * Subscribes through a spied consumer service whose `kafkaConsumer(...)` factory is
     * stubbed to return a pre-loaded [MockConsumer], and checks every message that is
     * delivered on the channel.
     *
     * NOTE(review): the content assertion only runs for messages that reach the channel
     * within the 10 ms window before `shutDown()`; if none arrive the test still passes.
     */
    @Test
    fun testKafkaBasicAuthConsumerService() {
        runBlocking {
            // Safe cast + assertNotNull: a missing or wrongly typed service now fails with
            // the intended message instead of a ClassCastException from a hard cast.
            val blueprintMessageConsumerService = assertNotNull(
                    bluePrintMessageLibPropertyService.blueprintMessageConsumerService("sample")
                            as? KafkaBasicAuthMessageConsumerService,
                    "failed to get blueprintMessageConsumerService")

            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

            val topic = "default-topic"
            val partitionId = 1
            val recordCount = 10L
            val partitions = listOf(TopicPartition(topic, partitionId))

            // Mock consumer assigned to the single partition, with offsets spanning 0..recordCount.
            val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
            mockKafkaConsumer.subscribe(partitions.map { it.topic() })
            mockKafkaConsumer.rebalance(partitions)
            mockKafkaConsumer.updateBeginningOffsets(partitions.associate { it to 0L })
            mockKafkaConsumer.updateEndOffsets(partitions.associate { it to recordCount })

            // Pre-load one record per offset 1..recordCount.
            (1..recordCount).forEach { offset ->
                mockKafkaConsumer.addRecord(
                        ConsumerRecord<String, String>(topic, partitionId, offset, "key_$offset",
                                "I am message $offset"))
            }

            // Hand the mock to the service instead of letting it build a real Kafka consumer.
            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
            val channel = spyBlueprintMessageConsumerService.subscribe(null)
            launch {
                channel.consumeEach {
                    assertTrue(it.startsWith("I am message"), "failed to get the actual message")
                }
            }
            // Give the consumer a short window to deliver messages, then stop it.
            delay(10)
            spyBlueprintMessageConsumerService.shutDown()
        }
    }

    /**
     * Kafka integration test. Enable and use this test case only for local desktop
     * testing against a real Kafka broker (see the bootstrapServers test properties).
     */
    // @Test
    fun testKafkaIntegration() {
        runBlocking {
            // Same safe cast + assertNotNull as above so the failure message is reachable.
            val blueprintMessageConsumerService = assertNotNull(
                    bluePrintMessageLibPropertyService.blueprintMessageConsumerService("sample")
                            as? KafkaBasicAuthMessageConsumerService,
                    "failed to get blueprintMessageConsumerService")

            val channel = blueprintMessageConsumerService.subscribe(null)
            launch {
                channel.consumeEach {
                    log.info("Consumed Message : $it")
                }
            }

            /** Send five messages, one every 100 ms */
            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                    .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
            launch {
                repeat(5) {
                    delay(100)
                    blueprintMessageProducerService.sendMessage("this is my message($it)")
                }
            }
            // Let the producer finish and the consumer drain before shutting down.
            delay(5000)
            blueprintMessageConsumerService.shutDown()
        }
    }
}