Fix hazelcast issues 56/109456/1
authorJozsef Csongvai <jozsef.csongvai@bell.ca>
Tue, 23 Jun 2020 12:48:30 +0000 (08:48 -0400)
committerJozsef Csongvai <jozsef.csongvai@bell.ca>
Tue, 23 Jun 2020 13:01:35 +0000 (09:01 -0400)
- confined lock tests to individual threads to ensure correct unlocking
- removed silent failure in clusterlock.unlock function when unlock is
  called by a thread that doesnt own the lock.
- added isLockedByCurrentThread method to ClusterLock interface
- disabled multicast discovery, tcp-ip should be more stable for tests
- fix Hazlecast typo

Issue-ID: CCSDK-2429
Signed-off-by: Jozsef Csongvai <jozsef.csongvai@bell.ca>
Change-Id: Idfe723fff04fcd9c48510cf429eb15b33662c49d

ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt [moved from ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt with 95% similarity]
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt [moved from ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt with 98% similarity]
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt [moved from ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt with 79% similarity]
ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml

index 81fc0d7..0a58857 100644 (file)
@@ -29,10 +29,10 @@ import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 
 /**
- * Exposed Dependency Service by this Hazlecast Lib Module
+ * Exposed Dependency Service by this Hazelcast Lib Module
  */
 fun BluePrintDependencyService.clusterService(): BluePrintClusterService =
