2  * Copyright © 2018-2019 AT&T Intellectual Property.
 
   4  * Licensed under the Apache License, Version 2.0 (the "License");
 
   5  * you may not use this file except in compliance with the License.
 
   6  * You may obtain a copy of the License at
 
   8  *     http://www.apache.org/licenses/LICENSE-2.0
 
  10  * Unless required by applicable law or agreed to in writing, software
 
  11  * distributed under the License is distributed on an "AS IS" BASIS,
 
  12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  13  * See the License for the specific language governing permissions and
 
  14  * limitations under the License.
 
  17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
 
  19 import com.hazelcast.client.HazelcastClient
 
  20 import com.hazelcast.client.config.ClientConfig
 
  21 import com.hazelcast.client.config.YamlClientConfigBuilder
 
  22 import com.hazelcast.cluster.Member
 
  23 import com.hazelcast.cluster.MembershipEvent
 
  24 import com.hazelcast.cluster.MembershipListener
 
  25 import com.hazelcast.config.Config
 
  26 import com.hazelcast.config.FileSystemYamlConfig
 
  27 import com.hazelcast.config.MemberAttributeConfig
 
  28 import com.hazelcast.core.Hazelcast
 
  29 import com.hazelcast.core.HazelcastInstance
 
  30 import com.hazelcast.cp.CPSubsystemManagementService
 
  31 import com.hazelcast.cp.lock.FencedLock
 
  32 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
 
  33 import kotlinx.coroutines.delay
 
  34 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
 
  35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
 
  36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
 
  37 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
 
  38 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
 
  39 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 
  40 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 
  41 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
 
  42 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
 
  43 import org.springframework.stereotype.Service
 
  44 import java.time.Duration
 
  45 import java.util.concurrent.TimeUnit
 
  /**
   * Hazelcast-based implementation of [BluePrintClusterService].
   *
   * The service wraps a single [HazelcastInstance] and exposes cluster membership, distributed maps,
   * locks and scheduled executors on top of it.
   */
  48 open class HazlecastClusterService : BluePrintClusterService {
 
  // Logger scoped to this service.
  50     private val log = logger(HazlecastClusterService::class)
 
  // Underlying Hazelcast instance; methods such as masterMember() and allMembers() verify
  // `::hazelcast.isInitialized` before touching it.
  51     lateinit var hazelcast: HazelcastInstance
 
  // NOTE(review): not referenced anywhere in the visible part of this file - confirm it is still needed.
  52     lateinit var cpSubsystemManagementService: CPSubsystemManagementService
 
  // Read by promoteAsCPMember(): when true the node is not promoted to a CP member.
  53     var joinedClient = false
 
  // Set from the server configuration's isLiteMember flag in startCluster(); also read by promoteAsCPMember().
  54     var joinedLite = false
 
  /**
   * Starts (or connects to) the Hazelcast cluster described by [configuration] and registers a
   * [BlueprintsClusterMembershipListener] on it.
   *
   * The `when (configuration)` dispatch below shows three ways of creating the instance:
   * - a member created with `Hazelcast.newHazelcastInstance(configuration)`, followed by [promoteAsCPMember];
   * - a client created with `HazelcastClient.newHazelcastClient(configuration)`;
   * - a file-driven setup that publishes the cluster id / node id as system properties, requires the
   *   configuration file to end with "yaml" and to exist, and then builds either a client
   *   (`joinAsClient`) or a server member from that file.
   * Any other configuration raises [BluePrintProcessorException].
   *
   * NOTE(review): the branch labels and the assignment target of the `when` are not visible in this
   * view; from the imports they are presumably `Config`, `ClientConfig` and `ClusterInfo`, with the
   * result stored in [hazelcast] - confirm against the full file.
   *
   * @param configuration cluster configuration object; its runtime type selects the start-up path.
   * @throws BluePrintProcessorException when the configuration type is not understood.
   * @throws IllegalStateException when the configuration file is not YAML or does not exist (via `check`).
   */
  56     override suspend fun <T> startCluster(configuration: T) {
 
  57         /** Get the Hazelcast Client or Server instance */
 
  59             when (configuration) {
 
  61                     joinedLite = configuration.isLiteMember
 
  62                     val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration)
 
  63                     /** Promote as CP Member */
 
  64                     promoteAsCPMember(hazelcastInstance)
 
  69                     HazelcastClient.newHazelcastClient(configuration)
 
  // Publish cluster id and node id as JVM system properties.
  73                     System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id)
 
  74                     System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)
 
  // The node id is also attached as a member attribute; hazelcastApplicationMembers() reads it back.
  76                     val memberAttributeConfig = MemberAttributeConfig()
 
  77                     memberAttributeConfig.setAttribute(
 
  78                         BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
 
  82                     val configFile = configuration.configFile
 
  83                     /** Check file exists */
 
  84                     val clusterConfigFile = normalizedFile(configuration.configFile)
 
  // Only the file-name suffix is inspected here (case-insensitive "yaml").
  85                     check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
 
  86                         "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
 
  // NOTE(review): the message below says "couldn't file"; it presumably means "couldn't find".
  88                     check(clusterConfigFile.exists()) {
 
  89                         "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})"
 
  91                     log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")
 
  93                     /** Hazelcast Client from config file */
 
  94                     if (configuration.joinAsClient) {
 
  95                         /** Set the configuration file to system properties, so that Hazelcast will read automatically */
 
  96                         System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
 
  98                         val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
 
  99                         hazelcastClientConfiguration.properties = configuration.properties
 
 100                         HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
 
 102                         /** Hazelcast Server from config file */
 
  // NOTE(review): normalizedFile(configFile) is evaluated a second time here; clusterConfigFile already holds it.
 103                         val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile))
 
  // Values from the cluster info override whatever the YAML file declares for these settings.
 104                         hazelcastServerConfiguration.clusterName = configuration.id
 
 105                         hazelcastServerConfiguration.instanceName = configuration.nodeId
 
 106                         hazelcastServerConfiguration.properties = configuration.properties
 
 107                         hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
 
 108                         joinedLite = hazelcastServerConfiguration.isLiteMember
 
 109                         val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
 
 110                         /** Promote as CP Member */
 
 111                         promoteAsCPMember(hazelcastInstance)
 
 116                     throw BluePrintProcessorException("couldn't understand the cluster configuration")
 
 120         /** Add the Membership Listeners */
 
 121         hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener())
 
 123             "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
 
 /**
  * Tells callers whether this node takes part in the cluster as a client.
  *
  * NOTE(review): the body is not visible in this view; presumably it returns [joinedClient] - confirm.
  */
 127     override fun isClient(): Boolean {
 
 /**
  * Tells callers whether this node takes part in the cluster as a lite member.
  *
  * NOTE(review): the body is not visible in this view; presumably it returns [joinedLite] - confirm.
  */
 131     override fun isLiteMember(): Boolean {
 
 /**
  * Reports whether this node currently participates in the cluster.
  *
  * Safe to call at any time: when [hazelcast] has not been initialised yet the answer is simply
  * `false` instead of an `UninitializedPropertyAccessException`.
  *
  * @return `true` only when the Hazelcast instance exists and its lifecycle service is running.
  */
 override fun clusterJoined(): Boolean {
     // Reading a lateinit property before assignment throws, so guard it the same way shutDown() does.
     return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning
 }
 
 /**
  * Returns the first entry of the cluster's member set as a [ClusterMember].
  *
  * @param partitionGroup not consulted by this implementation.
  * @throws IllegalStateException when the cluster has not been started.
  */
 override suspend fun masterMember(partitionGroup: String): ClusterMember {
     check(::hazelcast.isInitialized) { "failed to start and join cluster" }
     val firstMember = hazelcast.cluster.members.first()
     return firstMember.toClusterMember()
 }
 
 /**
  * Lists every member currently known to the cluster, converted to [ClusterMember].
  *
  * @throws IllegalStateException when the cluster has not been started.
  */
 override suspend fun allMembers(): Set<ClusterMember> {
     check(::hazelcast.isInitialized) { "failed to start and join cluster" }
     val currentMembers = hazelcast.cluster.members
     return currentMembers.map { member -> member.toClusterMember() }.toSet()
 }
 
 /**
  * Lists the members selected by [hazelcastApplicationMembers] for [appName], converted to [ClusterMember].
  *
  * @param appName application name prefix used to select members.
  * @throws IllegalStateException when the cluster has not been started.
  */
 override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
     check(::hazelcast.isInitialized) { "failed to start and join cluster" }
     val matchingMembers = hazelcastApplicationMembers(appName).values
     return matchingMembers.mapNotNull { member -> member.toClusterMember() }.toSet()
 }
 
 /**
  * Obtains the Hazelcast map registered under [name], typed as a `MutableMap<String, T>`.
  *
  * @param name name of the map.
  * @throws IllegalStateException when the cluster has not been started.
  */
 override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
     check(::hazelcast.isInitialized) { "failed to start and join cluster" }
     val store: MutableMap<String, T> = hazelcast.getMap<String, T>(name)
     return store
 }
 
 /**
  * Creates a [ClusterLock] handle for the distributed lock called [name].
  *
  * The DistributedLock is a distributed implementation of Java's Lock. It provides monotonically
  * increasing, globally unique lock instance identifiers that can be used to determine the ordering
  * of multiple concurrent lock holders. DistributedLocks are designed to account for failures within
  * the cluster: when a lock holder crashes or becomes disconnected from the partition by which the
  * lock's state is controlled, the lock will be released and granted to the next waiting process.
  *
  * @param name name of the lock.
  * @throws IllegalStateException when the cluster has not been started.
  */
 override suspend fun clusterLock(name: String): ClusterLock {
     check(::hazelcast.isInitialized) { "failed to start and join cluster" }
     return ClusterLockImpl(hazelcast = hazelcast, name = name)
 }
 
 /**
  * Obtains the Hazelcast scheduled executor service registered under [name].
  *
  * The return interface may change, and this method will be included in BluePrintClusterService.
  *
  * @param name name of the scheduled executor.
  * @throws IllegalStateException when the cluster has not been started.
  */
 suspend fun clusterScheduler(name: String): IScheduledExecutorService {
     check(::hazelcast.isInitialized) { "failed to start and join cluster" }
     val scheduler = hazelcast.getScheduledExecutorService(name)
     return scheduler
 }
 
 /**
  * Waits for [duration] and then terminates the Hazelcast instance.
  *
  * Nothing happens when the instance was never initialised or is no longer part of the cluster.
  *
  * @param duration grace period to wait before terminating.
  */
 override suspend fun shutDown(duration: Duration) {
     // Guard clause: only a started, still-joined instance needs terminating.
     if (!::hazelcast.isInitialized || !clusterJoined()) return

     delay(duration.toMillis())
     HazlecastClusterUtils.terminate(hazelcast)
 }
 
 /**
  * Promotes [hazelcastInstance] to a CP member, unless this node joined as a client or as a lite member.
  *
  * @param hazelcastInstance the instance to promote.
  */
 suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
     // Client and lite-member nodes are left untouched.
     if (joinedClient || joinedLite) return

     HazlecastClusterUtils.promoteAsCPMember(hazelcastInstance)
 }
 
 /**
  * Returns the cluster members that belong to the application named by `ClusterUtils.applicationName()`.
  *
  * @return members keyed by their node-id attribute.
  * @throws IllegalStateException when the cluster has not been started, or when this node is a client.
  */
 suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
     check(::hazelcast.isInitialized) { "failed to start and join cluster" }
     check(!isClient()) { "not supported for cluster client members." }
     val ownApplicationName = ClusterUtils.applicationName()
     return hazelcastApplicationMembers(ownApplicationName)
 }
 
 /**
  * Collects the cluster members whose node-id attribute starts with [appName] (case-insensitive).
  *
  * Members that do not carry the `BluePrintConstants.PROPERTY_CLUSTER_NODE_ID` attribute are skipped.
  *
  * @param appName application name prefix to match against each member's node-id attribute.
  * @return matching members keyed by their node-id attribute value.
  * @throws IllegalStateException when the cluster has not been started.
  */
 suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
     check(::hazelcast.isInitialized) { "failed to start and join cluster" }
     val applicationMembers: MutableMap<String, Member> = hashMapOf()
     // Plain iteration: the loop exists only to fill the map, so no intermediate list is built.
     for (member in hazelcast.cluster.members) {
         // getAttribute yields null when the attribute was never set on the member, so keep the type nullable.
         val memberName: String? = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
         if (memberName != null && memberName.startsWith(appName, true)) {
             applicationMembers[memberName] = member
         }
     }
     return applicationMembers
 }
 
 /**
  * Listener registered on the cluster by HazlecastClusterService.startCluster(); it logs every
  * membership event it receives.
  *
  * NOTE(review): the supertype line is not visible in this view; presumably `MembershipListener` - confirm.
  */
 212 open class BlueprintsClusterMembershipListener() :
 
 // Logger scoped to this listener.
 214     private val log = logger(BlueprintsClusterMembershipListener::class)
 
 /**
  * Logs the event received when a member is removed.
  *
  * @param membershipEvent the event delivered to this listener.
  */
 override fun memberRemoved(membershipEvent: MembershipEvent) {
     val message = "MembershipEvent: $membershipEvent"
     log.info(message)
 }
 
 /**
  * Logs the event received when a member is added.
  *
  * @param membershipEvent the event delivered to this listener.
  */
 override fun memberAdded(membershipEvent: MembershipEvent) {
     val message = "MembershipEvent: $membershipEvent"
     log.info(message)
 }
 
 /**
  * [ClusterLock] implementation that obtains a [FencedLock] named [name] from the CP subsystem of
  * the given [hazelcast] instance.
  */
 225 open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
 
 // Logger scoped to this lock implementation.
 226     private val log = logger(ClusterLockImpl::class)
 
 // Assigned by lock(), tryLock(), fenceLock() and tryFenceLock(); name(), unLock() and isLocked()
 // read it without an initialisation check, so they require one of those calls to have happened first.
 228     lateinit var distributedLock: FencedLock
 
 /**
  * Returns the name reported by the underlying [distributedLock].
  */
 override fun name(): String = distributedLock.name
 
 /**
  * Fetches the lock called [name] from the CP subsystem, stores it in [distributedLock] and acquires it.
  */
 override suspend fun lock() {
     val fencedLock = hazelcast.cpSubsystem.getLock(name)
     // Publish the handle before acquiring, so it is available even if acquisition fails.
     distributedLock = fencedLock
     fencedLock.lock()
     log.trace("Cluster lock($name) created..")
 }
 
 /**
  * Fetches the lock called [name], stores it in [distributedLock] and tries to acquire it.
  *
  * @param timeout maximum time to wait, in milliseconds.
  * @return the result of the underlying `tryLock` call.
  */
 override suspend fun tryLock(timeout: Long): Boolean {
     val fencedLock = hazelcast.cpSubsystem.getLock(name)
     distributedLock = fencedLock
     return fencedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
 }
 
 /**
  * Releases the lock held through [distributedLock] and traces the release.
  */
 override suspend fun unLock() {
     distributedLock.unlock()
     val lockName = name()
     log.trace("Cluster unlock($lockName) successfully..")
 }
 
 /**
  * Returns the `isLocked` state reported by the underlying [distributedLock].
  */
 override fun isLocked(): Boolean = distributedLock.isLocked
 
 /**
  * Fetches the lock called [name], stores it in [distributedLock], acquires it and returns the fence.
  *
  * @return the fence value from `lockAndGetFence()`, rendered as a string.
  */
 override suspend fun fenceLock(): String {
     val fencedLock = hazelcast.cpSubsystem.getLock(name)
     distributedLock = fencedLock
     val fence = fencedLock.lockAndGetFence()
     log.trace("Cluster lock($name) fence($fence) created..")
     return fence.toString()
 }
 
 /**
  * Fetches the lock called [name], stores it in [distributedLock] and tries to acquire it with a fence.
  *
  * @param timeout maximum time to wait, in milliseconds.
  * @return the value from `tryLockAndGetFence`, rendered as a string.
  */
 override suspend fun tryFenceLock(timeout: Long): String {
     val fencedLock = hazelcast.cpSubsystem.getLock(name)
     distributedLock = fencedLock
     val fence = fencedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS)
     return fence.toString()
 }
 
 266     override fun close() {