Formatting Code base with ktlint
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageConsumerServiceTest.kt
index b2accfb..823ba7d 100644 (file)
@@ -23,7 +23,11 @@ import kotlinx.coroutines.channels.consumeEach
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
-import org.apache.kafka.clients.consumer.*
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.apache.kafka.clients.consumer.MockConsumer
+import org.apache.kafka.clients.consumer.OffsetResetStrategy
 import org.apache.kafka.common.TopicPartition
 import org.junit.Test
 import org.junit.runner.RunWith
@@ -40,26 +44,30 @@ import org.springframework.test.context.junit4.SpringRunner
 import kotlin.test.assertNotNull
 import kotlin.test.assertTrue
 
-
 @RunWith(SpringRunner::class)
 @DirtiesContext
-@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
-    BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class])
-@TestPropertySource(properties =
-["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
-    "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
-    "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
-    "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
-    "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
-    "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
-    "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
-
-    "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
-    "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
-    "blueprintsprocessor.messageproducer.sample.topic=default-topic",
-    "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
-])
+@ContextConfiguration(
+    classes = [BluePrintMessageLibConfiguration::class,
+        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
+)
+@TestPropertySource(
+    properties =
+    ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
+        "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
+        "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
+        "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
+        "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
+        "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+        "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
+
+        "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+        "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+        "blueprintsprocessor.messageproducer.sample.topic=default-topic",
+        "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+    ]
+)
 open class BlueprintMessageConsumerServiceTest {
+
     val log = logger(BlueprintMessageConsumerServiceTest::class)
 
     @Autowired
@@ -69,7 +77,7 @@ open class BlueprintMessageConsumerServiceTest {
     fun testKafkaBasicAuthConsumerService() {
         runBlocking {
             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
-                    .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+                .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
 
             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -93,8 +101,10 @@ open class BlueprintMessageConsumerServiceTest {
             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
             for (i in 1..10) {
-                val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
-                        "I am message $i".toByteArray())
+                val record = ConsumerRecord<String, ByteArray>(
+                    topic, 1, i.toLong(), "key_$i",
+                    "I am message $i".toByteArray()
+                )
                 mockKafkaConsumer.addRecord(record)
             }
 
@@ -114,7 +124,7 @@ open class BlueprintMessageConsumerServiceTest {
     fun testKafkaBasicAuthConsumerWithDynamicFunction() {
         runBlocking {
             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
-                    .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+                .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
 
             val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -138,16 +148,21 @@ open class BlueprintMessageConsumerServiceTest {
             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
             for (i in 1..10) {
-                val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
-                        "I am message $i".toByteArray())
+                val record = ConsumerRecord<String, ByteArray>(
+                    topic, 1, i.toLong(), "key_$i",
+                    "I am message $i".toByteArray()
+                )
                 mockKafkaConsumer.addRecord(record)
             }
 
             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
             /** Test Consumer Function implementation */
             val consumerFunction = object : KafkaConsumerRecordsFunction {
-                override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties,
-                                            consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) {
+                override suspend fun invoke(
+                    messageConsumerProperties: MessageConsumerProperties,
+                    consumer: Consumer<*, *>,
+                    consumerRecords: ConsumerRecords<*, *>
+                ) {
                     val count = consumerRecords.count()
                     log.trace("Received Message count($count)")
                 }
@@ -159,11 +174,11 @@ open class BlueprintMessageConsumerServiceTest {
     }
 
     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
-    //@Test
+    // @Test
     fun testKafkaIntegration() {
         runBlocking {
             val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
-                    .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+                .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
             assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
 
             val channel = blueprintMessageConsumerService.subscribe(null)
@@ -175,18 +190,20 @@ open class BlueprintMessageConsumerServiceTest {
 
             /** Send message with every 1 sec */
             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
-                    .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+                .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
             launch {
                 repeat(5) {
                     delay(100)
                     val headers: MutableMap<String, String> = hashMapOf()
                     headers["id"] = it.toString()
-                    blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
-                            headers = headers)
+                    blueprintMessageProducerService.sendMessageNB(
+                        message = "this is my message($it)",
+                        headers = headers
+                    )
                 }
             }
             delay(5000)
             blueprintMessageConsumerService.shutDown()
         }
     }
-}
\ No newline at end of file
+}