Cluster CP member promotion as default. 08/101708/4
authorBrinda Santh <bs2796@att.com>
Thu, 13 Feb 2020 21:50:18 +0000 (16:50 -0500)
committerKAPIL SINGAL <ks220y@att.com>
Fri, 14 Feb 2020 16:21:39 +0000 (16:21 +0000)
Issue-ID: CCSDK-2011
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I4df4b442c020fbf9da294a9b35739ca3acb8ee02

ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt
ms/blueprintsprocessor/application/src/main/resources/hazelcast/hazelcast.yaml
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml
ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml
ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml
ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml
ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml

index 16cb5d6..f7296d4 100644 (file)
@@ -88,6 +88,6 @@ open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePr
 
     @PreDestroy
     fun shutDown() = runBlocking {
-        bluePrintClusterService.shutDown(Duration.ofSeconds(1))
+        bluePrintClusterService.shutDown(Duration.ofMillis(1))
     }
 }
index bacbe2a..5c74682 100644 (file)
@@ -6,6 +6,9 @@ hazelcast:
   cp-subsystem:
     cp-member-count: 3
     group-size: 3
+    session-time-to-live-seconds: 60
+    session-heartbeat-interval-seconds: 5
+    missing-cp-member-auto-removal-seconds: 120
 #  network:
 #    join:
 #      multicast:
index 83a04d6..0964145 100644 (file)
@@ -27,6 +27,7 @@ import com.hazelcast.config.FileSystemYamlConfig
 import com.hazelcast.config.MemberAttributeConfig
 import com.hazelcast.core.Hazelcast
 import com.hazelcast.core.HazelcastInstance
+import com.hazelcast.cp.CPSubsystemManagementService
 import com.hazelcast.cp.lock.FencedLock
 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
 import kotlinx.coroutines.delay
