Revert "Renaming Files having BluePrint to have Blueprint"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageConsumerServiceTest.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import io.micrometer.core.instrument.MeterRegistry
21 import io.mockk.every
22 import io.mockk.spyk
23 import kotlinx.coroutines.channels.consumeEach
24 import kotlinx.coroutines.delay
25 import kotlinx.coroutines.launch
26 import kotlinx.coroutines.runBlocking
27 import org.apache.kafka.clients.CommonClientConfigs
28 import org.apache.kafka.clients.consumer.Consumer
29 import org.apache.kafka.clients.consumer.ConsumerConfig
30 import org.apache.kafka.clients.consumer.ConsumerRecord
31 import org.apache.kafka.clients.consumer.ConsumerRecords
32 import org.apache.kafka.clients.consumer.MockConsumer
33 import org.apache.kafka.clients.consumer.OffsetResetStrategy
34 import org.apache.kafka.clients.producer.ProducerConfig
35 import org.apache.kafka.common.TopicPartition
36 import org.apache.kafka.common.config.SaslConfigs
37 import org.apache.kafka.common.config.SslConfigs
38 import org.apache.kafka.common.security.auth.SecurityProtocol
39 import org.apache.kafka.common.security.scram.ScramLoginModule
40 import org.apache.kafka.common.serialization.ByteArrayDeserializer
41 import org.apache.kafka.common.serialization.StringDeserializer
42 import org.junit.Test
43 import org.junit.runner.RunWith
44 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
45 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
46 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
47 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
48 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
49 import org.onap.ccsdk.cds.controllerblueprints.core.logger
50 import org.springframework.beans.factory.annotation.Autowired
51 import org.springframework.boot.test.mock.mockito.MockBean
52 import org.springframework.test.annotation.DirtiesContext
53 import org.springframework.test.context.ContextConfiguration
54 import org.springframework.test.context.TestPropertySource
55 import org.springframework.test.context.junit4.SpringRunner
56 import java.nio.charset.Charset
57 import kotlin.test.assertEquals
58 import kotlin.test.assertNotNull
59 import kotlin.test.assertTrue
60
/**
 * Spring-context tests for the Kafka message consumer obtained from
 * [BluePrintMessageLibPropertyService] under the "sample" selector.
 *
 * The test properties configure both a consumer and a producer named "sample" with the
 * `kafka-scram-ssl-auth` type. The consumer tests never reach a broker: the service is wrapped
 * in a MockK spy whose `kafkaConsumer(...)` call is stubbed to return a pre-loaded [MockConsumer].
 */
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(
    classes = [
        BluePrintMessageLibConfiguration::class,
        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
    ]
)
@TestPropertySource(
    properties =
        [
            "blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
            "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
            "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
            "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
            "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
            "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
            "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
            "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
            "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
            "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
            "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
            "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
            "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",

            "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
            "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
            "blueprintsprocessor.messageproducer.sample.topic=default-topic",
            "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
            "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
            "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
            "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
            "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
            "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
            "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
        ]
)
open class BlueprintMessageConsumerServiceTest {

    val log = logger(BlueprintMessageConsumerServiceTest::class)

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    /** Mocked so the Spring context can start without a real metrics registry. */
    @MockBean
    lateinit var meterRegistry: MeterRegistry

    /**
     * Subscribes through the spied consumer service and checks every record delivered on the
     * returned channel: the payload must start with "I am message" and the key must follow the
     * `key_<n>` sequence in arrival order.
     *
     * NOTE(review): the method name says "BasicAuth" although the configured type is
     * `kafka-scram-ssl-auth`; the name is kept to avoid renaming a test entry point.
     */
    @Test
    fun testKafkaBasicAuthConsumerService() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

            // Replace the real Kafka client with the in-memory mock.
            val mockKafkaConsumer = createMockKafkaConsumer("default-topic")
            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer

