a69f9f51a874ec2fbadc132ec9b7178dab0ded74
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import io.mockk.every
21 import io.mockk.spyk
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.CommonClientConfigs
27 import org.apache.kafka.clients.consumer.Consumer
28 import org.apache.kafka.clients.consumer.ConsumerConfig
29 import org.apache.kafka.clients.consumer.ConsumerRecord
30 import org.apache.kafka.clients.consumer.ConsumerRecords
31 import org.apache.kafka.clients.consumer.MockConsumer
32 import org.apache.kafka.clients.consumer.OffsetResetStrategy
33 import org.apache.kafka.clients.producer.ProducerConfig
34 import org.apache.kafka.common.TopicPartition
35 import org.apache.kafka.common.config.SaslConfigs
36 import org.apache.kafka.common.config.SslConfigs
37 import org.apache.kafka.common.security.auth.SecurityProtocol
38 import org.apache.kafka.common.security.scram.ScramLoginModule
39 import org.apache.kafka.common.serialization.ByteArrayDeserializer
40 import org.apache.kafka.common.serialization.StringDeserializer
41 import org.junit.Test
42 import org.junit.runner.RunWith
43 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
44 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
45 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
46 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
47 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.springframework.beans.factory.annotation.Autowired
50 import org.springframework.test.annotation.DirtiesContext
51 import org.springframework.test.context.ContextConfiguration
52 import org.springframework.test.context.TestPropertySource
53 import org.springframework.test.context.junit4.SpringRunner
54 import java.nio.charset.Charset
55 import kotlin.test.assertEquals
56 import kotlin.test.assertNotNull
57 import kotlin.test.assertTrue
58
/**
 * Tests for the Kafka message consumer obtained from [BluePrintMessageLibPropertyService]
 * using the "sample" consumer/producer properties declared in [TestPropertySource] below.
 *
 * The two consumer tests replace the real Kafka client with a [MockConsumer] by stubbing
 * `kafkaConsumer(...)` on a MockK spy, so no broker is contacted. `testKafkaIntegration`
 * is intentionally not annotated with `@Test`.
 */
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(
    classes = [
        BluePrintMessageLibConfiguration::class,
        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
    ]
)
@TestPropertySource(
    properties =
        [
            "blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
            "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
            "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
            "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
            "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
            "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
            "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
            "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
            "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
            "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
            "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
            "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
            "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",

            "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
            "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
            "blueprintsprocessor.messageproducer.sample.topic=default-topic",
            "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
            "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
            "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
            "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
            "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
            "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
            "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
        ]
)
open class BlueprintMessageConsumerServiceTest {

    val log = logger(BlueprintMessageConsumerServiceTest::class)

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    /**
     * Builds a [MockConsumer] subscribed to [topic], assigned the single partition
     * `TopicPartition(topic, partition)`, with beginning offset 0 and end offset
     * [recordCount], and pre-loaded with records at offsets `1..recordCount`.
     *
     * Record `i` has key `key_i` and value `"I am message i"` as bytes.
     * The call order (subscribe, rebalance, offsets, addRecord) mirrors the setup
     * both consumer tests previously performed inline.
     */
    private fun mockConsumerWithRecords(
        topic: String = "default-topic",
        partition: Int = 1,
        recordCount: Int = 10
    ): MockConsumer<String, ByteArray> {
        val topicPartition = TopicPartition(topic, partition)
        val consumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
        consumer.subscribe(listOf(topic))
        consumer.rebalance(listOf(topicPartition))
        consumer.updateBeginningOffsets(mapOf(topicPartition to 0L))
        consumer.updateEndOffsets(mapOf(topicPartition to recordCount.toLong()))
        for (i in 1..recordCount) {
            consumer.addRecord(
                ConsumerRecord<String, ByteArray>(
                    topic, partition, i.toLong(), "key_$i",
                    "I am message $i".toByteArray()
                )
            )
        }
        return consumer
    }

