Prioritization Optional NATS consumer support
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / MessagePrioritizationConsumerTest.kt
index 190f4e8..7f150f5 100644 (file)
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
 
 import io.mockk.coEvery
 import io.mockk.every
+import io.mockk.mockk
 import io.mockk.spyk
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
@@ -27,13 +28,23 @@ import org.junit.runner.RunWith
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 import org.springframework.beans.factory.annotation.Autowired
@@ -45,21 +56,20 @@ import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
 import kotlin.test.Test
 import kotlin.test.assertNotNull
-import kotlin.test.assertTrue
 
 @RunWith(SpringRunner::class)
 @DataJpaTest
 @DirtiesContext
 @ContextConfiguration(
-    classes = [BluePrintMessageLibConfiguration::class,
+    classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
         BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
         MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
 )
 @TestPropertySource(
     properties =
     [
-        "spring.jpa.show-sql=true",
-        "spring.jpa.properties.hibernate.show_sql=true",
+        "spring.jpa.show-sql=false",
+        "spring.jpa.properties.hibernate.show_sql=false",
         "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
 
         "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
@@ -70,7 +80,11 @@ import kotlin.test.assertTrue
         // To send initial test message
         "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
         "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
-        "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+        "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
+
+        "blueprintsprocessor.nats.cds-controller.type=token-auth",
+        "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
+        "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
     ]
 )
 open class MessagePrioritizationConsumerTest {
@@ -87,13 +101,10 @@ open class MessagePrioritizationConsumerTest {
     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
 
     @Autowired
-    lateinit var messagePrioritizationService: MessagePrioritizationService
-
-    @Autowired
-    lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService
+    lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
 
     @Autowired
-    lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
+    lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
 
     @Before
     fun setup() {
@@ -120,10 +131,10 @@ open class MessagePrioritizationConsumerTest {
     @Test
     fun testMessagePrioritizationService() {
         runBlocking {
-            assertTrue(
-                ::messagePrioritizationService.isInitialized,
-                "failed to initialize messagePrioritizationService"
-            )
+            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+            val messagePrioritizationService =
+                SampleMessagePrioritizationService(messagePrioritizationStateService)
+            messagePrioritizationService.setConfiguration(configuration)
 
             log.info("****************  without Correlation **************")
             /** Checking without correlation */
@@ -161,8 +172,8 @@ open class MessagePrioritizationConsumerTest {
             val spyStreamingConsumerService = spyk(streamingConsumerService)
             coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
             coEvery { spyStreamingConsumerService.shutDown() } returns Unit
-            val messagePrioritizationConsumer = MessagePrioritizationConsumer(
-                bluePrintMessageLibPropertyService
+            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+                bluePrintMessageLibPropertyService, mockk()
             )
             val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
 
@@ -183,13 +194,15 @@ open class MessagePrioritizationConsumerTest {
     @Test
     fun testSchedulerService() {
         runBlocking {
-            val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration()
-            assertTrue(
-                ::messagePrioritizationSchedulerService.isInitialized,
-                "failed to initialize messagePrioritizationSchedulerService"
-            )
+            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+            val messagePrioritizationService =
+                SampleMessagePrioritizationService(messagePrioritizationStateService)
+            messagePrioritizationService.setConfiguration(configuration)
+
+            val messagePrioritizationSchedulerService =
+                MessagePrioritizationSchedulerService(messagePrioritizationService)
             launch {
-                messagePrioritizationSchedulerService.startScheduling(configuration)
+                messagePrioritizationSchedulerService.startScheduling()
             }
             launch {
                 /** To debug increase the delay time */
@@ -201,9 +214,30 @@ open class MessagePrioritizationConsumerTest {
 
     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
     // @Test
-    fun testMessagePrioritizationConsumer() {
+    fun testKafkaMessagePrioritizationConsumer() {
         runBlocking {
-            messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+
+            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+            val kafkaMessagePrioritizationService =
+                SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
+            kafkaMessagePrioritizationService.setConfiguration(configuration)
+
+            val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
+                messagePrioritizationStateService,
+                kafkaMessagePrioritizationService
+            )
+
+            // Register the processor
+            BluePrintDependencyService.registerSingleton(
+                MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+                defaultMessagePrioritizeProcessor
+            )
+
+            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+                bluePrintMessageLibPropertyService,
+                kafkaMessagePrioritizationService
+            )
+            messagePrioritizationConsumer.startConsuming(configuration)
 
             /** Send sample message with every 1 sec */
             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
@@ -247,4 +281,53 @@ open class MessagePrioritizationConsumerTest {
             messagePrioritizationConsumer.shutDown()
         }
     }
+
+    /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
+     *  Start :
+     *  nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
+     * */
+    // @Test
+    fun testNatsMessagePrioritizationConsumer() {
+        runBlocking {
+            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+            assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
+
+            val inputSubject =
+                NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
+
+            val natsMessagePrioritizationService =
+                SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
+            natsMessagePrioritizationService.setConfiguration(configuration)
+
+            val messagePrioritizationConsumer =
+                NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
+            messagePrioritizationConsumer.startConsuming()
+
+            /** Send sample message with every 1 sec */
+            val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
+
+            launch {
+                MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+                    delay(100)
+                    bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+                }
+
+                MessagePrioritizationSample
+                    .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+                    .forEach {
+                        delay(100)
+                        bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+                    }
+
+                MessagePrioritizationSample
+                    .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+                    .forEach {
+                        delay(200)
+                        bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+                    }
+            }
+            delay(3000)
+            messagePrioritizationConsumer.shutDown()
+        }
+    }
 }