Cluster distributed lock service. 35/99835/2
authorBrinda Santh <bs2796@att.com>
Thu, 19 Dec 2019 21:11:31 +0000 (16:11 -0500)
committerKAPIL SINGAL <ks220y@att.com>
Mon, 23 Dec 2019 18:00:26 +0000 (18:00 +0000)
Included and fixed clustered env properties and utils.

Fixed docker compose instance sequence numbers.

Issue-ID: CCSDK-2011
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: Ie28935ae7cb3de8c77cd7110993304eb49799b6c

ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
ms/blueprintsprocessor/application/src/main/docker/startService.sh
ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt
ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt

index f43f19c..a37089f 100644 (file)
@@ -20,20 +20,20 @@ services:
     image: nats-streaming:latest
     container_name: nats
     hostname: nats
-    command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-1 --cluster_node_id nats-1"
+    command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-0 --cluster_node_id nats-0"
     networks:
       - cds-network
     ports:
       - "8222:8222"
       - "4222:4222"
     restart: always
-  cds-controller-1:
+  cds-controller-0:
     depends_on:
       - db
       - nats
     image: onap/ccsdk-blueprintsprocessor:latest
-    container_name: cds-controller-1
-    hostname: cds-controller-1
+    container_name: cds-controller-0
+    hostname: cds-controller-0
     networks:
       - cds-network
     ports:
@@ -49,9 +49,10 @@ services:
         source: ./config
     environment:
       # Same as hostname and container name
+      CLUSTER_ENABLED: "true"
       CLUSTER_ID: cds-cluster
-      CLUSTER_NODE_ID: cds-controller-1
-      CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
+      CLUSTER_NODE_ID: cds-controller-0
+      CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0
       CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
       #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
       NATS_HOSTS: nats://nats:4222
@@ -60,13 +61,13 @@ services:
       APP_CONFIG_HOME: /opt/app/onap/config
       STICKYSELECTORKEY:
       ENVCONTEXT: dev
-  resource-resolution-1:
+  resource-resolution-0:
     depends_on:
       - db
       - nats
     image: onap/ccsdk-blueprintsprocessor:latest
-    container_name: resource-resolution-1
-    hostname: resource-resolution-1
+    container_name: resource-resolution-0
+    hostname: resource-resolution-0
     networks:
       - cds-network
     ports:
@@ -81,9 +82,10 @@ services:
         type: bind
         source: ./config
     environment:
+      CLUSTER_ENABLED: "true"
       CLUSTER_ID: cds-cluster
-      CLUSTER_NODE_ID: resource-resolution-1
-      CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
+      CLUSTER_NODE_ID: resource-resolution-0
+      CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0
       CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
       #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
       NATS_HOSTS: nats://nats:4222
index f5967dc..f516a3c 100644 (file)
@@ -18,10 +18,4 @@ exec java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sour
 -Djava.security.egd=file:/dev/./urandom \
 -DAPPNAME=${APPLICATIONNAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \
 -Dspring.config.location=${APP_CONFIG_HOME}/ \
--DCLUSTER_ID=${CLUSTER_ID} \
--DCLUSTER_NODE_ID=${CLUSTER_NODE_ID} \
--DCLUSTER_NODE_ADDRESS=${CLUSTER_NODE_ID} \
--DCLUSTER_MEMBERS=${CLUSTER_MEMBERS} \
--DCLUSTER_STORAGE_PATH=${CLUSTER_STORAGE_PATH} \
--DCLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} \
 org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt
index b78ebf6..4c9314e 100644 (file)
@@ -22,6 +22,8 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
 import org.springframework.boot.context.event.ApplicationReadyEvent
 import org.springframework.context.event.EventListener
 import org.springframework.stereotype.Component
@@ -52,7 +54,7 @@ import javax.annotation.PreDestroy
  *      CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
  *      CLUSTER_CONFIG_FILE:  /opt/app/onap/config/atomix/atomix-multicast.conf
  * 4. Cluster will be enabled only all the above properties present in the environments.
- * if CLUSTER_ID is present, then it will try to create cluster.
+ * if CLUSTER_ENABLED is present, then it will try to create cluster.
  */
 @Component
 open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePrintClusterService) {
@@ -61,25 +63,22 @@ open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePr
 
     @EventListener(ApplicationReadyEvent::class)
     fun startAndJoinCluster() = runBlocking {
-        val clusterId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID)
 
-        if (!clusterId.isNullOrEmpty()) {
+        if (BluePrintConstants.CLUSTER_ENABLED) {
 
-            val nodeId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
-                ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ID}")
+            val clusterId = ClusterUtils.clusterId()
+            val nodeId = ClusterUtils.clusterNodeId()
+            val nodeAddress = ClusterUtils.clusterNodeAddress()
 
-            val nodeAddress = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS)
-                ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS}")
-
-            val clusterMembers = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_MEMBERS)
+            val clusterMembers = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_MEMBERS)
                 ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_MEMBERS}")
 
