2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
19 import com.hazelcast.client.HazelcastClient
20 import com.hazelcast.client.config.ClientConfig
21 import com.hazelcast.client.config.YamlClientConfigBuilder
22 import com.hazelcast.cluster.Member
23 import com.hazelcast.cluster.MembershipEvent
24 import com.hazelcast.cluster.MembershipListener
25 import com.hazelcast.config.Config
26 import com.hazelcast.config.FileSystemYamlConfig
27 import com.hazelcast.config.MemberAttributeConfig
28 import com.hazelcast.core.Hazelcast
29 import com.hazelcast.core.HazelcastInstance
30 import com.hazelcast.cp.CPSubsystemManagementService
31 import com.hazelcast.cp.lock.FencedLock
32 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
33 import kotlinx.coroutines.delay
34 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
37 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
38 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
39 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
40 import org.onap.ccsdk.cds.controllerblueprints.core.logger
41 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
42 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
43 import org.springframework.stereotype.Service
44 import java.time.Duration
45 import java.util.concurrent.TimeUnit
48 open class HazelcastClusterService : BluePrintClusterService {
/** Logger bound to this service class. */
private val log = logger(HazelcastClusterService::class)
/** Hazelcast instance backing this service; the accessors of this class guard on `::hazelcast.isInitialized` before using it. */
lateinit var hazelcast: HazelcastInstance
/** NOTE(review): not assigned anywhere in the visible part of this file — confirm where it is initialised. */
lateinit var cpSubsystemManagementService: CPSubsystemManagementService
/** Consulted by promoteAsCPMember: while true, CP promotion is skipped. */
var joinedClient = false
/** Set by startCluster from the configuration's `isLiteMember`; while true, CP promotion is skipped. */
var joinedLite = false
/**
 * Creates the Hazelcast instance for this node, stores it in [hazelcast] and registers a
 * [BlueprintsClusterMembershipListener] on the cluster.
 *
 * Accepted [configuration] types:
 * - [Config]: starts an embedded member and hands it to [promoteAsCPMember].
 * - [ClientConfig]: connects as a Hazelcast client.
 * - [ClusterInfo]: builds a client or a member from the YAML file the info points to.
 *
 * NOTE(review): the branch heads and the `joinedClient = true` assignments are inferred from the
 * smart-cast usages and from [promoteAsCPMember] — confirm against the complete file.
 *
 * @throws IllegalStateException when the [ClusterInfo] config file is not a yaml file or is missing.
 * @throws BluePrintProcessorException when [configuration] is of any other type.
 */
override suspend fun <T> startCluster(configuration: T) {
    /** Get the Hazelcast Client or Server instance */
    hazelcast = when (configuration) {
        is Config -> {
            joinedLite = configuration.isLiteMember
            val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration)
            /** Promote as CP Member */
            promoteAsCPMember(hazelcastInstance)
            hazelcastInstance
        }
        is ClientConfig -> {
            joinedClient = true
            HazelcastClient.newHazelcastClient(configuration)
        }
        is ClusterInfo -> startFromClusterInfo(configuration)
        else -> throw BluePrintProcessorException("couldn't understand the cluster configuration")
    }

    /** Add the Membership Listeners */
    hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener())
    log.info(
        "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
    )
}

/** Builds the Hazelcast client or member described by [clusterInfo] and its YAML configuration file. */
private suspend fun startFromClusterInfo(clusterInfo: ClusterInfo): HazelcastInstance {
    System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, clusterInfo.id)
    System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, clusterInfo.nodeId)

    /** Check file exists; the path is normalised once and reused by both branches below. */
    val clusterConfigFile = normalizedFile(clusterInfo.configFile)
    check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
        "couldn't understand cluster config file(${clusterInfo.configFile}) format, it should be yaml"
    }
    check(clusterConfigFile.exists()) {
        "couldn't find cluster configuration file(${clusterConfigFile.absolutePath})"
    }
    log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")

    return if (clusterInfo.joinAsClient) {
        /** Hazelcast Client from config file */
        /** Set the configuration file to system properties, so that Hazelcast will read automatically */
        System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
        joinedClient = true
        val clientConfiguration = YamlClientConfigBuilder().build()
        clientConfiguration.properties = clusterInfo.properties
        HazelcastClient.newHazelcastClient(clientConfiguration)
    } else {
        /** Hazelcast Server from config file */
        // The node id attribute is only needed by members, so it is built on this path only.
        val memberAttributeConfig = MemberAttributeConfig()
        memberAttributeConfig.setAttribute(
            BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
            clusterInfo.nodeId
        )
        val serverConfiguration = FileSystemYamlConfig(clusterConfigFile)
        serverConfiguration.clusterName = clusterInfo.id
        serverConfiguration.instanceName = clusterInfo.nodeId
        serverConfiguration.properties = clusterInfo.properties
        serverConfiguration.memberAttributeConfig = memberAttributeConfig
        joinedLite = serverConfiguration.isLiteMember
        val hazelcastInstance = Hazelcast.newHazelcastInstance(serverConfiguration)
        /** Promote as CP Member */
        promoteAsCPMember(hazelcastInstance)
        hazelcastInstance
    }
}
/**
 * Reports the `joinedClient` flag of this service.
 *
 * NOTE(review): the body was not visible in the reviewed excerpt; returning `joinedClient` is
 * inferred from the flag consulted by `promoteAsCPMember` — confirm against the complete file.
 */