    /**
     * Subscribes through the channel returned by `subscribe(null)` and checks each
     * received record's value prefix and that keys arrive as `key_1`, `key_2`, ...
     *
     * NOTE(review): the assertions run inside the launched collector and the service is
     * shut down after a 10 ms delay; nothing asserts that at least one record was
     * actually received, so the test can pass without consuming any message.
     */
    @Test
    fun testKafkaBasicAuthConsumerService() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

            // Hand the spy a pre-loaded mock instead of a real Kafka client.
            val mockKafkaConsumer = mockConsumerWithRecords()
            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer

            val channel = spyBlueprintMessageConsumerService.subscribe(null)
            var received = 0
            launch {
                channel.consumeEach { record ->
                    // Incremented before the key check: keys are expected in order, starting at key_1.
                    ++received
                    val key = record.key()
                    val value = String(record.value(), Charset.defaultCharset())
                    assertTrue(value.startsWith("I am message"), "failed to get the actual message")
                    assertEquals("key_$received", key)
                }
            }
            delay(10)
            spyBlueprintMessageConsumerService.shutDown()
        }
    }

    /**
     * Consumes through a caller-supplied [KafkaConsumerRecordsFunction] that only logs the
     * number of records in each batch; the test makes no assertion on what was received.
     */
    @Test
    fun testKafkaBasicAuthConsumerWithDynamicFunction() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

            // Hand the spy a pre-loaded mock instead of a real Kafka client.
            val mockKafkaConsumer = mockConsumerWithRecords()
            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer

            // Test consumer function implementation: logs the batch size only.
            val consumerFunction = object : KafkaConsumerRecordsFunction {
                override suspend fun invoke(
                    messageConsumerProperties: MessageConsumerProperties,
                    consumer: Consumer<*, *>,
                    consumerRecords: ConsumerRecords<*, *>
                ) {
                    val count = consumerRecords.count()
                    log.trace("Received Message count($count)")
                }
            }
            spyBlueprintMessageConsumerService.consume(consumerFunction)
            delay(10)
            spyBlueprintMessageConsumerService.shutDown()
        }
    }

    /**
     * Verifies that the "sample" consumer properties produce the expected Kafka client
     * configuration entries for the `kafka-scram-ssl-auth` type.
     *
     * Only the keys listed in the expected map are checked; extra keys in the generated
     * configuration are not treated as failures.
     */
    @Test
    fun testKafkaScramSslAuthConfig() {

        val expectedConfig = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
            ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
            ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
            SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
            SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
            SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
            SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
            SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
                "username=\"sample-user\" " +
                "password=\"secretpassword\";"
        )

        val messageConsumerProperties = bluePrintMessageLibPropertyService
            .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")

        val configProps = messageConsumerProperties.getConfig()

        // kotlin.test.assertEquals takes (expected, actual, message); keeping that order
        // makes a failure report label the two values correctly.
        assertEquals(
            "default-topic",
            messageConsumerProperties.topic,
            "Topic doesn't match the expected value"
        )
        assertEquals(
            "kafka-scram-ssl-auth",
            messageConsumerProperties.type,
            "Authentication type doesn't match the expected value"
        )

        expectedConfig.forEach { (key, expected) ->
            assertTrue(
                configProps.containsKey(key),
                "Missing expected kafka config key : $key"
            )
            assertEquals(
                expected,
                configProps[key],
                "Unexpected value for $key got ${configProps[key]} instead of $expected"
            )
        }
    }

    /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
    // @Test
    fun testKafkaIntegration() {
        runBlocking {
            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

            val channel = blueprintMessageConsumerService.subscribe(null)
            launch {
                channel.consumeEach {
                    log.info("Consumed Message : $it")
                }
            }

            // Send 5 messages, waiting 100 ms before each one.
            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                .blueprintMessageProducerService("sample") as KafkaMessageProducerService
            launch {
                repeat(5) { index ->
                    delay(100)
                    val headers: MutableMap<String, String> = hashMapOf("id" to index.toString())
                    blueprintMessageProducerService.sendMessageNB(
                        key = "mykey",
                        message = "this is my message($index)",
                        headers = headers
                    )
                }
            }
            // Give the consumer time to receive the messages before shutting down.
            delay(5000)
            blueprintMessageConsumerService.shutDown()
        }
    }
}