Cluster distributed lock service.
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / nats-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / nats / service / BluePrintNatsServiceTest.kt
index 976f9f5..549be64 100644 (file)
@@ -27,6 +27,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
 import kotlin.test.assertNotNull
 
@@ -106,6 +107,7 @@ class BluePrintNatsServiceTest {
 
             testMultiPublish(natsService)
             testLoadBalance(natsService)
+            testLimitSubscription(natsService)
             testRequestReply(natsService)
             testMultiRequestReply(natsService)
             delay(1000)
@@ -137,12 +139,49 @@ class BluePrintNatsServiceTest {
             val lbMessageHandler2 =
                 MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") }
 
-            natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
-            natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
+            val sub1 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
+            val sub2 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
 
             repeat(5) {
                 natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
             }
+            sub1.unsubscribe()
+            sub2.unsubscribe()
+        }
+    }
+
+    private fun testLimitSubscription(natsService: BluePrintNatsService) {
+        runBlocking {
+            /** Load balance Publish Message Test **/
+            val lbMessageHandler1 =
+                MessageHandler { message ->
+                    runBlocking {
+                        println("LB Publish Message Handler 1: ${message.strData()}")
+                        message.ack()
+                    }
+                }
+            val lbMessageHandler2 =
+                MessageHandler { message ->
+                    runBlocking {
+                        println("LB Publish Message Handler 2: ${message.strData()}")
+                        message.ack()
+                    }
+                }
+
+            val sub1 = natsService.loadBalanceSubscribe(
+                "lb-publish", "lb-group", lbMessageHandler1,
+                SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+            )
+            val sub2 = natsService.loadBalanceSubscribe(
+                "lb-publish", "lb-group", lbMessageHandler2,
+                SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+            )
+
+            repeat(10) {
+                natsService.publish("lb-publish", "lb limit message-$it".toByteArray())
+            }
+            sub1.unsubscribe()
+            sub2.unsubscribe()
         }
     }