-    instance(HazlecastClusterService::class)
+    instance(HazelcastClusterService::class)
 
 /** Optional Cluster Service, returns only if Cluster is enabled */
 fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? {
@@ -45,9 +45,9 @@ import java.time.Duration
 import java.util.concurrent.TimeUnit
 
 @Service
-open class HazlecastClusterService : BluePrintClusterService {
+open class HazelcastClusterService : BluePrintClusterService {
 
-    private val log = logger(HazlecastClusterService::class)
+    private val log = logger(HazelcastClusterService::class)
     lateinit var hazelcast: HazelcastInstance
     lateinit var cpSubsystemManagementService: CPSubsystemManagementService
     var joinedClient = false
@@ -179,14 +179,14 @@ open class HazlecastClusterService : BluePrintClusterService {
     override suspend fun shutDown(duration: Duration) {
         if (::hazelcast.isInitialized && clusterJoined()) {
             delay(duration.toMillis())
-            HazlecastClusterUtils.terminate(hazelcast)
+            HazelcastClusterUtils.terminate(hazelcast)
         }
     }
 
     /** Utils */
     suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
         if (!joinedClient && !joinedLite) {
-            HazlecastClusterUtils.promoteAsCPMember(hazelcastInstance)
+            HazelcastClusterUtils.promoteAsCPMember(hazelcastInstance)
         }
     }
 
@@ -243,17 +243,18 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val
     }
 
     override suspend fun unLock() {
-        // Added condition to avoid failures like - "Current thread is not owner of the lock!"
-        if (distributedLock.isLockedByCurrentThread) {
-            distributedLock.unlock()
-            log.trace("Cluster unlock(${name()}) successfully..")
-        }
+        distributedLock.unlock()
+        log.trace("Cluster unlock(${name()}) successfully..")
     }
 
     override fun isLocked(): Boolean {
         return distributedLock.isLocked
     }
 
+    override fun isLockedByCurrentThread(): Boolean {
+        return distributedLock.isLockedByCurrentThread
+    }
+
     override suspend fun fenceLock(): String {
         val fence = distributedLock.lockAndGetFence()
         log.trace("Cluster lock($name) fence($fence) created..")
@@ -20,13 +20,17 @@ import com.fasterxml.jackson.databind.JsonNode
 import com.hazelcast.client.config.YamlClientConfigBuilder
 import com.hazelcast.cluster.Member
 import com.hazelcast.config.FileSystemYamlConfig
+import com.hazelcast.instance.impl.HazelcastInstanceFactory
 import com.hazelcast.map.IMap
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.async
 import kotlinx.coroutines.awaitAll
 import kotlinx.coroutines.delay
+import kotlinx.coroutines.newSingleThreadContext
 import kotlinx.coroutines.runBlocking
 import kotlinx.coroutines.withContext
+import org.junit.After
+import org.junit.Before
 import org.junit.Test
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
@@ -35,16 +39,21 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
 import java.io.Serializable
-import java.time.Duration
 import java.util.Properties
 import kotlin.test.assertEquals
 import kotlin.test.assertNotNull
 import kotlin.test.assertTrue
 
-class HazlecastClusterServiceTest {
-    private val log = logger(HazlecastClusterServiceTest::class)
+class HazelcastClusterServiceTest {
+    private val log = logger(HazelcastClusterServiceTest::class)
     private val clusterSize = 3
 
+    @Before
+    @After
+    fun killAllHazelcastInstances() {
+        HazelcastInstanceFactory.terminateAll()
+    }
+
     @Test
     fun testClientFileSystemYamlConfig() {
         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
@@ -74,33 +83,23 @@ class HazlecastClusterServiceTest {
     fun testClusterJoin() {
         runBlocking {
             val bluePrintClusterServiceOne =
-                createCluster(arrayListOf(5679, 5680, 5681)).toMutableList()
-            // delay(1000)
-            // Join as Hazlecast Management Node
-            // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true)
-            // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false)
-            // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
+                createCluster(arrayListOf(1, 2, 3)).toMutableList()
             printReachableMembers(bluePrintClusterServiceOne)
             testDistributedStore(bluePrintClusterServiceOne)
             testDistributedLock(bluePrintClusterServiceOne)
-
-            // executeScheduler(bluePrintClusterServiceOne[0])
-            // delay(1000)
-            // Shutdown
-            shutdown(bluePrintClusterServiceOne)
         }
     }
 
     private suspend fun createCluster(
-        ports: List<Int>,
+        ids: List<Int>,
         joinAsClient: Boolean? = false
     ): List<BluePrintClusterService> {
 
         return withContext(Dispatchers.Default) {
-            val deferred = ports.map { port ->
+            val deferred = ids.map { id ->
                 async(Dispatchers.IO) {
-                    val nodeId = "node-$port"
-                    log.info("********** Starting node($nodeId) on port($port)")
+                    val nodeId = "node-$id"
+                    log.info("********** Starting ($nodeId)")
                     val properties = Properties()
                     properties["hazelcast.logging.type"] = "slf4j"
                     val clusterInfo =
@@ -117,21 +116,15 @@ class HazlecastClusterServiceTest {
                                 properties = properties
                             )
                         }
-                    val hazlecastClusterService = HazlecastClusterService()
-                    hazlecastClusterService.startCluster(clusterInfo)
-                    hazlecastClusterService
+                    val hazelcastClusterService = HazelcastClusterService()
+                    hazelcastClusterService.startCluster(clusterInfo)
+                    hazelcastClusterService
                 }
             }
             deferred.awaitAll()
         }
     }
 
-    private suspend fun shutdown(bluePrintClusterServices: List<BluePrintClusterService>) {
-        bluePrintClusterServices.forEach { bluePrintClusterService ->
-            bluePrintClusterService.shutDown(Duration.ofMillis(10))
-        }
-    }
-
     private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
         /** Test Distributed store creation */
         repeat(2) { storeId ->
@@ -159,13 +152,25 @@ class HazlecastClusterServiceTest {
         val lockName = "sample-lock"
         withContext(Dispatchers.IO) {
             val deferred = async {
-                executeLock(bluePrintClusterServices[0], "first", lockName)
+                newSingleThreadContext("first").use {
+                    withContext(it) {
+                        executeLock(bluePrintClusterServices[0], "first", lockName)
+                    }
+                }
             }
             val deferred2 = async {
-                executeLock(bluePrintClusterServices[1], "second", lockName)
+                newSingleThreadContext("second").use {
+                    withContext(it) {
+                        executeLock(bluePrintClusterServices[1], "second", lockName)
+                    }
+                }
             }
             val deferred3 = async {
-                executeLock(bluePrintClusterServices[2], "third", lockName)
+                newSingleThreadContext("third").use {
+                    withContext(it) {
+                        executeLock(bluePrintClusterServices[2], "third", lockName)
+                    }
+                }
             }
             deferred.start()
             deferred2.start()
@@ -195,12 +200,12 @@ class HazlecastClusterServiceTest {
 
     private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
         log.info("initialising ...")
-        val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService
+        val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
 
         val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
         assertEquals(3, memberNameMap.size, "failed to match member size")
         memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") }
-        val scheduler = hazlecastClusterService.clusterScheduler("cleanup")
+        val scheduler = hazelcastClusterService.clusterScheduler("cleanup")
         // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
         // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
         // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
@@ -209,15 +214,15 @@ class HazlecastClusterServiceTest {
 
     private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
         bluePrintClusterServices.forEach { bluePrintClusterService ->
-            val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService
-            val hazelcast = hazlecastClusterService.hazelcast
+            val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
+            val hazelcast = hazelcastClusterService.hazelcast
             val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null
-            val master = hazlecastClusterService.masterMember("system").memberAddress
-            val members = hazlecastClusterService.allMembers().map { it.memberAddress }
+            val master = hazelcastClusterService.masterMember("system").memberAddress
+            val members = hazelcastClusterService.allMembers().map { it.memberAddress }
             log.info("Cluster Members for($self): master($master) Members($members)")
         }
 
-        val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-56")
+        val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-")
         assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size")
         log.info("Cluster applicationMembers ($applicationMembers)")
     }
index de6047a..b4dc345 100644 (file)
@@ -7,10 +7,15 @@ hazelcast:
     session-time-to-live-seconds: 60
     session-heartbeat-interval-seconds: 5
     missing-cp-member-auto-removal-seconds: 120
+  metrics:
+    enabled: false
   network:
           join:
-            multicast:
+            tcp-ip:
               enabled: true
+              interface: 127.0.0.1
+            multicast:
+              enabled: false
               # Specify 224.0.0.1 instead of default 224.2.2.3 since there's some issue
               # on macOs with docker installed and multicast address different than 224.0.0.1
               # https://stackoverflow.com/questions/46341715/hazelcast-multicast-does-not-work-because-of-vboxnet-which-is-used-by-docker-mac