Cluster distributed lock service.
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / atomix-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / atomix / AtomixBluePrintClusterServiceTest.kt
index 919d671..b257069 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.atomix
 
 import com.fasterxml.jackson.databind.JsonNode
+import io.atomix.core.Atomix
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.async
 import kotlinx.coroutines.awaitAll
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
 import org.junit.Before
 import org.junit.Test
 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
@@ -31,6 +33,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
 
 class AtomixBluePrintClusterServiceTest {
     val log = logger(AtomixBluePrintClusterServiceTest::class)
@@ -60,21 +63,60 @@ class AtomixBluePrintClusterServiceTest {
                     atomixClusterService.atomix
                 }
             }
-            val atomix = deferred.awaitAll()
-            /** Test Distributed store creation */
-            repeat(2) { storeId ->
-                val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
-                assertNotNull(store, "failed to get store")
-                val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
-                store1.addListener {
-                    log.info("Received map event : $it")
-                }
-                repeat(10) {
-                    store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
-                }
-                delay(100)
-                store.close()
+            val atomixs = deferred.awaitAll()
+            testDistributedStore(atomixs)
+            testDistributedLock(atomixs)
+        }
+    }
+
+    private suspend fun testDistributedStore(atomix: List<Atomix>) {
+        /** Test Distributed store creation */
+        repeat(2) { storeId ->
+            val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
+            assertNotNull(store, "failed to get store")
+            val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
+            store1.addListener {
+                log.info("Received map event : $it")
+            }
+            repeat(10) {
+                store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
             }
+            delay(100)
+            store.close()
+        }
+    }
+
+    private suspend fun testDistributedLock(atomix: List<Atomix>) {
+        val lockName = "sample-lock"
+        withContext(Dispatchers.IO) {
+            val deferred = async {
+                executeLock(atomix.get(0), "first", lockName)
+            }
+            val deferred2 = async {
+                executeLock(atomix.get(1), "second", lockName)
+            }
+            val deferred3 = async {
+                executeLock(atomix.get(1), "third", lockName)
+            }
+            deferred.start()
+            deferred2.start()
+            deferred3.start()
+        }
+    }
+
+    private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) {
+        log.info("initialising $lockId lock...")
+        val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName)
+        assertNotNull(distributedLock, "failed to create distributed $lockId lock")
+        distributedLock.lock()
+        assertTrue(distributedLock.isLocked, "failed to lock $lockId")
+        try {
+            log.info("locked $lockId process for 5mSec")
+            delay(5)
+        } finally {
+            distributedLock.unlock()
+            log.info("$lockId lock released")
         }
+        distributedLock.close()
     }
 }