@@ -48,6 +49,7 @@ open class HazlecastClusterService : BluePrintClusterService {
 
     private val log = logger(HazlecastClusterService::class)
     lateinit var hazelcast: HazelcastInstance
+    lateinit var cpSubsystemManagementService: CPSubsystemManagementService
     var joinedClient = false
     var joinedLite = false
 
@@ -57,7 +59,10 @@ open class HazlecastClusterService : BluePrintClusterService {
             when (configuration) {
                 is Config -> {
                     joinedLite = configuration.isLiteMember
-                    Hazelcast.newHazelcastInstance(configuration)
+                    val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration)
+                    /** Promote as CP Member */
+                    promoteAsCPMember(hazelcastInstance)
+                    hazelcastInstance
                 }
                 is ClientConfig -> {
                     joinedClient = true
@@ -99,7 +104,10 @@ open class HazlecastClusterService : BluePrintClusterService {
                         hazelcastServerConfiguration.properties = configuration.properties
                         hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
                         joinedLite = hazelcastServerConfiguration.isLiteMember
-                        Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
+                        val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
+                        /** Promote as CP Member */
+                        promoteAsCPMember(hazelcastInstance)
+                        hazelcastInstance
                     }
                 }
                 else -> {
@@ -169,11 +177,17 @@ open class HazlecastClusterService : BluePrintClusterService {
     override suspend fun shutDown(duration: Duration) {
         if (::hazelcast.isInitialized && clusterJoined()) {
             delay(duration.toMillis())
-            hazelcast.lifecycleService.terminate()
+            HazlecastClusterUtils.terminate(hazelcast)
         }
     }
 
     /** Utils */
+    suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
+        if (!joinedClient && !joinedLite) {
+            HazlecastClusterUtils.promoteAsCPMember(hazelcastInstance)
+        }
+    }
+
     suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
         check(!isClient()) { "not supported for cluster client members." }
@@ -198,11 +212,11 @@ open class BlueprintsClusterMembershipListener(val hazlecastClusterService: Hazl
     private val log = logger(BlueprintsClusterMembershipListener::class)
 
     override fun memberRemoved(membershipEvent: MembershipEvent) {
-        log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Removed: $membershipEvent")
+        log.info("MembershipEvent: $membershipEvent")
     }
 
     override fun memberAdded(membershipEvent: MembershipEvent) {
-        log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Added : $membershipEvent")
+        log.info("MembershipEvent: $membershipEvent")
     }
 }
 
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt
new file mode 100644 (file)
index 0000000..70970f6
--- /dev/null
@@ -0,0 +1,117 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
+
+import com.hazelcast.core.HazelcastInstance
+import com.hazelcast.cp.CPGroup
+import com.hazelcast.cp.CPMember
+import com.hazelcast.cp.CPSubsystemManagementService
+import com.hazelcast.instance.impl.HazelcastInstanceProxy
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+object HazlecastClusterUtils {
+
+    private val log = logger(HazlecastClusterUtils::class)
+
+    /** Promote [hazelcastInstance] member to CP Member */
+    fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
+        when (hazelcastInstance) {
+            is HazelcastInstanceProxy -> {
+                val cpSubsystemManagementService = cpSubsystemManagementService(hazelcastInstance)
+                cpSubsystemManagementService.promoteToCPMember()
+                    .toCompletableFuture().get()
+                log.info("Promoted as CP member(${hazelcastInstance.cluster.localMember})")
+                val clusterCPMembers = clusterCPMembers(hazelcastInstance)
+                log.info("CP Members(${clusterCPMembers.size}): $clusterCPMembers")
+                val cpGroupMembers = cpGroupMembers(hazelcastInstance)
+                log.info("CP Group Members(${cpGroupMembers.size}): $cpGroupMembers")
+            }
+            else -> log.debug("Client instance not eligible for CP Member promotion")
+        }
+    }
+
+    /** Terminate [hazelcastInstance] member */
+    fun terminate(hazelcastInstance: HazelcastInstance) {
+        log.info("Terminating Member : ${hazelcastInstance.cluster.localMember}")
+        hazelcastInstance.lifecycleService.terminate()
+    }
+
+    /** Remove [hazelcastInstance] member from cluster CP Member List*/
+    fun removeFromCPMember(hazelcastInstance: HazelcastInstance) {
+        // check CP Member, then remove */
+        val localCPMemberUuid = localCPMemberUUID(hazelcastInstance)
+        localCPMemberUuid?.let { uuid ->
+            removeFromCPMember(hazelcastInstance, uuid)
+        }
+    }
+
+    /** Remove [removeCPMemberUuid] member from cluster CP Member List, using [hazelcastInstance]*/
+    fun removeFromCPMember(hazelcastInstance: HazelcastInstance, removeCPMemberUuid: UUID) {
+        val cpSubsystemManagementService = cpSubsystemManagementService(hazelcastInstance)
+        cpSubsystemManagementService.removeCPMember(removeCPMemberUuid)
+            .toCompletableFuture().get()
+        log.info("Removed CP member($removeCPMemberUuid)")
+    }
+
+    /** Get [hazelcastInstance] CP Group members*/
+    fun cpGroupMembers(hazelcastInstance: HazelcastInstance): List<CPMember> {
+        return cpGroup(hazelcastInstance).members().toList()
+    }
+
+    /** Get [hazelcastInstance] CP Group[groupName] members*/
+    fun cpGroup(
+        hazelcastInstance: HazelcastInstance,
+        groupName: String? = CPGroup.METADATA_CP_GROUP_NAME
+    ): CPGroup {
+        return cpSubsystemManagementService(hazelcastInstance).getCPGroup(groupName)
+            .toCompletableFuture().get()
+    }
+
+    /** Get [hazelcastInstance] CP member UUIDs*/
+    fun clusterCPMemberUUIDs(hazelcastInstance: HazelcastInstance): List<UUID> {
+        return clusterCPMembers(hazelcastInstance).map { it.uuid }
+    }
+
+    /** Get [hazelcastInstance] CP members*/
+    fun clusterCPMembers(hazelcastInstance: HazelcastInstance): List<CPMember> {
+        return cpSubsystemManagementService(hazelcastInstance).cpMembers.toCompletableFuture().get().toList()
+    }
+
+    /** Get CPSubsystemManagementService for [hazelcastInstance] */
+    fun cpSubsystemManagementService(hazelcastInstance: HazelcastInstance): CPSubsystemManagementService {
+        val cpSubsystemManagementService = hazelcastInstance.cpSubsystem.cpSubsystemManagementService
+        cpSubsystemManagementService.awaitUntilDiscoveryCompleted(3, TimeUnit.MINUTES)
+        return cpSubsystemManagementService
+    }
+
+    /** Get local CPMemberUUID for [hazelcastInstance] */
+    fun localCPMemberUUID(hazelcastInstance: HazelcastInstance) = localCPMember(hazelcastInstance)?.uuid
+
+    /** Check local member is CP member for [hazelcastInstance] */
+    fun checkLocalMemberIsCPMember(hazelcastInstance: HazelcastInstance): Boolean {
+        return localCPMember(hazelcastInstance) != null
+    }
+
+    /** Get local CP member for [hazelcastInstance] */
+    fun localCPMember(hazelcastInstance: HazelcastInstance) =
+        cpSubsystemManagementService(hazelcastInstance).localCPMember
+
+    /** Get local CP member UUID for [hazelcastInstance] */
+    fun localMemberUUID(hazelcastInstance: HazelcastInstance) = hazelcastInstance.cluster.localMember.uuid
+}
index cbf488c..e7ac273 100644 (file)
@@ -6,6 +6,9 @@ hazelcast:
   cp-subsystem:
     cp-member-count: 3
     group-size: 3
+    session-time-to-live-seconds: 60
+    session-heartbeat-interval-seconds: 5
+    missing-cp-member-auto-removal-seconds: 120
 #  network:
 #    join:
 #      multicast:
index 356be1d..cb493d1 100644 (file)
@@ -6,6 +6,9 @@ hazelcast:
   cp-subsystem:
     cp-member-count: 3
     group-size: 3
+    session-time-to-live-seconds: 60
+    session-heartbeat-interval-seconds: 5
+    missing-cp-member-auto-removal-seconds: 120
 #  network:
 #    join:
 #      multicast:
index d256f49..e60b0c5 100644 (file)
@@ -6,6 +6,9 @@ hazelcast:
   cp-subsystem:
     cp-member-count: 3
     group-size: 3
+    session-time-to-live-seconds: 60
+    session-heartbeat-interval-seconds: 5
+    missing-cp-member-auto-removal-seconds: 120
 #  network:
 #    join:
 #      multicast:
index 9c7d566..3cb10a0 100644 (file)
@@ -6,6 +6,9 @@ hazelcast:
   cp-subsystem:
     cp-member-count: 3
     group-size: 3
+    session-time-to-live-seconds: 60
+    session-heartbeat-interval-seconds: 5
+    missing-cp-member-auto-removal-seconds: 120
 #  network:
 #    join:
 #      multicast:
index dcecf45..ab5d448 100644 (file)
@@ -6,6 +6,9 @@ hazelcast:
   cp-subsystem:
     cp-member-count: 3
     group-size: 3
+    session-time-to-live-seconds: 60
+    session-heartbeat-interval-seconds: 5
+    missing-cp-member-auto-removal-seconds: 120
 #  network:
 #    join:
 #      multicast: