2  *  Copyright © 2019 IBM.
 
   3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
 
   5  *  Licensed under the Apache License, Version 2.0 (the "License");
 
   6  *  you may not use this file except in compliance with the License.
 
   7  *  You may obtain a copy of the License at
 
   9  *      http://www.apache.org/licenses/LICENSE-2.0
 
  11  *  Unless required by applicable law or agreed to in writing, software
 
  12  *  distributed under the License is distributed on an "AS IS" BASIS,
 
  13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  *  See the License for the specific language governing permissions and
 
  15  *  limitations under the License.
 
  18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
  22 import kotlinx.coroutines.channels.consumeEach
 
  23 import kotlinx.coroutines.delay
 
  24 import kotlinx.coroutines.launch
 
  25 import kotlinx.coroutines.runBlocking
 
  26 import org.apache.kafka.clients.CommonClientConfigs
 
  27 import org.apache.kafka.clients.consumer.Consumer
 
  28 import org.apache.kafka.clients.consumer.ConsumerConfig
 
  29 import org.apache.kafka.clients.consumer.ConsumerRecord
 
  30 import org.apache.kafka.clients.consumer.ConsumerRecords
 
  31 import org.apache.kafka.clients.consumer.MockConsumer
 
  32 import org.apache.kafka.clients.consumer.OffsetResetStrategy
 
  33 import org.apache.kafka.clients.producer.ProducerConfig
 
  34 import org.apache.kafka.common.TopicPartition
 
  35 import org.apache.kafka.common.config.SaslConfigs
 
  36 import org.apache.kafka.common.config.SslConfigs
 
  37 import org.apache.kafka.common.security.auth.SecurityProtocol
 
  38 import org.apache.kafka.common.security.scram.ScramLoginModule
 
  39 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 
  40 import org.apache.kafka.common.serialization.StringDeserializer
 
  42 import org.junit.runner.RunWith
 
  43 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 
  44 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
 
  45 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
 
  46 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 
  47 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
 
  48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 
  49 import org.springframework.beans.factory.annotation.Autowired
 
  50 import org.springframework.test.annotation.DirtiesContext
 
  51 import org.springframework.test.context.ContextConfiguration
 
  52 import org.springframework.test.context.TestPropertySource
 
  53 import org.springframework.test.context.junit4.SpringRunner
 
  54 import kotlin.test.assertEquals
 
  55 import kotlin.test.assertNotNull
 
  56 import kotlin.test.assertTrue
 
  58 @RunWith(SpringRunner::class)
 
  60 @ContextConfiguration(
 
  61     classes = [BluePrintMessageLibConfiguration::class,
 
  62         BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
 
  66     ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
 
  67         "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
 
  68         "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
 
  69         "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
 
  70         "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
 
  71         "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
 
  72         "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
 
  73         "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
 
  74         "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
 
  75         "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
 
  76         "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
 
  77         "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
 
  78         "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
 
  80         "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
 
  81         "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
 
  82         "blueprintsprocessor.messageproducer.sample.topic=default-topic",
 
  83         "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
 
  84         "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
 
  85         "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
 
  86         "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
 
  87         "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
 
  88         "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
 
  89         "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
 
  92 open class BlueprintMessageConsumerServiceTest {
 
  94     val log = logger(BlueprintMessageConsumerServiceTest::class)
 
  97     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
 
 100     fun testKafkaBasicAuthConsumerService() {

                 /**
                  * Exercises subscribe(null) on the "sample" Kafka consumer service while the service's
                  * kafkaConsumer(any()) call is stubbed to return an in-memory MockConsumer.
                  * NOTE(review): the method name says "BasicAuth", but the visible test properties declare the
                  * "sample" consumer with type kafka-scram-ssl-auth - confirm which is intended.
                  * NOTE(review): this listing is an excerpt; the lines that open the enclosing blocks and the loop
                  * that binds `i` are not visible here.
                  */
 102             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService

 103                 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService

 104             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

                 /** Spy on the real service so that its kafkaConsumer(...) call can be stubbed further down. */
 106             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

                 /** Fixture: one partition (number 1) of "default-topic". */
 108             val topic = "default-topic"

 109             val partitions: MutableList<TopicPartition> = arrayListOf()

 110             val topicsCollection: MutableList<String> = arrayListOf()

 111             partitions.add(TopicPartition(topic, 1))

 112             val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()

 113             val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()

                 /** Each partition gets beginning offset 0 and end offset 10; its topic name is collected for subscribe(). */
 115             val records: Long = 10

 116             partitions.forEach { partition ->

 117                 partitionsBeginningMap[partition] = 0L

 118                 partitionsEndMap[partition] = records

 119                 topicsCollection.add(partition.topic())

                 /** Mock consumer (EARLIEST reset) subscribed to the topic, rebalanced onto the partition list and given the offsets above. */
 121             val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)

 122             mockKafkaConsumer.subscribe(topicsCollection)

 123             mockKafkaConsumer.rebalance(partitions)

 124             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)

 125             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)

                     /** Queue a record on partition 1 at offset i, keyed "key_$i", whose payload is the bytes of "I am message $i". */
 127                 val record = ConsumerRecord<String, ByteArray>(

 128                     topic, 1, i.toLong(), "key_$i",

 129                     "I am message $i".toByteArray()

 131                 mockKafkaConsumer.addRecord(record)

                 /** From here on the spied service hands out the mock consumer instead of building its own. */
 134             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer

 135             val channel = spyBlueprintMessageConsumerService.subscribe(null)

                     /** Every value received from the channel must start with the payload prefix queued above. */
 137                 channel.consumeEach {

 138                     assertTrue(it.startsWith("I am message"), "failed to get the actual message")

                 /** Shut the spied consumer service down at the end of the test. */
 142             spyBlueprintMessageConsumerService.shutDown()
 
 147     fun testKafkaBasicAuthConsumerWithDynamicFunction() {

                 /**
                  * Exercises consume(...) on the "sample" Kafka consumer service with a caller-supplied
                  * KafkaConsumerRecordsFunction, while kafkaConsumer(any()) is stubbed to return an in-memory
                  * MockConsumer.
                  * NOTE(review): the method name says "BasicAuth", but the visible test properties declare the
                  * "sample" consumer with type kafka-scram-ssl-auth - confirm which is intended.
                  * NOTE(review): this listing is an excerpt; the lines that open the enclosing blocks and the loop
                  * that binds `i` are not visible here.
                  */
 149             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService

 150                 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService

 151             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")

                 /** Spy on the real service so that its kafkaConsumer(...) call can be stubbed further down. */
 153             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)

                 /** Fixture: one partition (number 1) of "default-topic". */
 155             val topic = "default-topic"

 156             val partitions: MutableList<TopicPartition> = arrayListOf()

 157             val topicsCollection: MutableList<String> = arrayListOf()

 158             partitions.add(TopicPartition(topic, 1))

 159             val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()

 160             val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()

                 /** Each partition gets beginning offset 0 and end offset 10; its topic name is collected for subscribe(). */
 162             val records: Long = 10

 163             partitions.forEach { partition ->

 164                 partitionsBeginningMap[partition] = 0L

 165                 partitionsEndMap[partition] = records

 166                 topicsCollection.add(partition.topic())

                 /** Mock consumer (EARLIEST reset) subscribed to the topic, rebalanced onto the partition list and given the offsets above. */
 168             val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)

 169             mockKafkaConsumer.subscribe(topicsCollection)

 170             mockKafkaConsumer.rebalance(partitions)

 171             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)

 172             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)

                     /** Queue a record on partition 1 at offset i, keyed "key_$i", whose payload is the bytes of "I am message $i". */
 174                 val record = ConsumerRecord<String, ByteArray>(

 175                     topic, 1, i.toLong(), "key_$i",

 176                     "I am message $i".toByteArray()

 178                 mockKafkaConsumer.addRecord(record)

                 /** From here on the spied service hands out the mock consumer instead of building its own. */
 181             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer

 182             /** Consumer function handed to consume(): it only counts the records it is given and logs that count at trace level. */

 183             val consumerFunction = object : KafkaConsumerRecordsFunction {

 184                 override suspend fun invoke(

 185                     messageConsumerProperties: MessageConsumerProperties,

 186                     consumer: Consumer<*, *>,

 187                     consumerRecords: ConsumerRecords<*, *>

                         /** No assertion is made on the count; it is only written to the log. */
 189                     val count = consumerRecords.count()

 190                     log.trace("Received Message count($count)")

                 /** Pass the function above to the spied service, then shut the service down. */
 193             spyBlueprintMessageConsumerService.consume(consumerFunction)

 195             spyBlueprintMessageConsumerService.shutDown()
 
 200     fun testKafkaScramSslAuthConfig() {
 
 202         val expectedConfig = mapOf<String, Any>(
 
 203                 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
 
 204                 ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
 
 205                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
 
 206                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
 
 207                 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
 
 208                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
 
 209                 ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
 
 210                 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
 
 211                 SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
 
 212                 SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
 
 213                 SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
 
 214                 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
 
 215                 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
 
 216                 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
 
 217                 SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
 
 218                 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
 
 219                 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
 
 220                         "username=\"sample-user\" " +
 
 221                         "password=\"secretpassword\";"
 
 224         val messageConsumerProperties = bluePrintMessageLibPropertyService
 
 225                 .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
 
 227         val configProps = messageConsumerProperties.getConfig()
 
 229         assertEquals(messageConsumerProperties.topic,
 
 231                 "Topic doesn't match the expected value"
 
 233         assertEquals(messageConsumerProperties.type,
 
 234                 "kafka-scram-ssl-auth",
 
 235                 "Authentication type doesn't match the expected value")
 
 237         expectedConfig.forEach {
 
 238             assertTrue(configProps.containsKey(it.key),
 
 239                     "Missing expected kafka config key : ${it.key}")
 
 240             assertEquals(configProps[it.key],
 
 242                     "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
 
 247     /** Kafka integration test: enable and run this test case only for local desktop testing against a real Kafka broker. */
 
 249     fun testKafkaIntegration() {
 
 251             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
 
 252                 .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
 
 253             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
 
 255             val channel = blueprintMessageConsumerService.subscribe(null)
 
 257                 channel.consumeEach {
 
 258                     log.info("Consumed Message : $it")
 
 262             /** Send a message every 1 sec */
 
 263             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
 
 264                 .blueprintMessageProducerService("sample") as KafkaMessageProducerService
 
 268                     val headers: MutableMap<String, String> = hashMapOf()
 
 269                     headers["id"] = it.toString()
 
 270                     blueprintMessageProducerService.sendMessageNB(
 
 271                         message = "this is my message($it)",
 
 277             blueprintMessageConsumerService.shutDown()