-            val clusterMemberList = clusterMembers.split(",").map { it.trim() }.toList()
+            val clusterMemberList = clusterMembers.splitCommaAsList()
 
-            val clusterStorage = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH)
+            val clusterStorage = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH)
                 ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH}")
 
-            val clusterConfigFile = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_CONFIG_FILE)
+            val clusterConfigFile = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_CONFIG_FILE)
 
             val clusterInfo = ClusterInfo(
                 id = clusterId, nodeId = nodeId,
@@ -89,10 +88,7 @@ open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePr
             )
             bluePrintClusterService.startCluster(clusterInfo)
         } else {
-            log.info(
-                "Cluster is disabled, to enable cluster set the environment " +
-                    "properties[CLUSTER_ID,CLUSTER_NODE_ID, CLUSTER_NODE_ADDRESS, CLUSTER_MEMBERS,CLUSTER_CONFIG_FILE]"
-            )
+            log.info("Cluster is disabled, to enable cluster set the environment CLUSTER_* properties.")
         }
     }
 
index caf0631..571f0a1 100644 (file)
@@ -27,7 +27,7 @@ object BluePrintConstants {
     val APP_NAME = System.getProperty("APPLICATION_NAME")
         ?: System.getProperty("APP_NAME")
         ?: System.getProperty("APPNAME")
-        ?: "cds-controller-default"
+        ?: "cds-controller"
 
     const val DATE_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
 
@@ -219,6 +219,7 @@ object BluePrintConstants {
     val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean()
 
     /** Cluster Properties */
+    val CLUSTER_ENABLED = (System.getenv("CLUSTER_ENABLED") ?: "false").toBoolean()
     const val PROPERTY_CLUSTER_ID = "CLUSTER_ID"
     const val PROPERTY_CLUSTER_NODE_ID = "CLUSTER_NODE_ID"
     const val PROPERTY_CLUSTER_NODE_ADDRESS = "CLUSTER_NODE_ADDRESS"
index d5ffe6b..b52cd71 100644 (file)
@@ -28,10 +28,15 @@ object ClusterUtils {
     }
 
     fun clusterId(): String {
-        return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster"
+        return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster"
     }
 
     fun clusterNodeId(): String {
-        return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: "cds-controller"
+        return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: "cds-controller-0"
+    }
+
+    fun clusterNodeAddress(): String {
+        return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS)
+            ?: clusterNodeId()
     }
 }
index 27921be..b5ec6dd 100644 (file)
@@ -18,10 +18,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
 
 import io.atomix.cluster.ClusterMembershipEvent
 import io.atomix.core.Atomix
+import io.atomix.core.lock.DistributedLock
 import kotlinx.coroutines.delay
 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.springframework.stereotype.Service
@@ -35,8 +37,6 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
 
     lateinit var atomix: Atomix
 
-    private var joined = false
-
     override suspend fun startCluster(clusterInfo: ClusterInfo) {
         log.info(
             "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
@@ -53,10 +53,10 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
         /** Listen for the member chaneg events */
         atomix.membershipService.addListener { membershipEvent ->
             when (membershipEvent.type()) {
-                ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added")
-                ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed")
-                ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed")
-                else -> log.info("***** Member event unknown")
+                ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}")
+                ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}")
+                ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}")
+                else -> log.info("Member event unknown")
             }
         }
         atomix.start().join()
@@ -77,16 +77,16 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
             "ping",
             "ping from node(${clusterInfo.nodeId})"
         )
