Cluster distributed lock service.
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / atomix-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / atomix / service / AtomixBluePrintClusterService.kt
index 27921be..b5ec6dd 100644 (file)
@@ -18,10 +18,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
 
 import io.atomix.cluster.ClusterMembershipEvent
 import io.atomix.core.Atomix
+import io.atomix.core.lock.DistributedLock
 import kotlinx.coroutines.delay
 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.springframework.stereotype.Service
@@ -35,8 +37,6 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
 
     lateinit var atomix: Atomix
 
-    private var joined = false
-
     override suspend fun startCluster(clusterInfo: ClusterInfo) {
         log.info(
             "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
@@ -53,10 +53,10 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
         /** Listen for the member chaneg events */
         atomix.membershipService.addListener { membershipEvent ->
             when (membershipEvent.type()) {
-                ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added")
-                ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed")
-                ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed")
-                else -> log.info("***** Member event unknown")
+                ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}")
+                ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}")
+                ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}")
+                else -> log.info("Member event unknown")
             }
         }
         atomix.start().join()
@@ -77,16 +77,16 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
             "ping",
             "ping from node(${clusterInfo.nodeId})"
         )
-        joined = true
     }
 
     override fun clusterJoined(): Boolean {
-        return joined
+        return atomix.isRunning
     }
 
     override suspend fun allMembers(): Set<ClusterMember> {
         check(::atomix.isInitialized) { "failed to start and join cluster" }
         check(atomix.isRunning) { "cluster is not running" }
+
         return atomix.membershipService.members.map {
             ClusterMember(
                 id = it.id().id(),
@@ -106,13 +106,51 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
     }
 
     override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
+        check(::atomix.isInitialized) { "failed to start and join cluster" }
         return AtomixLibUtils.distributedMapStore<T>(atomix, name)
     }
 
+    /** The DistributedLock is a distributed implementation of Java’s Lock.
+     * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
+     * determine ordering of multiple concurrent lock holders.
+     * DistributedLocks are designed to account for failures within the cluster.
+     * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
+     * the lock will be released and granted to the next waiting process.     *
+     */
+    override suspend fun clusterLock(name: String): ClusterLock {
+        check(::atomix.isInitialized) { "failed to start and join cluster" }
+        return ClusterLockImpl(atomix, name)
+    }
+
     override suspend fun shutDown(duration: Duration) {
-        val shutDownMilli = duration.toMillis()
-        log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
-        delay(shutDownMilli)
-        atomix.stop()
+        if (::atomix.isInitialized) {
+            val shutDownMilli = duration.toMillis()
+            log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
+            delay(shutDownMilli)
+            atomix.stop()
+        }
+    }
+}
+
+open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock {
+
+    lateinit var distributedLock: DistributedLock
+
+    override suspend fun lock() {
+        distributedLock = AtomixLibUtils.distributedLock(atomix, name)
+        distributedLock.lock()
+    }
+
+    override suspend fun tryLock(timeout: Long): Boolean {
+        distributedLock = AtomixLibUtils.distributedLock(atomix, name)
+        return distributedLock.tryLock(Duration.ofMillis(timeout))
+    }
+
+    override suspend fun unLock() {
+        distributedLock.unlock()
+    }
+
+    override fun isLocked(): Boolean {
+        return distributedLock.isLocked
     }
 }