2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
19 import com.hazelcast.client.HazelcastClient
20 import com.hazelcast.client.config.ClientConfig
21 import com.hazelcast.client.config.YamlClientConfigBuilder
22 import com.hazelcast.cluster.Member
23 import com.hazelcast.cluster.MembershipEvent
24 import com.hazelcast.cluster.MembershipListener
25 import com.hazelcast.config.Config
26 import com.hazelcast.config.FileSystemYamlConfig
27 import com.hazelcast.config.MemberAttributeConfig
28 import com.hazelcast.core.Hazelcast
29 import com.hazelcast.core.HazelcastInstance
30 import com.hazelcast.cp.lock.FencedLock
31 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
32 import kotlinx.coroutines.delay
33 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
34 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
37 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
38 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
39 import org.onap.ccsdk.cds.controllerblueprints.core.logger
40 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
41 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
42 import org.springframework.stereotype.Service
43 import java.time.Duration
44 import java.util.concurrent.TimeUnit
47 open class HazlecastClusterService : BluePrintClusterService {
49 private val log = logger(HazlecastClusterService::class)
50 lateinit var hazelcast: HazelcastInstance
51 var joinedClient = false
52 var joinedLite = false
/**
 * Starts (or joins) the Hazelcast cluster described by [configuration] and stores the
 * resulting instance in [hazelcast].
 *
 * Supported configuration types:
 * - [Config]: starts an embedded member; [joinedLite] mirrors the config's `isLiteMember`.
 * - [ClientConfig]: connects as a client and sets [joinedClient].
 * - [ClusterInfo]: publishes the cluster id and node id as system properties, validates the
 *   YAML configuration file and then starts either a client or a member, depending on
 *   `joinAsClient`.
 *
 * @throws IllegalStateException if the [ClusterInfo] config file is not a yaml file or is missing.
 * @throws BluePrintProcessorException if [configuration] is of any other type.
 */
override suspend fun <T> startCluster(configuration: T) {
    /** Get the Hazelcast Client or Server instance */
    hazelcast = when (configuration) {
        is Config -> {
            joinedLite = configuration.isLiteMember
            Hazelcast.newHazelcastInstance(configuration)
        }
        is ClientConfig -> {
            joinedClient = true
            HazelcastClient.newHazelcastClient(configuration)
        }
        is ClusterInfo -> {
            System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id)
            System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)

            /** Check file exists */
            val clusterConfigFile = normalizedFile(configuration.configFile)
            check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
                "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
            }
            check(clusterConfigFile.exists()) {
                "couldn't find cluster configuration file(${clusterConfigFile.absolutePath})"
            }
            log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")

            if (configuration.joinAsClient) {
                /** Hazelcast Client from config file */
                /** Set the configuration file to system properties, so that Hazelcast will read automatically */
                System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
                joinedClient = true
                val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
                hazelcastClientConfiguration.properties = configuration.properties
                HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
            } else {
                /** Hazelcast Server from config file */
                // The node id is attached as a member attribute so that other members can look it up.
                val memberAttributeConfig = MemberAttributeConfig()
                memberAttributeConfig.setAttribute(
                    BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
                    configuration.nodeId
                )
                // Reuse the already validated file instead of normalizing the path a second time.
                val hazelcastServerConfiguration = FileSystemYamlConfig(clusterConfigFile)
                hazelcastServerConfiguration.properties = configuration.properties
                hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
                joinedLite = hazelcastServerConfiguration.isLiteMember
                Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
            }
        }
        else -> {
            throw BluePrintProcessorException("couldn't understand the cluster configuration")
        }
    }

    /** Add the Membership Listeners */
    hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener(this))
    log.info(
        "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
    )
}
/** Tells whether this node joined the cluster as a Hazelcast client (see [joinedClient]). */
override fun isClient(): Boolean = joinedClient
/** Tells whether this node joined the cluster as a lite member (see [joinedLite]). */
override fun isLiteMember(): Boolean = joinedLite
/**
 * Reports whether this node currently participates in the cluster.
 *
 * @return `true` only when the Hazelcast instance has been created and its lifecycle
 * service reports it as running; `false` when the cluster was never started.
 */
