77bdbe4087c32af7a0de30fc51b4fef37fbaeac3
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageConsumerServiceTest.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import io.mockk.every
21 import io.mockk.spyk
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.CommonClientConfigs
27 import org.apache.kafka.clients.consumer.Consumer
28 import org.apache.kafka.clients.consumer.ConsumerConfig
29 import org.apache.kafka.clients.consumer.ConsumerRecord
30 import org.apache.kafka.clients.consumer.ConsumerRecords
31 import org.apache.kafka.clients.consumer.MockConsumer
32 import org.apache.kafka.clients.consumer.OffsetResetStrategy
33 import org.apache.kafka.clients.producer.ProducerConfig
34 import org.apache.kafka.common.TopicPartition
35 import org.apache.kafka.common.config.SaslConfigs
36 import org.apache.kafka.common.config.SslConfigs
37 import org.apache.kafka.common.security.auth.SecurityProtocol
38 import org.apache.kafka.common.security.scram.ScramLoginModule
39 import org.apache.kafka.common.serialization.ByteArrayDeserializer
40 import org.apache.kafka.common.serialization.StringDeserializer
41 import org.junit.Test
42 import org.junit.runner.RunWith
43 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
44 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
45 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
46 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
47 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.springframework.beans.factory.annotation.Autowired
50 import org.springframework.test.annotation.DirtiesContext
51 import org.springframework.test.context.ContextConfiguration
52 import org.springframework.test.context.TestPropertySource
53 import org.springframework.test.context.junit4.SpringRunner
54 import java.nio.charset.Charset
55 import kotlin.test.assertEquals
56 import kotlin.test.assertNotNull
57 import kotlin.test.assertTrue
58
/**
 * Tests for the Kafka message consumer obtained from [BluePrintMessageLibPropertyService]
 * under the `sample` selector.
 *
 * The Spring context is populated through [TestPropertySource] with a
 * `kafka-scram-ssl-auth` consumer and producer definition. The consumer tests replace
 * the real Kafka client with a [MockConsumer], so no broker is contacted.
 */
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(
    classes = [BluePrintMessageLibConfiguration::class,
        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
)
@TestPropertySource(
    properties =
    ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
        "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
        "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
        "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
        "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
        "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
        "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
        "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
        "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
        "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
        "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
        "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",

        "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
        "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageproducer.sample.topic=default-topic",
        "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
        "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
        "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
        "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
        "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
        "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
        "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
    ]
)
open class BlueprintMessageConsumerServiceTest {

    val log = logger(BlueprintMessageConsumerServiceTest::class)

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    /**
     * Builds a [MockConsumer] subscribed to [topic], assigned the single [partition],
     * and pre-loaded with [recordCount] records.
     *
     * Record `i` (1-based) is stored at offset `i` with key `key_i` and the bytes of
     * `I am message i` as value. Beginning offset is set to 0 and end offset to
     * [recordCount].
     */
    private fun mockConsumerWithRecords(
        topic: String,
        partition: Int = 1,
        recordCount: Int = 10
    ): MockConsumer<String, ByteArray> {
        val topicPartition = TopicPartition(topic, partition)
        val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
        // Same call order as the Kafka mock expects: subscribe, then assign via rebalance,
        // then publish the offset bounds before any record is added.
        mockKafkaConsumer.subscribe(listOf(topic))
        mockKafkaConsumer.rebalance(listOf(topicPartition))
        mockKafkaConsumer.updateBeginningOffsets(mapOf(topicPartition to 0L))
        mockKafkaConsumer.updateEndOffsets(mapOf(topicPartition to recordCount.toLong()))
        for (i in 1..recordCount) {
            mockKafkaConsumer.addRecord(
                ConsumerRecord<String, ByteArray>(
                    topic, partition, i.toLong(), "key_$i",
                    "I am message $i".toByteArray()
                )
            )
        }
        return mockKafkaConsumer
    }

