37d8b24fb23278454b869f2df5f7e78aa4521bb9
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
4  *  Modification Copyright (C) 2022 Nordix Foundation.
5  *
6  *  Licensed under the Apache License, Version 2.0 (the "License");
7  *  you may not use this file except in compliance with the License.
8  *  You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */
18
19 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20
21 import io.micrometer.core.instrument.MeterRegistry
22 import io.mockk.every
23 import io.mockk.spyk
24 import kotlinx.coroutines.channels.consumeEach
25 import kotlinx.coroutines.delay
26 import kotlinx.coroutines.launch
27 import kotlinx.coroutines.runBlocking
28 import org.apache.kafka.clients.CommonClientConfigs
29 import org.apache.kafka.clients.consumer.Consumer
30 import org.apache.kafka.clients.consumer.ConsumerConfig
31 import org.apache.kafka.clients.consumer.ConsumerRecord
32 import org.apache.kafka.clients.consumer.ConsumerRecords
33 import org.apache.kafka.clients.consumer.MockConsumer
34 import org.apache.kafka.clients.consumer.OffsetResetStrategy
35 import org.apache.kafka.clients.producer.ProducerConfig
36 import org.apache.kafka.common.TopicPartition
37 import org.apache.kafka.common.config.SaslConfigs
38 import org.apache.kafka.common.config.SslConfigs
39 import org.apache.kafka.common.security.auth.SecurityProtocol
40 import org.apache.kafka.common.security.scram.ScramLoginModule
41 import org.apache.kafka.common.serialization.ByteArrayDeserializer
42 import org.apache.kafka.common.serialization.StringDeserializer
43 import org.junit.Test
44 import org.junit.runner.RunWith
45 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
46 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
47 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
48 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
49 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
50 import org.onap.ccsdk.cds.controllerblueprints.core.logger
51 import org.springframework.beans.factory.annotation.Autowired
52 import org.springframework.boot.test.mock.mockito.MockBean
53 import org.springframework.test.annotation.DirtiesContext
54 import org.springframework.test.context.ContextConfiguration
55 import org.springframework.test.context.TestPropertySource
56 import org.springframework.test.context.junit4.SpringRunner
57 import java.nio.charset.Charset
58 import kotlin.test.assertEquals
59 import kotlin.test.assertNotNull
60 import kotlin.test.assertTrue
61
62 @RunWith(SpringRunner::class)
63 @DirtiesContext
64 @ContextConfiguration(
65     classes = [
66         BluePrintMessageLibConfiguration::class,
67         BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
68     ]
69 )
70 @TestPropertySource(
71     properties =
72         [
73             "blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
74             "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
75             "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
76             "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
77             "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
78             "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
79             "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
80             "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
81             "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
82             "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
83             "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
84             "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
85             "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
86
87             "blueprintsprocessor.messageconsumer.sample2.type=kafka-scram-plain-text-auth",
88             "blueprintsprocessor.messageconsumer.sample2.bootstrapServers=127.0.0.1:9092",
89             "blueprintsprocessor.messageconsumer.sample2.groupId=sample-group",
90             "blueprintsprocessor.messageconsumer.sample2.topic=default-topic",
91             "blueprintsprocessor.messageconsumer.sample2.clientId=default-client-id",
92             "blueprintsprocessor.messageconsumer.sample2.pollMillSec=10",
93             "blueprintsprocessor.messageconsumer.sample2.pollRecords=1",
94             "blueprintsprocessor.messageconsumer.sample2.scramUsername=sample-user",
95             "blueprintsprocessor.messageconsumer.sample2.scramPassword=secretpassword",
96
97             "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
98             "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
99             "blueprintsprocessor.messageproducer.sample.topic=default-topic",
100             "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
101             "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
102             "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
103             "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
104             "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
105             "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
106             "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
107         ]
108 )
109 open class BlueprintMessageConsumerServiceTest {
110
111     val log = logger(BlueprintMessageConsumerServiceTest::class)
112
113     @Autowired
114     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
115
116     @MockBean
117     lateinit var meterRegistry: MeterRegistry
118
119     @Test
120     fun testKafkaBasicAuthConsumerService() {
121         runBlocking {
122             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
123                 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
124             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
125
126             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
127
128             val topic = "default-topic"
129             val partitions: MutableList<TopicPartition> = arrayListOf()
130             val topicsCollection: MutableList<String> = arrayListOf()
131             partitions.add(TopicPartition(topic, 1))
132             val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
133             val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
134
135             val records: Long = 10
136             partitions.forEach { partition ->
137                 partitionsBeginningMap[partition] = 0L
138                 partitionsEndMap[partition] = records
139                 topicsCollection.add(partition.topic())
140             }
141             val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
142             mockKafkaConsumer.subscribe(topicsCollection)
143             mockKafkaConsumer.rebalance(partitions)
144             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
145             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
146             for (i in 1..10) {
147                 val record = ConsumerRecord<String, ByteArray>(
148                     topic, 1, i.toLong(), "key_$i",
149                     "I am message $i".toByteArray()
150                 )
151                 mockKafkaConsumer.addRecord(record)
152             }
153
154             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
155             val channel = spyBlueprintMessageConsumerService.subscribe(null)
156             var i = 0
157             launch {
158                 channel.consumeEach {
159                     ++i
160                     val key = it.key()
161                     val value = String(it.value(), Charset.defaultCharset())
162                     assertTrue(value.startsWith("I am message"), "failed to get the actual message")
163                     assertEquals("key_$i", key)
164                 }
165             }
166             delay(10)
167             spyBlueprintMessageConsumerService.shutDown()
168         }
169     }
170
171     @Test
172     fun testKafkaBasicAuthConsumerWithDynamicFunction() {
173         runBlocking {
174             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
175                 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
176             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
177
178             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
179
180             val topic = "default-topic"
181             val partitions: MutableList<TopicPartition> = arrayListOf()
182             val topicsCollection: MutableList<String> = arrayListOf()
183             partitions.add(TopicPartition(topic, 1))
184             val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
185             val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
186
187             val records: Long = 10
188             partitions.forEach { partition ->
189                 partitionsBeginningMap[partition] = 0L
190                 partitionsEndMap[partition] = records
191                 topicsCollection.add(partition.topic())
192             }
193             val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
194             mockKafkaConsumer.subscribe(topicsCollection)
195             mockKafkaConsumer.rebalance(partitions)
196             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
197             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
198             for (i in 1..10) {
199                 val record = ConsumerRecord<String, ByteArray>(
200                     topic, 1, i.toLong(), "key_$i",
201                     "I am message $i".toByteArray()
202                 )
203                 mockKafkaConsumer.addRecord(record)
204             }
205
206             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
207             /** Test Consumer Function implementation */
208             val consumerFunction = object : KafkaConsumerRecordsFunction {
209                 override suspend fun invoke(
210                     messageConsumerProperties: MessageConsumerProperties,
211                     consumer: Consumer<*, *>,
212                     consumerRecords: ConsumerRecords<*, *>
213                 ) {
214                     val count = consumerRecords.count()
215                     log.trace("Received Message count($count)")
216                 }
217             }
218             spyBlueprintMessageConsumerService.consume(consumerFunction)
219             delay(10)
220             spyBlueprintMessageConsumerService.shutDown()
221         }
222     }
223
224     @Test
225     fun testKafkaScramSslAuthConfig() {
226         val expectedConfig = mapOf<String, Any>(
227             ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
228             ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
229             ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
230             ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
231             ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
232             ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
233             CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
234             SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
235             SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
236             SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
237             SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
238             SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
239             SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
240             SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
241             SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
242             SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
243                 "username=\"sample-user\" " +
244                 "password=\"secretpassword\";"
245         )
246
247         val messageConsumerProperties = bluePrintMessageLibPropertyService
248             .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
249
250         val configProps = messageConsumerProperties.getConfig()
251
252         assertEquals(
253             messageConsumerProperties.topic,
254             "default-topic",
255             "Topic doesn't match the expected value"
256         )
257         assertEquals(
258             messageConsumerProperties.type,
259             "kafka-scram-ssl-auth",
260             "Authentication type doesn't match the expected value"
261         )
262
263         assertTrue(
264             configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
265             "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
266         )
267         assertTrue(
268             configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
269             "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
270         )
271
272         expectedConfig.forEach {
273             assertTrue(
274                 configProps.containsKey(it.key),
275                 "Missing expected kafka config key : ${it.key}"
276             )
277             assertEquals(
278                 configProps[it.key],
279                 it.value,
280                 "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
281             )
282         }
283     }
284
285
286     @Test
287     fun testKafkaScramPlaintextAuthConfig() {
288         val expectedConfig = mapOf<String, Any>(
289             ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
290             ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
291             ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
292             ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
293             ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
294             ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
295             CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString()
296         )
297
298         val messageConsumerProperties = bluePrintMessageLibPropertyService
299             .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample2")
300
301         val configProps = messageConsumerProperties.getConfig()
302
303         assertEquals(
304             messageConsumerProperties.topic,
305             "default-topic",
306             "Topic doesn't match the expected value"
307         )
308         assertEquals(
309             messageConsumerProperties.type,
310             "kafka-scram-plain-text-auth",
311             "Authentication type doesn't match the expected value"
312         )
313
314         assertTrue(
315             configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
316             "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
317         )
318         assertTrue(
319             configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
320             "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
321         )
322
323         expectedConfig.forEach {
324             assertTrue(
325                 configProps.containsKey(it.key),
326                 "Missing expected kafka config key : ${it.key}"
327             )
328             assertEquals(
329                 configProps[it.key],
330                 it.value,
331                 "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
332             )
333         }
334     }
335
336     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
337     // @Test
338     fun testKafkaIntegration() {
339         runBlocking {
340             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
341                 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
342             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
343
344             val channel = blueprintMessageConsumerService.subscribe(null)
345             launch {
346                 channel.consumeEach {
347                     log.info("Consumed Message : $it")
348                 }
349             }
350
351             /** Send message with every 1 sec */
352             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
353                 .blueprintMessageProducerService("sample") as KafkaMessageProducerService
354             launch {
355                 repeat(5) {
356                     delay(100)
357                     val headers: MutableMap<String, String> = hashMapOf()
358                     headers["id"] = it.toString()
359                     blueprintMessageProducerService.sendMessageNB(
360                         key = "mykey",
361                         message = "this is my message($it)",
362                         headers = headers
363                     )
364                 }
365             }
366             delay(5000)
367             blueprintMessageConsumerService.shutDown()
368         }
369     }
370 }