Rest endpoint for message Prioritization
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / MessagePrioritizationConsumerTest.kt
index bd99f72..ec0515c 100644 (file)
@@ -24,15 +24,16 @@ import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import org.junit.Before
 import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
@@ -43,32 +44,38 @@ import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
 import kotlin.test.Test
 import kotlin.test.assertNotNull
-
+import kotlin.test.assertTrue
 
 @RunWith(SpringRunner::class)
 @DataJpaTest
 @DirtiesContext
-@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
-    BlueprintPropertyConfiguration::class, BluePrintProperties::class,
-    MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class])
-@TestPropertySource(properties =
-[
-    "spring.jpa.show-sql=true",
-    "spring.jpa.properties.hibernate.show_sql=true",
-    "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
-
-    "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
-    "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
-    "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
-    "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
-
-    // To send initial test message
-    "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
-    "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
-    "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
-])
+@ContextConfiguration(
+    classes = [BluePrintMessageLibConfiguration::class,
+        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
+        MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
+)
+@TestPropertySource(
+    properties =
+    [
+        "spring.jpa.show-sql=true",
+        "spring.jpa.properties.hibernate.show_sql=true",
+        "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
+
+        "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
+        "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+        "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
+        "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
+
+        // To send initial test message
+        "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
+        "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+        "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+    ]
+)
 open class MessagePrioritizationConsumerTest {
 
+    private val log = logger(MessagePrioritizationConsumerTest::class)
+
     @Autowired
     lateinit var applicationContext: ApplicationContext
 
@@ -78,6 +85,12 @@ open class MessagePrioritizationConsumerTest {
     @Autowired
     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
 
+    @Autowired
+    lateinit var messagePrioritizationService: MessagePrioritizationService
+
+    @Autowired
+    lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
+
     @Before
     fun setup() {
         BluePrintDependencyService.inject(applicationContext)
@@ -89,7 +102,7 @@ open class MessagePrioritizationConsumerTest {
             assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
 
             val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
-                    .instance(MessagePrioritizationStateService::class)
+                .instance(MessagePrioritizationStateService::class)
             assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
 
             MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
@@ -100,26 +113,60 @@ open class MessagePrioritizationConsumerTest {
         }
     }
 
+    @Test
+    fun testMessagePrioritizationService() {
+        runBlocking {
+            assertTrue(
+                ::messagePrioritizationService.isInitialized,
+                "failed to initialize messagePrioritizationService"
+            )
+
+            log.info("****************  without Correlation **************")
+            /** Checking without correlation */
+            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+                messagePrioritizationService.prioritize(it)
+            }
+            log.info("****************  Same Group , with Correlation **************")
+            /** checking same group with correlation */
+            MessagePrioritizationSample
+                .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+                .forEach {
+                    delay(10)
+                    messagePrioritizationService.prioritize(it)
+                }
+            log.info("****************  Different Type , with Correlation **************")
+            /** checking different type, with correlation */
+            MessagePrioritizationSample
+                .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+                .forEach {
+                    delay(10)
+                    messagePrioritizationService.prioritize(it)
+                }
+        }
+    }
+
     @Test
     fun testStartConsuming() {
         runBlocking {
             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
 
             val streamingConsumerService = bluePrintMessageLibPropertyService
-                    .blueprintMessageConsumerService(configuration.inputTopicSelector)
+                .blueprintMessageConsumerService(configuration.inputTopicSelector)
             assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
 
             val spyStreamingConsumerService = spyk(streamingConsumerService)
             coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
             coEvery { spyStreamingConsumerService.shutDown() } returns Unit
-            val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+            val messagePrioritizationConsumer = MessagePrioritizationConsumer(
+                bluePrintMessageLibPropertyService
+            )
             val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
 
-
             // Test Topology
-            val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
+            val kafkaStreamConsumerFunction =
+                spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
             val messageConsumerProperties = bluePrintMessageLibPropertyService
-                    .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
+                .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
             val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
             assertNotNull(topology, "failed to get create topology")
 
@@ -130,46 +177,51 @@ open class MessagePrioritizationConsumerTest {
     }
 
     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
-    //@Test
+    // @Test
     fun testMessagePrioritizationConsumer() {
         runBlocking {
-            val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
             messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
 
             /** Send sample message with every 1 sec */
             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
-                    .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
+                .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
             launch {
-             MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+                MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
                     delay(100)
                     val headers: MutableMap<String, String> = hashMapOf()
                     headers["id"] = it.id
-                    blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
-                            headers = headers)
+                    blueprintMessageProducerService.sendMessageNB(
+                        message = it.asJsonString(false),
+                        headers = headers
+                    )
                 }
 
                 MessagePrioritizationSample
-                        .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
-                        .forEach {
-                            delay(100)
-                            val headers: MutableMap<String, String> = hashMapOf()
-                            headers["id"] = it.id
-                            blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
-                                    headers = headers)
-                        }
+                    .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+                    .forEach {
+                        delay(100)
+                        val headers: MutableMap<String, String> = hashMapOf()
+                        headers["id"] = it.id
+                        blueprintMessageProducerService.sendMessageNB(
+                            message = it.asJsonString(false),
+                            headers = headers
+                        )
+                    }
 
                 MessagePrioritizationSample
-                        .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
-                        .forEach {
-                            delay(2000)
-                            val headers: MutableMap<String, String> = hashMapOf()
-                            headers["id"] = it.id
-                            blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
-                                    headers = headers)
-                        }
+                    .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+                    .forEach {
+                        delay(2000)
+                        val headers: MutableMap<String, String> = hashMapOf()
+                        headers["id"] = it.id
+                        blueprintMessageProducerService.sendMessageNB(
+                            message = it.asJsonString(false),
+                            headers = headers
+                        )
+                    }
             }
             delay(10000)
             messagePrioritizationConsumer.shutDown()
         }
     }
-}
\ No newline at end of file
+}