override fun isClient(): Boolean = joinedClient
/**
 * Reports the `joinedLite` flag of this service.
 *
 * NOTE(review): the body was not visible in the reviewed excerpt; returning `joinedLite` is
 * inferred from the flag that `startCluster` fills from the configuration — confirm against the
 * complete file.
 */
override fun isLiteMember(): Boolean = joinedLite
/**
 * @return true only when the Hazelcast instance has been assigned and its lifecycle service
 * reports that it is running.
 */
override fun clusterJoined(): Boolean =
    ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning
/**
 * Returns the first entry of the Hazelcast member set, converted to a [ClusterMember].
 *
 * [partitionGroup] is not consulted by this implementation.
 * NOTE(review): presumably relies on Hazelcast listing the oldest member first — confirm.
 *
 * @throws IllegalStateException when the cluster has not been started.
 */
override suspend fun masterMember(partitionGroup: String): ClusterMember {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val firstMember = hazelcast.cluster.members.first()
    return firstMember.toClusterMember()
}
/**
 * Converts every member currently known to the Hazelcast cluster into a [ClusterMember].
 *
 * @throws IllegalStateException when the cluster has not been started.
 */
override suspend fun allMembers(): Set<ClusterMember> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val members = hazelcast.cluster.members
    return members.mapTo(linkedSetOf()) { member -> member.toClusterMember() }
}
/**
 * Returns the members selected by [hazelcastApplicationMembers] for [appName], converted to
 * [ClusterMember]s.
 *
 * @throws IllegalStateException when the cluster has not been started.
 */
override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val matchingMembers = hazelcastApplicationMembers(appName).values
    return matchingMembers.map { member -> member.toClusterMember() }.toSet()
}
/**
 * Looks up the Hazelcast map registered under [name] and exposes it as a [MutableMap].
 *
 * @throws IllegalStateException when the cluster has not been started.
 */
override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val store: MutableMap<String, T> = hazelcast.getMap<String, T>(name)
    return store
}
/**
 * Creates a [ClusterLock] named [name] on top of the running Hazelcast instance.
 *
 * The DistributedLock is a distributed implementation of Java's Lock.
 * It provides monotonically increasing, globally unique lock instance identifiers that can be
 * used to determine the ordering of multiple concurrent lock holders.
 * DistributedLocks are designed to account for failures within the cluster: when a lock holder
 * crashes or becomes disconnected from the partition by which the lock's state is controlled,
 * the lock is released and granted to the next waiting process.
 *
 * @throws IllegalStateException when the cluster has not been started.
 */
override suspend fun clusterLock(name: String): ClusterLock {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val lock: ClusterLock = ClusterLockImpl(hazelcast, name)
    return lock
}
/**
 * Looks up the Hazelcast scheduled executor service registered under [name].
 *
 * The return interface may change, and the method will then be included in BluePrintClusterService.
 *
 * @throws IllegalStateException when the cluster has not been started.
 */
suspend fun clusterScheduler(name: String): IScheduledExecutorService {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val scheduler = hazelcast.getScheduledExecutorService(name)
    return scheduler
}
/**
 * Waits for [duration] and then terminates the Hazelcast instance.
 *
 * Nothing happens when the instance was never assigned or the cluster is not joined.
 */
