ac08dc7b70e3acf7de78440899029ffafb9e8440
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageConsumerServiceTest.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import io.mockk.every
21 import io.mockk.spyk
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.CommonClientConfigs
27 import org.apache.kafka.clients.consumer.Consumer
28 import org.apache.kafka.clients.consumer.ConsumerConfig
29 import org.apache.kafka.clients.consumer.ConsumerRecord
30 import org.apache.kafka.clients.consumer.ConsumerRecords
31 import org.apache.kafka.clients.consumer.MockConsumer
32 import org.apache.kafka.clients.consumer.OffsetResetStrategy
33 import org.apache.kafka.clients.producer.ProducerConfig
34 import org.apache.kafka.common.TopicPartition
35 import org.apache.kafka.common.config.SaslConfigs
36 import org.apache.kafka.common.config.SslConfigs
37 import org.apache.kafka.common.security.auth.SecurityProtocol
38 import org.apache.kafka.common.security.scram.ScramLoginModule
39 import org.apache.kafka.common.serialization.ByteArrayDeserializer
40 import org.apache.kafka.common.serialization.StringDeserializer
41 import org.junit.Test
42 import org.junit.runner.RunWith
43 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
44 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
45 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
46 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
47 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.springframework.beans.factory.annotation.Autowired
50 import org.springframework.test.annotation.DirtiesContext
51 import org.springframework.test.context.ContextConfiguration
52 import org.springframework.test.context.TestPropertySource
53 import org.springframework.test.context.junit4.SpringRunner
54 import kotlin.test.assertEquals
55 import kotlin.test.assertNotNull
56 import kotlin.test.assertTrue
57
58 @RunWith(SpringRunner::class)
59 @DirtiesContext
60 @ContextConfiguration(
61     classes = [BluePrintMessageLibConfiguration::class,
62         BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
63 )
64 @TestPropertySource(
65     properties =
66     ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
67         "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
68         "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
69         "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
70         "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
71         "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
72         "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
73         "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
74         "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
75         "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
76         "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
77         "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
78         "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
79
80         "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
81         "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
82         "blueprintsprocessor.messageproducer.sample.topic=default-topic",
83         "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
84         "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
85         "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
86         "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
87         "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
88         "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
89         "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
90     ]
91 )
92 open class BlueprintMessageConsumerServiceTest {
93
94     val log = logger(BlueprintMessageConsumerServiceTest::class)
95
96     @Autowired
97     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
98
99     @Test
100     fun testKafkaBasicAuthConsumerService() {
101         runBlocking {
102             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
103                 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
104             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
105
106             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
107
108             val topic = "default-topic"
109             val partitions: MutableList<TopicPartition> = arrayListOf()
110             val topicsCollection: MutableList<String> = arrayListOf()
111             partitions.add(TopicPartition(topic, 1))
112             val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
113             val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
114
115             val records: Long = 10
116             partitions.forEach { partition ->
117                 partitionsBeginningMap[partition] = 0L
118                 partitionsEndMap[partition] = records
119                 topicsCollection.add(partition.topic())
120             }
121             val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
122             mockKafkaConsumer.subscribe(topicsCollection)
123             mockKafkaConsumer.rebalance(partitions)
124             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
125             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
126             for (i in 1..10) {
127                 val record = ConsumerRecord<String, ByteArray>(
128                     topic, 1, i.toLong(), "key_$i",
129                     "I am message $i".toByteArray()
130                 )
131                 mockKafkaConsumer.addRecord(record)
132             }
133
134             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
135             val channel = spyBlueprintMessageConsumerService.subscribe(null)
136             launch {
137                 channel.consumeEach {
138                     assertTrue(it.startsWith("I am message"), "failed to get the actual message")
139                 }
140             }
141             delay(10)
142             spyBlueprintMessageConsumerService.shutDown()
143         }
144     }
145
146     @Test
147     fun testKafkaBasicAuthConsumerWithDynamicFunction() {
148         runBlocking {
149             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
150                 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
151             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
152
153             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
154
155             val topic = "default-topic"
156             val partitions: MutableList<TopicPartition> = arrayListOf()
157             val topicsCollection: MutableList<String> = arrayListOf()
158             partitions.add(TopicPartition(topic, 1))
159             val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
160             val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
161
162             val records: Long = 10
163             partitions.forEach { partition ->
164                 partitionsBeginningMap[partition] = 0L
165                 partitionsEndMap[partition] = records
166                 topicsCollection.add(partition.topic())
167             }
168             val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
169             mockKafkaConsumer.subscribe(topicsCollection)
170             mockKafkaConsumer.rebalance(partitions)
171             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
172             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
173             for (i in 1..10) {
174                 val record = ConsumerRecord<String, ByteArray>(
175                     topic, 1, i.toLong(), "key_$i",
176                     "I am message $i".toByteArray()
177                 )
178                 mockKafkaConsumer.addRecord(record)
179             }
180
181             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
182             /** Test Consumer Function implementation */
183             val consumerFunction = object : KafkaConsumerRecordsFunction {
184                 override suspend fun invoke(
185                     messageConsumerProperties: MessageConsumerProperties,
186                     consumer: Consumer<*, *>,
187                     consumerRecords: ConsumerRecords<*, *>
188                 ) {
189                     val count = consumerRecords.count()
190                     log.trace("Received Message count($count)")
191                 }
192             }
193             spyBlueprintMessageConsumerService.consume(consumerFunction)
194             delay(10)
195             spyBlueprintMessageConsumerService.shutDown()
196         }
197     }
198
199     @Test
200     fun testKafkaScramSslAuthConfig() {
201
202         val expectedConfig = mapOf<String, Any>(
203                 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
204                 ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
205                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
206                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
207                 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
208                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
209                 ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
210                 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
211                 SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
212                 SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
213                 SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
214                 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
215                 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
216                 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
217                 SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
218                 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
219                 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
220                         "username=\"sample-user\" " +
221                         "password=\"secretpassword\";"
222                 )
223
224         val messageConsumerProperties = bluePrintMessageLibPropertyService
225                 .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
226
227         val configProps = messageConsumerProperties.getConfig()
228
229         assertEquals(messageConsumerProperties.topic,
230                 "default-topic",
231                 "Topic doesn't match the expected value"
232         )
233         assertEquals(messageConsumerProperties.type,
234                 "kafka-scram-ssl-auth",
235                 "Authentication type doesn't match the expected value")
236
237         expectedConfig.forEach {
238             assertTrue(configProps.containsKey(it.key),
239                     "Missing expected kafka config key : ${it.key}")
240             assertEquals(configProps[it.key],
241                     it.value,
242                     "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
243             )
244         }
245     }
246
247     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
248     // @Test
249     fun testKafkaIntegration() {
250         runBlocking {
251             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
252                 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
253             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
254
255             val channel = blueprintMessageConsumerService.subscribe(null)
256             launch {
257                 channel.consumeEach {
258                     log.info("Consumed Message : $it")
259                 }
260             }
261
262             /** Send message with every 1 sec */
263             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
264                 .blueprintMessageProducerService("sample") as KafkaMessageProducerService
265             launch {
266                 repeat(5) {
267                     delay(100)
268                     val headers: MutableMap<String, String> = hashMapOf()
269                     headers["id"] = it.toString()
270                     blueprintMessageProducerService.sendMessageNB(
271                         message = "this is my message($it)",
272                         headers = headers
273                     )
274                 }
275             }
276             delay(5000)
277             blueprintMessageConsumerService.shutDown()
278         }
279     }
280 }