Fix hazelcast issues
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / processor-core / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / core / cluster / HazelcastClusterServiceTest.kt
@@ -20,13 +20,17 @@ import com.fasterxml.jackson.databind.JsonNode
 import com.hazelcast.client.config.YamlClientConfigBuilder
 import com.hazelcast.cluster.Member
 import com.hazelcast.config.FileSystemYamlConfig
+import com.hazelcast.instance.impl.HazelcastInstanceFactory
 import com.hazelcast.map.IMap
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.async
 import kotlinx.coroutines.awaitAll
 import kotlinx.coroutines.delay
+import kotlinx.coroutines.newSingleThreadContext
 import kotlinx.coroutines.runBlocking
 import kotlinx.coroutines.withContext
+import org.junit.After
+import org.junit.Before
 import org.junit.Test
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
@@ -35,16 +39,21 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
 import java.io.Serializable
-import java.time.Duration
 import java.util.Properties
 import kotlin.test.assertEquals
 import kotlin.test.assertNotNull
 import kotlin.test.assertTrue
 
-class HazlecastClusterServiceTest {
-    private val log = logger(HazlecastClusterServiceTest::class)
+class HazelcastClusterServiceTest {
+    private val log = logger(HazelcastClusterServiceTest::class)
     private val clusterSize = 3
 
+    @Before
+    @After
+    fun killAllHazelcastInstances() {
+        HazelcastInstanceFactory.terminateAll()
+    }
+
     @Test
     fun testClientFileSystemYamlConfig() {
         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
@@ -74,33 +83,23 @@ class HazlecastClusterServiceTest {
     fun testClusterJoin() {
         runBlocking {
             val bluePrintClusterServiceOne =
-                createCluster(arrayListOf(5679, 5680, 5681)).toMutableList()
-            // delay(1000)
-            // Join as Hazlecast Management Node
-            // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true)
-            // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false)
-            // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
+                createCluster(arrayListOf(1, 2, 3)).toMutableList()
             printReachableMembers(bluePrintClusterServiceOne)
             testDistributedStore(bluePrintClusterServiceOne)
             testDistributedLock(bluePrintClusterServiceOne)
-
-            // executeScheduler(bluePrintClusterServiceOne[0])
-            // delay(1000)
-            // Shutdown
-            shutdown(bluePrintClusterServiceOne)
         }
     }
 
     private suspend fun createCluster(
-        ports: List<Int>,
+        ids: List<Int>,
         joinAsClient: Boolean? = false
     ): List<BluePrintClusterService> {
 
         return withContext(Dispatchers.Default) {
-            val deferred = ports.map { port ->
+            val deferred = ids.map { id ->
                 async(Dispatchers.IO) {
-                    val nodeId = "node-$port"
-                    log.info("********** Starting node($nodeId) on port($port)")
+                    val nodeId = "node-$id"
+                    log.info("********** Starting ($nodeId)")
                     val properties = Properties()
                     properties["hazelcast.logging.type"] = "slf4j"
                     val clusterInfo =
@@ -117,21 +116,15 @@ class HazlecastClusterServiceTest {
                                 properties = properties
                             )
                         }
-                    val hazlecastClusterService = HazlecastClusterService()
-                    hazlecastClusterService.startCluster(clusterInfo)
-                    hazlecastClusterService
+                    val hazelcastClusterService = HazelcastClusterService()
+                    hazelcastClusterService.startCluster(clusterInfo)
+                    hazelcastClusterService
                 }
             }
             deferred.awaitAll()
         }
     }
 
-    private suspend fun shutdown(bluePrintClusterServices: List<BluePrintClusterService>) {
-        bluePrintClusterServices.forEach { bluePrintClusterService ->
-            bluePrintClusterService.shutDown(Duration.ofMillis(10))
-        }
-    }
-
     private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
         /** Test Distributed store creation */
         repeat(2) { storeId ->
@@ -159,13 +152,25 @@ class HazlecastClusterServiceTest {
         val lockName = "sample-lock"
         withContext(Dispatchers.IO) {
             val deferred = async {
-                executeLock(bluePrintClusterServices[0], "first", lockName)
+                newSingleThreadContext("first").use {
+                    withContext(it) {
+                        executeLock(bluePrintClusterServices[0], "first", lockName)
+                    }
+                }
             }
             val deferred2 = async {
-                executeLock(bluePrintClusterServices[1], "second", lockName)
+                newSingleThreadContext("second").use {
+                    withContext(it) {
+                        executeLock(bluePrintClusterServices[1], "second", lockName)
+                    }
+                }
             }
             val deferred3 = async {
-                executeLock(bluePrintClusterServices[2], "third", lockName)
+                newSingleThreadContext("third").use {
+                    withContext(it) {
+                        executeLock(bluePrintClusterServices[2], "third", lockName)
+                    }
+                }
             }
             deferred.start()
             deferred2.start()
@@ -195,12 +200,12 @@ class HazlecastClusterServiceTest {
 
     private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
         log.info("initialising ...")
-        val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService
+        val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
 
         val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
         assertEquals(3, memberNameMap.size, "failed to match member size")
         memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") }
-        val scheduler = hazlecastClusterService.clusterScheduler("cleanup")
+        val scheduler = hazelcastClusterService.clusterScheduler("cleanup")
         // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
         // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
         // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
@@ -209,15 +214,15 @@ class HazlecastClusterServiceTest {
 
     private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
         bluePrintClusterServices.forEach { bluePrintClusterService ->
-            val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService
-            val hazelcast = hazlecastClusterService.hazelcast
+            val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
+            val hazelcast = hazelcastClusterService.hazelcast
             val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null
-            val master = hazlecastClusterService.masterMember("system").memberAddress
-            val members = hazlecastClusterService.allMembers().map { it.memberAddress }
+            val master = hazelcastClusterService.masterMember("system").memberAddress
+            val members = hazelcastClusterService.allMembers().map { it.memberAddress }
             log.info("Cluster Members for($self): master($master) Members($members)")
         }
 
-        val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-56")
+        val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-")
         assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size")
         log.info("Cluster applicationMembers ($applicationMembers)")
     }