override suspend fun shutDown(duration: Duration) {
    // Guard form of the original condition: both checks must pass before anything is done.
    if (!::hazelcast.isInitialized || !clusterJoined()) return

    delay(duration.toMillis())
    HazelcastClusterUtils.terminate(hazelcast)
}
/**
 * Asks [HazelcastClusterUtils] to promote [hazelcastInstance] to a CP member.
 *
 * The call is skipped when this node joined as a client or as a lite member.
 */
suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
    if (joinedClient || joinedLite) return
    HazelcastClusterUtils.promoteAsCPMember(hazelcastInstance)
}
/**
 * Returns the Hazelcast members selected by [hazelcastApplicationMembers] for the name reported
 * by `ClusterUtils.applicationName()`.
 *
 * @throws IllegalStateException when the cluster has not been started or this node is a client.
 */
suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    check(!isClient()) { "not supported for cluster client members." }
    val ownApplicationName = ClusterUtils.applicationName()
    return hazelcastApplicationMembers(ownApplicationName)
}
/**
 * Collects the Hazelcast members whose node-id attribute starts with [appName] (case-insensitive).
 *
 * Members that carry no `BluePrintConstants.PROPERTY_CLUSTER_NODE_ID` attribute are skipped, so
 * a single member without the attribute no longer makes the whole lookup fail.
 *
 * @return the matching members keyed by their node-id attribute value.
 * @throws IllegalStateException when the cluster has not been started.
 */
suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val applicationMembers: MutableMap<String, Member> = hashMapOf()
    for (member in hazelcast.cluster.members) {
        // getAttribute comes from Java and yields null when the attribute is not set.
        val memberName: String? = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
        if (memberName != null && memberName.startsWith(appName, true)) {
            applicationMembers[memberName] = member
        }
    }
    return applicationMembers
}
/** Membership listener that writes every member-added and member-removed event to the log. */
open class BlueprintsClusterMembershipListener() : MembershipListener {

    private val log = logger(BlueprintsClusterMembershipListener::class)

    /** Logs the event describing the member that left the cluster. */
    override fun memberRemoved(membershipEvent: MembershipEvent) {
        log.info("MembershipEvent: $membershipEvent")
    }

    /** Logs the event describing the member that joined the cluster. */
    override fun memberAdded(membershipEvent: MembershipEvent) {
        log.info("MembershipEvent: $membershipEvent")
    }
}
227 open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
/** Logger bound to this lock implementation. */
private val log = logger(ClusterLockImpl::class)
/** Fenced lock obtained from the CP subsystem under the constructor's `name`; the lock operations of this class delegate to it. */
private val distributedLock: FencedLock = hazelcast.cpSubsystem.getLock(name)
/** @return the name reported by the underlying fenced lock. */
override fun name(): String = distributedLock.name
/** Acquires the underlying fenced lock and writes a trace entry once the call has returned. */
override suspend fun lock() {
    distributedLock.run { lock() }
    log.trace("Cluster lock($name) created..")
}
/**
 * Tries to acquire the underlying fenced lock, passing [timeout] in milliseconds.
 *
 * @return the result of the underlying `tryLock` call; the outcome is also traced.
 */
override suspend fun tryLock(timeout: Long): Boolean {
    val acquired = distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
    if (acquired) {
        log.trace("Cluster lock acquired: $name")
    } else {
        log.trace("Failed to acquire Cluster lock $name within timeout $timeout")
    }
    return acquired
}
/** Releases the underlying fenced lock and writes a trace entry once the call has returned. */
override suspend fun unLock() {
    distributedLock.run { unlock() }
    log.trace("Cluster unlock(${name()}) successfully..")
}
/** @return the `isLocked` state reported by the underlying fenced lock. */
override fun isLocked(): Boolean = distributedLock.isLocked
/** @return the `isLockedByCurrentThread` state reported by the underlying fenced lock. */
override fun isLockedByCurrentThread(): Boolean = distributedLock.isLockedByCurrentThread
/**
 * Acquires the underlying fenced lock and traces the fence that was handed out.
 *
 * @return the fence rendered as a string.
 */
override suspend fun fenceLock(): String =
    distributedLock.lockAndGetFence()
        .also { fence -> log.trace("Cluster lock($name) fence($fence) created..") }
        .toString()
/**
 * Tries to acquire the underlying fenced lock, passing [timeout] in milliseconds.
 *
 * @return the value returned by `tryLockAndGetFence`, rendered as a string.
 */
override suspend fun tryFenceLock(timeout: Long): String {
    val fence = distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS)
    return fence.toString()
}
273 override fun close() {