-        joined = true
     }
 
     override fun clusterJoined(): Boolean {
-        return joined
+        return atomix.isRunning
     }
 
     override suspend fun allMembers(): Set<ClusterMember> {
         check(::atomix.isInitialized) { "failed to start and join cluster" }
         check(atomix.isRunning) { "cluster is not running" }
+
         return atomix.membershipService.members.map {
             ClusterMember(
                 id = it.id().id(),
@@ -106,13 +106,51 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
     }
 
     override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
+        check(::atomix.isInitialized) { "failed to start and join cluster" }
         return AtomixLibUtils.distributedMapStore<T>(atomix, name)
     }
 
+    /** The DistributedLock is a distributed implementation of Java’s Lock.
+     * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
+     * determine ordering of multiple concurrent lock holders.
+     * DistributedLocks are designed to account for failures within the cluster.
+     * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
+     * the lock will be released and granted to the next waiting process.     *
+     */
+    override suspend fun clusterLock(name: String): ClusterLock {
+        check(::atomix.isInitialized) { "failed to start and join cluster" }
+        return ClusterLockImpl(atomix, name)
+    }
+
     override suspend fun shutDown(duration: Duration) {
-        val shutDownMilli = duration.toMillis()
-        log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
-        delay(shutDownMilli)
-        atomix.stop()
+        if (::atomix.isInitialized) {
+            val shutDownMilli = duration.toMillis()
+            log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
+            delay(shutDownMilli)
+            atomix.stop()
+        }
+    }
+}
+
+open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock {
+
+    lateinit var distributedLock: DistributedLock
+
+    override suspend fun lock() {
+        distributedLock = AtomixLibUtils.distributedLock(atomix, name)
+        distributedLock.lock()
+    }
+
+    override suspend fun tryLock(timeout: Long): Boolean {
+        distributedLock = AtomixLibUtils.distributedLock(atomix, name)
+        return distributedLock.tryLock(Duration.ofMillis(timeout))
+    }
+
+    override suspend fun unLock() {
+        distributedLock.unlock()
+    }
+
+    override fun isLocked(): Boolean {
+        return distributedLock.isLocked
     }
 }
index 6e726a1..a2a0d39 100644 (file)
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode
 import com.fasterxml.jackson.databind.node.NullNode
 import com.fasterxml.jackson.databind.node.ObjectNode
 import io.atomix.core.Atomix
+import io.atomix.core.lock.DistributedLock
 import io.atomix.core.map.DistributedMap
 import io.atomix.protocols.backup.MultiPrimaryProtocol
 import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup
@@ -29,28 +30,34 @@ import io.atomix.protocols.raft.partition.RaftPartitionGroup
 import io.atomix.utils.net.Address
 import org.jsoup.nodes.TextNode
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
 
 object AtomixLibUtils {
+    private val log = logger(AtomixLibUtils::class)
 
     fun configAtomix(filePath: String): Atomix {
         val configFile = normalizedFile(filePath)
         return Atomix.builder(configFile.absolutePath).build()
     }
 
-    fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix {
+    fun defaultMulticastAtomix(
+        clusterInfo: ClusterInfo,
+        raftPartitions: Int = 1,
+        primaryBackupPartitions: Int = 32
+    ): Atomix {
 
         val nodeId = clusterInfo.nodeId
 
         val raftPartitionGroup = RaftPartitionGroup.builder("system")
-            .withNumPartitions(7)
+            .withNumPartitions(raftPartitions)
             .withMembers(clusterInfo.clusterMembers)
             .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId"))
             .build()
 
         val primaryBackupGroup =
             PrimaryBackupPartitionGroup.builder("data")
-                .withNumPartitions(31)
+                .withNumPartitions(primaryBackupPartitions)
                 .build()
 
         return Atomix.builder()
@@ -62,11 +69,11 @@ object AtomixLibUtils {
             .build()
     }
 
-    fun <T> distributedMapStore(atomix: Atomix, storeName: String): DistributedMap<String, T> {
+    fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> {
         check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" }
 
         val protocol = MultiPrimaryProtocol.builder()
-            .withBackups(2)
+            .withBackups(numBackups)
             .build()
 
         return atomix.mapBuilder<String, T>(storeName)
@@ -79,4 +86,17 @@ object AtomixLibUtils {
             )
             .build()
     }
+
+    fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock {
+        check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" }
+
+        val protocol = MultiPrimaryProtocol.builder()
+            .withBackups(numBackups)
+            .build()
+
+        val lock = atomix.lockBuilder(lockName)
+            .withProtocol(protocol)
+            .build()
+        return lock
+    }
 }
index 919d671..b257069 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.atomix
 
 import com.fasterxml.jackson.databind.JsonNode
+import io.atomix.core.Atomix
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.async
 import kotlinx.coroutines.awaitAll
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
 import org.junit.Before
 import org.junit.Test
 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
@@ -31,6 +33,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
 
 class AtomixBluePrintClusterServiceTest {
     val log = logger(AtomixBluePrintClusterServiceTest::class)
@@ -60,21 +63,60 @@ class AtomixBluePrintClusterServiceTest {
                     atomixClusterService.atomix
                 }
             }
-            val atomix = deferred.awaitAll()
-            /** Test Distributed store creation */
-            repeat(2) { storeId ->
-                val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
-                assertNotNull(store, "failed to get store")
-                val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
-                store1.addListener {
-                    log.info("Received map event : $it")
-                }
-                repeat(10) {
-                    store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
-                }
-                delay(100)
-                store.close()
+            val atomixs = deferred.awaitAll()
+            testDistributedStore(atomixs)
+            testDistributedLock(atomixs)
+        }
+    }
+
+    private suspend fun testDistributedStore(atomix: List<Atomix>) {
+        /** Test Distributed store creation */
+        repeat(2) { storeId ->
+            val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
+            assertNotNull(store, "failed to get store")
+            val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
+            store1.addListener {
+                log.info("Received map event : $it")
+            }
+            repeat(10) {
+                store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
             }
+            delay(100)
+            store.close()
+        }
+    }
+
+    private suspend fun testDistributedLock(atomix: List<Atomix>) {
+        val lockName = "sample-lock"
+        withContext(Dispatchers.IO) {
+            val deferred = async {
+                executeLock(atomix.get(0), "first", lockName)
+            }
+            val deferred2 = async {
+                executeLock(atomix.get(1), "second", lockName)
+            }
+            val deferred3 = async {
+                executeLock(atomix.get(1), "third", lockName)
+            }
+            deferred.start()
+            deferred2.start()
+            deferred3.start()
+        }
+    }
+
+    private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) {
+        log.info("initialising $lockId lock...")
+        val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName)
+        assertNotNull(distributedLock, "failed to create distributed $lockId lock")
+        distributedLock.lock()
+        assertTrue(distributedLock.isLocked, "failed to lock $lockId")
+        try {
+            log.info("locked $lockId process for 5mSec")
+            delay(5)
+        } finally {
+            distributedLock.unlock()
+            log.info("$lockId lock released")
         }
+        distributedLock.close()
     }
 }
