/*
 * Copyright © 2018-2019 AT&T Intellectual Property.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
19 import com.hazelcast.client.HazelcastClient
20 import com.hazelcast.client.config.ClientConfig
21 import com.hazelcast.client.config.YamlClientConfigBuilder
22 import com.hazelcast.cluster.Member
23 import com.hazelcast.cluster.MembershipEvent
24 import com.hazelcast.cluster.MembershipListener
25 import com.hazelcast.config.Config
26 import com.hazelcast.config.FileSystemYamlConfig
27 import com.hazelcast.config.MemberAttributeConfig
28 import com.hazelcast.core.Hazelcast
29 import com.hazelcast.core.HazelcastInstance
30 import com.hazelcast.cp.CPSubsystemManagementService
31 import com.hazelcast.cp.lock.FencedLock
32 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
33 import kotlinx.coroutines.delay
34 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
37 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
38 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
39 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
40 import org.onap.ccsdk.cds.controllerblueprints.core.logger
41 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
42 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
43 import org.springframework.stereotype.Service
44 import java.time.Duration
45 import java.util.concurrent.TimeUnit
48 open class HazelcastClusterService : BluePrintClusterService {
/** Logger bound to this service class. */
private val log = logger(HazelcastClusterService::class)

/**
 * Hazelcast instance used by every cluster operation of this service.
 * The operations in this class verify `::hazelcast.isInitialized` before touching it.
 */
lateinit var hazelcast: HazelcastInstance

/**
 * CP subsystem management handle.
 * NOTE(review): no assignment is visible in this part of the file — confirm who initializes it.
 */
lateinit var cpSubsystemManagementService: CPSubsystemManagementService

/** Client-mode flag; consulted by [promoteAsCPMember] to skip CP promotion. */
var joinedClient: Boolean = false

/** Lite-member flag taken from the Hazelcast configuration; consulted by [promoteAsCPMember]. */
var joinedLite: Boolean = false
/**
 * Creates the Hazelcast instance described by [configuration], stores it in [hazelcast] and
 * registers a [BlueprintsClusterMembershipListener] on it.
 *
 * Supported configuration types:
 * - [Config]: starts a server member and promotes it through [promoteAsCPMember].
 * - [ClientConfig]: starts a Hazelcast client.
 * - [ClusterInfo]: loads the referenced YAML file and starts a client or a server member,
 *   depending on `joinAsClient`.
 *
 * @param configuration one of the configuration types listed above.
 * @throws BluePrintProcessorException when [configuration] is of any other type.
 * @throws IllegalStateException when the [ClusterInfo] config file is not a `yaml` file or is missing.
 */
override suspend fun <T> startCluster(configuration: T) {
    /** Get the Hazelcast Client or Server instance */
    hazelcast = when (configuration) {
        is Config -> {
            joinedLite = configuration.isLiteMember
            val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration)
            /** Promote as CP Member */
            promoteAsCPMember(hazelcastInstance)
            hazelcastInstance
        }
        is ClientConfig -> {
            joinedClient = true
            HazelcastClient.newHazelcastClient(configuration)
        }
        is ClusterInfo -> startFromClusterInfo(configuration)
        else -> throw BluePrintProcessorException("couldn't understand the cluster configuration")
    }

    /** Add the Membership Listeners */
    hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener())
    log.info(
        "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
    )
}

/** Starts a Hazelcast client or server member from the YAML file referenced by [clusterInfo]. */
private suspend fun startFromClusterInfo(clusterInfo: ClusterInfo): HazelcastInstance {
    System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, clusterInfo.id)
    System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, clusterInfo.nodeId)

    /** Check file exists */
    val clusterConfigFile = normalizedFile(clusterInfo.configFile)
    check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
        "couldn't understand cluster config file(${clusterInfo.configFile}) format, it should be yaml"
    }
    check(clusterConfigFile.exists()) {
        "couldn't find cluster configuration file(${clusterConfigFile.absolutePath})"
    }
    log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")

    return if (clusterInfo.joinAsClient) {
        /** Hazelcast Client from config file */
        /** Set the configuration file to system properties, so that Hazelcast will read automatically */
        System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
        joinedClient = true
        val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
        hazelcastClientConfiguration.properties = clusterInfo.properties
        HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
    } else {
        /** Hazelcast Server from config file */
        // The node id is published as a member attribute so that members can be grouped by name later.
        val memberAttributeConfig = MemberAttributeConfig()
        memberAttributeConfig.setAttribute(
            BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
            clusterInfo.nodeId
        )
        // Reuse the file that was validated above instead of normalizing the path a second time.
        val hazelcastServerConfiguration = FileSystemYamlConfig(clusterConfigFile)
        hazelcastServerConfiguration.clusterName = clusterInfo.id
        hazelcastServerConfiguration.instanceName = clusterInfo.nodeId
        hazelcastServerConfiguration.properties = clusterInfo.properties
        hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
        joinedLite = hazelcastServerConfiguration.isLiteMember
        val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
        /** Promote as CP Member */
        promoteAsCPMember(hazelcastInstance)
        hazelcastInstance
    }
}
/**
 * Tells whether this node joined the cluster as a Hazelcast client.
 * NOTE(review): the body was missing from the reviewed text; it is restored from the
 * `joinedClient` flag of this class — confirm against the upstream file.
 */