    /**
     * Subscribes through the channel API against a mocked Kafka consumer and checks,
     * for every record received before shutdown, that keys arrive in order
     * (`key_1`, `key_2`, ...) and that each value starts with `I am message`.
     */
    @Test
    fun testKafkaBasicAuthConsumerService() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

            val mockKafkaConsumer = mockConsumerWithRecords(topic = "default-topic")

            // Swap the real Kafka client for the pre-loaded mock.
            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
            val channel = spyBlueprintMessageConsumerService.subscribe(null)
            var received = 0
            launch {
                channel.consumeEach {
                    ++received
                    val key = it.key()
                    val value = String(it.value(), Charset.defaultCharset())
                    assertTrue(value.startsWith("I am message"), "failed to get the actual message")
                    assertEquals("key_$received", key)
                }
            }
            delay(10)
            spyBlueprintMessageConsumerService.shutDown()
        }
    }

    /**
     * Consumes through a caller-supplied [KafkaConsumerRecordsFunction] against a mocked
     * Kafka consumer. The function only logs the size of each received batch; the test
     * passes when consume and shutdown complete without throwing.
     */
    @Test
    fun testKafkaBasicAuthConsumerWithDynamicFunction() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

            val mockKafkaConsumer = mockConsumerWithRecords(topic = "default-topic")

            // Swap the real Kafka client for the pre-loaded mock.
            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
            /** Test Consumer Function implementation */
            val consumerFunction = object : KafkaConsumerRecordsFunction {
                override suspend fun invoke(
                    messageConsumerProperties: MessageConsumerProperties,
                    consumer: Consumer<*, *>,
                    consumerRecords: ConsumerRecords<*, *>
                ) {
                    val count = consumerRecords.count()
                    log.trace("Received Message count($count)")
                }
            }
            spyBlueprintMessageConsumerService.consume(consumerFunction)
            delay(10)
            spyBlueprintMessageConsumerService.shutDown()
        }
    }

    /**
     * Verifies that the consumer properties bound from the `sample` selector expose the
     * configured topic and type, and that `getConfig()` contains every expected Kafka
     * client setting (bootstrap servers, deserializers, SSL stores, SASL/SCRAM JAAS
     * entry, ...) with the expected value.
     */
    @Test
    fun testKafkaScramSslAuthConfig() {

        val expectedConfig = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
            ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
            ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
            SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
            SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
            SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
            SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
            SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
                "username=\"sample-user\" " +
                "password=\"secretpassword\";"
        )

        val messageConsumerProperties = bluePrintMessageLibPropertyService
            .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")

        val configProps = messageConsumerProperties.getConfig()

        // kotlin.test.assertEquals takes (expected, actual, message); keeping that order
        // makes the generated failure text report the two values the right way round.
        assertEquals(
            "default-topic",
            messageConsumerProperties.topic,
            "Topic doesn't match the expected value"
        )
        assertEquals(
            "kafka-scram-ssl-auth",
            messageConsumerProperties.type,
            "Authentication type doesn't match the expected value"
        )

        expectedConfig.forEach {
            assertTrue(
                configProps.containsKey(it.key),
                "Missing expected kafka config key : ${it.key}"
            )
            assertEquals(
                it.value,
                configProps[it.key],
                "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
            )
        }
    }

    /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
    // @Test
    fun testKafkaIntegration() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val channel = blueprintMessageConsumerService.subscribe(null)
            launch {
                channel.consumeEach {
                    log.info("Consumed Message : $it")
                }
            }

            /** Send five messages, pausing 100 ms before each one */
            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                .blueprintMessageProducerService("sample") as KafkaMessageProducerService
            launch {
                repeat(5) {
                    delay(100)
                    val headers: MutableMap<String, String> = hashMapOf()
                    headers["id"] = it.toString()
                    blueprintMessageProducerService.sendMessageNB(
                        key = "mykey",
                        message = "this is my message($it)",
                        headers = headers
                    )
                }
            }
            // Give the consumer time to drain before shutting down.
            delay(5000)
            blueprintMessageConsumerService.shutDown()
        }
    }
}