override fun clusterJoined(): Boolean {
    // Guard the lateinit property: asking before startCluster() must answer "no", not throw.
    return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning
}
/**
 * Returns the first entry of the cluster's member set, converted to a [ClusterMember].
 * The [partitionGroup] argument is currently not used.
 *
 * @throws IllegalStateException if the cluster has not been started.
 */
override suspend fun masterMember(partitionGroup: String): ClusterMember {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val firstMember = hazelcast.cluster.members.first()
    return firstMember.toClusterMember()
}
/**
 * Returns every member currently known to the cluster as a [ClusterMember].
 *
 * @throws IllegalStateException if the cluster has not been started.
 */
override suspend fun allMembers(): Set<ClusterMember> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val currentMembers = hazelcast.cluster.members
    return currentMembers.map { member -> member.toClusterMember() }.toSet()
}
/**
 * Returns the members selected by [hazelcastApplicationMembers] for [appName],
 * converted to [ClusterMember]s.
 *
 * @throws IllegalStateException if the cluster has not been started.
 */
override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val matchingMembers = hazelcastApplicationMembers(appName).values
    return matchingMembers.mapNotNull { member -> member.toClusterMember() }.toSet()
}
/**
 * Returns the Hazelcast map registered under [name], exposed as a plain [MutableMap].
 *
 * @throws IllegalStateException if the cluster has not been started.
 */
override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val store: MutableMap<String, T> = hazelcast.getMap<String, T>(name)
    return store
}
/**
 * The DistributedLock is a distributed implementation of Java's Lock.
 * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
 * determine ordering of multiple concurrent lock holders.
 * DistributedLocks are designed to account for failures within the cluster.
 * When a lock holder crashes or becomes disconnected from the partition by which the lock's state is controlled,
 * the lock will be released and granted to the next waiting process.
 *
 * @param name name of the lock to wrap.
 * @return a [ClusterLockImpl] bound to this service's Hazelcast instance.
 * @throws IllegalStateException if the cluster has not been started.
 */
override suspend fun clusterLock(name: String): ClusterLock {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val lockHandle = ClusterLockImpl(hazelcast, name)
    return lockHandle
}
/**
 * Returns the Hazelcast scheduled executor service registered under [name].
 *
 * Return interface may change and it will be included in BluePrintClusterService.
 *
 * @throws IllegalStateException if the cluster has not been started.
 */
suspend fun clusterScheduler(name: String): IScheduledExecutorService {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val scheduler = hazelcast.getScheduledExecutorService(name)
    return scheduler
}
/**
 * Waits for [duration] and then terminates the local Hazelcast instance.
 * Does nothing when the instance was never created or is no longer running.
 */
override suspend fun shutDown(duration: Duration) {
    // Nothing to stop unless the instance exists and is still running.
    if (!::hazelcast.isInitialized || !clusterJoined()) return

    delay(duration.toMillis())
    hazelcast.lifecycleService.terminate()
}
/**
 * Returns the Hazelcast members that [hazelcastApplicationMembers] selects for the
 * application name reported by [ClusterUtils.applicationName].
 *
 * @throws IllegalStateException if the cluster has not been started, or if this node
 * joined as a client.
 */
suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    check(!isClient()) { "not supported for cluster client members." }
    val ownApplicationName = ClusterUtils.applicationName()
    return hazelcastApplicationMembers(ownApplicationName)
}
/**
 * Collects the cluster members whose node-id attribute
 * ([BluePrintConstants.PROPERTY_CLUSTER_NODE_ID]) starts with [appName], ignoring case.
 *
 * @param appName prefix to match against each member's node-id attribute.
 * @return matching members keyed by their node-id attribute.
 * @throws IllegalStateException if the cluster has not been started.
 */
suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val applicationMembers: MutableMap<String, Member> = hashMapOf()
    hazelcast.cluster.members.forEach { member ->
        // getAttribute() yields null for members that never registered a node id;
        // such members are skipped instead of failing the whole lookup.
        val memberName: String? = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
        if (memberName != null && memberName.startsWith(appName, true)) {
            applicationMembers[memberName] = member
        }
    }
    return applicationMembers
}
/**
 * Membership listener that logs every member added to or removed from the cluster,
 * prefixed with a description of the local node.
 *
 * @property hazlecastClusterService service owning the Hazelcast instance being observed.
 */
open class BlueprintsClusterMembershipListener(val hazlecastClusterService: HazlecastClusterService) :
    MembershipListener {

    private val log = logger(BlueprintsClusterMembershipListener::class)

    /** Logs the removal described by [membershipEvent]. */
    override fun memberRemoved(membershipEvent: MembershipEvent) {
        log.info("${localNode()} : Member Removed: $membershipEvent")
    }

    /** Logs the addition described by [membershipEvent]. */
    override fun memberAdded(membershipEvent: MembershipEvent) {
        log.info("${localNode()} : Member Added : $membershipEvent")
    }

    /**
     * Describes the local node for log output. Client instances have no local member
     * (the lookup is unsupported there), so the instance name is used for them instead.
     */
    private fun localNode(): Any {
        val instance = hazlecastClusterService.hazelcast
        return if (hazlecastClusterService.isClient()) instance.name else instance.cluster.localMember
    }
}
209 open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
210 private val log = logger(ClusterLockImpl::class)
212 lateinit var distributedLock: FencedLock
/**
 * Returns the name of this lock.
 *
 * Once the lock object has been obtained (by any of the lock calls) the name reported by
 * Hazelcast is returned; before that, the name this instance was created with.
 */
override fun name(): String {
    // distributedLock is lateinit and only assigned by the lock calls; avoid failing before that.
    return if (::distributedLock.isInitialized) distributedLock.name else name
}
/**
 * Obtains the named lock from the CP subsystem and acquires it.
 */
override suspend fun lock() {
    val fencedLock = hazelcast.cpSubsystem.getLock(name)
    distributedLock = fencedLock
    // NOTE(review): FencedLock.lock() looks like a blocking call inside a suspend function - confirm this is intended.
    fencedLock.lock()
    log.trace("Cluster lock($name) created..")
}
/**
 * Obtains the named lock from the CP subsystem and tries to acquire it.
 *
 * @param timeout maximum time to wait, in milliseconds.
 * @return the result reported by the underlying `tryLock` call.
 */
override suspend fun tryLock(timeout: Long): Boolean {
    val fencedLock = hazelcast.cpSubsystem.getLock(name)
    distributedLock = fencedLock
    return fencedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
}
/**
 * Releases the lock previously obtained through one of the lock calls of this instance.
 */
override suspend fun unLock() {
    distributedLock.unlock()
    val lockName = name()
    log.trace("Cluster unlock($lockName) successfully..")
}
/** Returns the locked state reported by the underlying distributed lock. */
override fun isLocked(): Boolean = distributedLock.isLocked
/**
 * Obtains the named lock from the CP subsystem, acquires it and returns its fencing token.
 *
 * @return the fence returned by `lockAndGetFence`, as a string.
 */
override suspend fun fenceLock(): String {
    val fencedLock = hazelcast.cpSubsystem.getLock(name)
    distributedLock = fencedLock
    val fence = fencedLock.lockAndGetFence()
    log.trace("Cluster lock($name) fence($fence) created..")
    return fence.toString()
}
/**
 * Obtains the named lock from the CP subsystem and tries to acquire it.
 *
 * @param timeout maximum time to wait, in milliseconds.
 * @return the value returned by `tryLockAndGetFence`, as a string.
 */
override suspend fun tryFenceLock(timeout: Long): String {
    val fencedLock = hazelcast.cpSubsystem.getLock(name)
    distributedLock = fencedLock
    val fence = fencedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS)
    return fence.toString()
}
250 override fun close() {