index 976f9f5..549be64 100644 (file)
@@ -27,6 +27,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
 import kotlin.test.assertNotNull
 
@@ -106,6 +107,7 @@ class BluePrintNatsServiceTest {
 
             testMultiPublish(natsService)
             testLoadBalance(natsService)
+            testLimitSubscription(natsService)
             testRequestReply(natsService)
             testMultiRequestReply(natsService)
             delay(1000)
@@ -137,12 +139,49 @@ class BluePrintNatsServiceTest {
             val lbMessageHandler2 =
                 MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") }
 
-            natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
-            natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
+            val sub1 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
+            val sub2 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
 
             repeat(5) {
                 natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
             }
+            sub1.unsubscribe()
+            sub2.unsubscribe()
+        }
+    }
+
+    private fun testLimitSubscription(natsService: BluePrintNatsService) {
+        runBlocking {
+            /** Load balance Publish Message Test **/
+            val lbMessageHandler1 =
+                MessageHandler { message ->
+                    runBlocking {
+                        println("LB Publish Message Handler 1: ${message.strData()}")
+                        message.ack()
+                    }
+                }
+            val lbMessageHandler2 =
+                MessageHandler { message ->
+                    runBlocking {
+                        println("LB Publish Message Handler 2: ${message.strData()}")
+                        message.ack()
+                    }
+                }
+
+            val sub1 = natsService.loadBalanceSubscribe(
+                "lb-publish", "lb-group", lbMessageHandler1,
+                SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+            )
+            val sub2 = natsService.loadBalanceSubscribe(
+                "lb-publish", "lb-group", lbMessageHandler2,
+                SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+            )
+
+            repeat(10) {
+                natsService.publish("lb-publish", "lb limit message-$it".toByteArray())
+            }
+            sub1.unsubscribe()
+            sub2.unsubscribe()
         }
     }
 
index bbaa427..6fd4436 100644 (file)
@@ -35,6 +35,9 @@ interface BluePrintClusterService {
     /** Create and get or get the distributed data map store with [name] */
     suspend fun <T> clusterMapStore(name: String): MutableMap<String, T>
 
+    /** Create and get the distributed lock with [name] */
+    suspend fun clusterLock(name: String): ClusterLock
+
     /** Shut down the cluster with [duration] */
     suspend fun shutDown(duration: Duration)
 }
@@ -48,4 +51,11 @@ data class ClusterInfo(
     var storagePath: String
 )
 
-data class ClusterMember(val id: String, val memberAddress: String?)
+data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null)
+
+interface ClusterLock {
+    suspend fun lock()
+    suspend fun tryLock(timeout: Long): Boolean
+    suspend fun unLock()
+    fun isLocked(): Boolean
+}