override fun isClient(): Boolean {
    return joinedClient
}
/**
 * Tells whether this node joined the cluster as a lite member.
 * NOTE(review): the body was missing from the reviewed text; it is restored from the
 * `joinedLite` flag of this class — confirm against the upstream file.
 */
override fun isLiteMember(): Boolean {
    return joinedLite
}
/**
 * Reports whether a Hazelcast instance has been created and its lifecycle service is running.
 *
 * @return `false` while [hazelcast] is uninitialized, otherwise the lifecycle running state.
 */
override fun clusterJoined(): Boolean =
    if (!::hazelcast.isInitialized) false else hazelcast.lifecycleService.isRunning
/**
 * Returns the first member of the Hazelcast member set, converted to a [ClusterMember].
 *
 * @param partitionGroup not used by this implementation.
 * @throws IllegalStateException when the cluster has not been started.
 */
override suspend fun masterMember(partitionGroup: String): ClusterMember {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val firstMember = hazelcast.cluster.members.first()
    return firstMember.toClusterMember()
}
/**
 * Returns every member currently known to the cluster as a [ClusterMember].
 *
 * @throws IllegalStateException when the cluster has not been started.
 */
override suspend fun allMembers(): Set<ClusterMember> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val hazelcastMembers = hazelcast.cluster.members
    return hazelcastMembers.map { member -> member.toClusterMember() }.toSet()
}
/**
 * Returns the members selected by [hazelcastApplicationMembers] for [appName], converted to
 * [ClusterMember] instances.
 *
 * @param appName application name forwarded to [hazelcastApplicationMembers].
 * @throws IllegalStateException when the cluster has not been started.
 */
override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val membersByName = hazelcastApplicationMembers(appName)
    return membersByName.mapNotNull { entry -> entry.value.toClusterMember() }.toSet()
}
/**
 * Returns the Hazelcast map registered under [name].
 *
 * @param name name of the map to look up.
 * @throws IllegalStateException when the cluster has not been started.
 */
override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val distributedMap = hazelcast.getMap<String, T>(name)
    return distributedMap
}
/**
 * The DistributedLock is a distributed implementation of Java’s Lock.
 * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
 * determine ordering of multiple concurrent lock holders.
 * DistributedLocks are designed to account for failures within the cluster.
 * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
 * the lock will be released and granted to the next waiting process.
 *
 * @param name name of the lock to wrap.
 * @return a new [ClusterLockImpl] bound to [hazelcast] and [name].
 * @throws IllegalStateException when the cluster has not been started.
 */
override suspend fun clusterLock(name: String): ClusterLock {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val namedLock = ClusterLockImpl(hazelcast, name)
    return namedLock
}
/**
 * Returns the Hazelcast scheduled executor service registered under [name].
 * Return interface may change and it will be included in BluePrintClusterService.
 *
 * @throws IllegalStateException when the cluster has not been started.
 */
suspend fun clusterScheduler(name: String): IScheduledExecutorService {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val scheduler = hazelcast.getScheduledExecutorService(name)
    return scheduler
}
/**
 * Waits for [duration] and then terminates the Hazelcast instance through
 * `HazelcastClusterUtils.terminate`. Does nothing when no running instance exists.
 *
 * @param duration delay applied before the instance is terminated.
 */
override suspend fun shutDown(duration: Duration) {
    // Nothing to stop when the instance was never created or is no longer running.
    if (!::hazelcast.isInitialized || !clusterJoined()) return

    delay(duration.toMillis())
    HazelcastClusterUtils.terminate(hazelcast)
}
/**
 * Hands [hazelcastInstance] to `HazelcastClusterUtils.promoteAsCPMember`, unless this node
 * is flagged as a client or as a lite member.
 *
 * @param hazelcastInstance instance to promote.
 */
suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
    if (joinedClient || joinedLite) return
    HazelcastClusterUtils.promoteAsCPMember(hazelcastInstance)
}
/**
 * Returns the cluster members selected by [hazelcastApplicationMembers] for the application
 * name reported by `ClusterUtils.applicationName()`.
 *
 * @throws IllegalStateException when the cluster has not been started, or when this node is a client.
 */
suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    check(!isClient()) { "not supported for cluster client members." }
    val ownApplicationName = ClusterUtils.applicationName()
    return hazelcastApplicationMembers(ownApplicationName)
}
/**
 * Collects the cluster members whose node-id attribute starts with [appName] (case-insensitive),
 * keyed by that attribute value.
 *
 * Members that do not carry the node-id attribute are skipped instead of failing the whole lookup.
 *
 * @param appName prefix matched against the `PROPERTY_CLUSTER_NODE_ID` member attribute.
 * @return matching members keyed by their node-id attribute; empty when nothing matches.
 * @throws IllegalStateException when the cluster has not been started.
 */
suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val applicationMembers: MutableMap<String, Member> = hashMapOf()
    for (member in hazelcast.cluster.members) {
        // The attribute comes from Java and is absent on members started without it, so treat it as nullable.
        val memberName: String? = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
        if (memberName != null && memberName.startsWith(appName, ignoreCase = true)) {
            applicationMembers[memberName] = member
        }
    }
    return applicationMembers
}
/**
 * Membership listener that writes every member-added and member-removed event to the log
 * at info level.
 */
open class BlueprintsClusterMembershipListener() : MembershipListener {

    private val log = logger(BlueprintsClusterMembershipListener::class)

    /** Logs the event describing the member that left the cluster. */
    override fun memberRemoved(membershipEvent: MembershipEvent) {
        log.info("MembershipEvent: $membershipEvent")
    }

    /** Logs the event describing the member that joined the cluster. */
    override fun memberAdded(membershipEvent: MembershipEvent) {
        log.info("MembershipEvent: $membershipEvent")
    }
}
225 open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
/** Logger bound to this lock implementation. */
private val log = logger(ClusterLockImpl::class)

/** Fenced lock obtained from the CP subsystem under [name]; the lock operations below delegate to it. */
private val distributedLock: FencedLock = hazelcast.cpSubsystem.getLock(name)
/** Returns the name reported by the underlying fenced lock. */
override fun name(): String = distributedLock.name
/** Acquires the underlying fenced lock and then writes a trace log entry. */
override suspend fun lock() {
    with(distributedLock) { lock() }
    log.trace("Cluster lock($name) created..")
}
/**
 * Attempts to acquire the lock, waiting at most [timeout] milliseconds, and traces the outcome.
 *
 * @param timeout maximum wait in milliseconds.
 * @return `true` when the lock was acquired, `false` otherwise.
 */
override suspend fun tryLock(timeout: Long): Boolean {
    val acquired = distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
    if (acquired) {
        log.trace("Cluster lock acquired: $name")
    } else {
        log.trace("Failed to acquire Cluster lock $name within timeout $timeout")
    }
    return acquired
}
/** Releases the underlying fenced lock and then writes a trace log entry. */
override suspend fun unLock() {
    with(distributedLock) { unlock() }
    log.trace("Cluster unlock(${name()}) successfully..")
}
/** Returns the `isLocked` state reported by the underlying fenced lock. */
override fun isLocked(): Boolean = distributedLock.isLocked
/** Returns the `isLockedByCurrentThread` state reported by the underlying fenced lock. */
override fun isLockedByCurrentThread(): Boolean = distributedLock.isLockedByCurrentThread
/**
 * Acquires the lock through `lockAndGetFence`, traces the fence and returns it as text.
 *
 * @return string form of the fence returned by the underlying lock.
 */
override suspend fun fenceLock(): String =
    distributedLock.lockAndGetFence()
        .also { fence -> log.trace("Cluster lock($name) fence($fence) created..") }
        .toString()
/**
 * Calls `tryLockAndGetFence` with [timeout] milliseconds and returns the resulting fence as text.
 *
 * @param timeout maximum wait in milliseconds.
 * @return string form of the value returned by the underlying lock.
 */
override suspend fun tryFenceLock(timeout: Long): String {
    val fence = distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS)
    return fence.toString()
}
268 override fun close() {