            val channel = spyBlueprintMessageConsumerService.subscribe(null)
            var received = 0
            launch {
                channel.consumeEach { message ->
                    ++received
                    val key = message.key()
                    val value = String(message.value(), Charset.defaultCharset())
                    assertTrue(value.startsWith("I am message"), "failed to get the actual message")
                    // Keys were generated as key_1..key_10, so the n-th record must carry key_n.
                    assertEquals("key_$received", key)
                }
            }
            // Let the consumer run briefly before stopping it.
            delay(10)
            spyBlueprintMessageConsumerService.shutDown()
        }
    }

    /**
     * Drives the spied consumer service through `consume(...)` with a custom
     * [KafkaConsumerRecordsFunction] that only traces the size of each received batch.
     *
     * NOTE(review): as above, the "BasicAuth" name does not match the configured
     * `kafka-scram-ssl-auth` type.
     */
    @Test
    fun testKafkaBasicAuthConsumerWithDynamicFunction() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

            // Replace the real Kafka client with the in-memory mock.
            val mockKafkaConsumer = createMockKafkaConsumer("default-topic")
            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer

            /** Test Consumer Function implementation */
            val consumerFunction = object : KafkaConsumerRecordsFunction {
                override suspend fun invoke(
                    messageConsumerProperties: MessageConsumerProperties,
                    consumer: Consumer<*, *>,
                    consumerRecords: ConsumerRecords<*, *>
                ) {
                    val count = consumerRecords.count()
                    log.trace("Received Message count($count)")
                }
            }
            spyBlueprintMessageConsumerService.consume(consumerFunction)
            // Let the consumer run briefly before stopping it.
            delay(10)
            spyBlueprintMessageConsumerService.shutDown()
        }
    }

    /**
     * Verifies that the "sample" consumer properties expose the expected topic and type, a client
     * id prefixed with `default-client-id`, and every Kafka setting listed in the expected map
     * (bootstrap servers, group, deserializers, SSL stores and SCRAM JAAS configuration).
     */
    @Test
    fun testKafkaScramSslAuthConfig() {
        val expectedConfig = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
            ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
            SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
            SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
            SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
            SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
            SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
                "username=\"sample-user\" " +
                "password=\"secretpassword\";"
        )

        val messageConsumerProperties = bluePrintMessageLibPropertyService
            .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")

        val configProps = messageConsumerProperties.getConfig()

        // kotlin.test.assertEquals takes (expected, actual, message); keeping that order makes
        // failure reports label the two values correctly.
        assertEquals(
            "default-topic",
            messageConsumerProperties.topic,
            "Topic doesn't match the expected value"
        )
        assertEquals(
            "kafka-scram-ssl-auth",
            messageConsumerProperties.type,
            "Authentication type doesn't match the expected value"
        )

        // The client id is only checked by prefix, not by exact value.
        assertTrue(
            configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
            "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
        )
        assertTrue(
            configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
            "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
        )

        expectedConfig.forEach { (configKey, expectedValue) ->
            assertTrue(
                configProps.containsKey(configKey),
                "Missing expected kafka config key : $configKey"
            )
            assertEquals(
                expectedValue,
                configProps[configKey],
                "Unexpected value for $configKey got ${configProps[configKey]} instead of $expectedValue"
            )
        }
    }

    /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
    // @Test
    fun testKafkaIntegration() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val channel = blueprintMessageConsumerService.subscribe(null)
            launch {
                channel.consumeEach {
                    log.info("Consumed Message : $it")
                }
            }

            /** Send five messages, one every 100 ms */
            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                .blueprintMessageProducerService("sample") as KafkaMessageProducerService
            launch {
                repeat(5) {
                    delay(100)
                    val headers: MutableMap<String, String> = hashMapOf()
                    headers["id"] = it.toString()
                    blueprintMessageProducerService.sendMessageNB(
                        key = "mykey",
                        message = "this is my message($it)",
                        headers = headers
                    )
                }
            }
            // Keep the consumer alive long enough to observe the produced messages.
            delay(5000)
            blueprintMessageConsumerService.shutDown()
        }
    }

    /**
     * Builds an in-memory [MockConsumer] subscribed to [topic] and assigned partition 1, with
     * beginning offset 0, end offset [recordCount], and [recordCount] records queued at offsets
     * 1..[recordCount]. Record n has key `key_n` and payload `I am message n`.
     *
     * @param topic topic the mock consumer subscribes to and the records are published on.
     * @param recordCount number of records to enqueue; also used as the partition end offset.
     * @return the prepared mock consumer.
     */
    private fun createMockKafkaConsumer(topic: String, recordCount: Long = 10): MockConsumer<String, ByteArray> {
        val partition = TopicPartition(topic, 1)
        val topicsCollection: MutableList<String> = arrayListOf(topic)
        val partitions: MutableList<TopicPartition> = arrayListOf(partition)
        val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf(partition to 0L)
        val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf(partition to recordCount)

        val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
        // Order matters: subscribe first, then assign the partition and seed its offsets.
        mockKafkaConsumer.subscribe(topicsCollection)
        mockKafkaConsumer.rebalance(partitions)
        mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
        mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
        for (offset in 1..recordCount) {
            val record = ConsumerRecord<String, ByteArray>(
                topic, partition.partition(), offset, "key_$offset",
                "I am message $offset".toByteArray()
            )
            mockKafkaConsumer.addRecord(record)
        }
        return mockKafkaConsumer
    }
}