/*
 * Copyright © 2018-2019 AT&T Intellectual Property.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
19 import com.hazelcast.client.HazelcastClient
20 import com.hazelcast.client.config.ClientConfig
21 import com.hazelcast.client.config.YamlClientConfigBuilder
22 import com.hazelcast.cluster.Member
23 import com.hazelcast.cluster.MembershipEvent
24 import com.hazelcast.cluster.MembershipListener
25 import com.hazelcast.config.Config
26 import com.hazelcast.config.FileSystemYamlConfig
27 import com.hazelcast.config.MemberAttributeConfig
28 import com.hazelcast.core.Hazelcast
29 import com.hazelcast.core.HazelcastInstance
30 import com.hazelcast.cp.CPSubsystemManagementService
31 import com.hazelcast.cp.lock.FencedLock
32 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
33 import kotlinx.coroutines.delay
34 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
37 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
38 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
39 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
40 import org.onap.ccsdk.cds.controllerblueprints.core.logger
41 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
42 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
43 import org.springframework.stereotype.Service
44 import java.time.Duration
45 import java.util.concurrent.TimeUnit
/**
 * Hazelcast backed implementation of [BluePrintClusterService].
 *
 * The [HazelcastInstance] is created by [startCluster]; every operation that needs it fails with an
 * [IllegalStateException] ("failed to start and join cluster") until that call has completed.
 */
open class HazlecastClusterService : BluePrintClusterService {

    private val log = logger(HazlecastClusterService::class)
    lateinit var hazelcast: HazelcastInstance
    lateinit var cpSubsystemManagementService: CPSubsystemManagementService
    var joinedClient = false
    var joinedLite = false

    /**
     * Creates the Hazelcast client or server instance described by [configuration] and stores it in [hazelcast].
     *
     * Accepted configuration types:
     * - [Config]: starts a server member and offers it to [promoteAsCPMember].
     * - [ClientConfig]: starts a client.
     * - [ClusterInfo]: publishes the cluster id and node id as system properties, validates the YAML
     *   configuration file and starts either a client (`joinAsClient`) or a server member from it.
     *
     * @throws BluePrintProcessorException when [configuration] is of any other type.
     * @throws IllegalStateException when the [ClusterInfo] configuration file is not a yaml file or is missing.
     */
    override suspend fun <T> startCluster(configuration: T) {
        /** Get the Hazelcast Client or Server instance */
        hazelcast = when (configuration) {
            is Config -> {
                joinedLite = configuration.isLiteMember
                val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration)
                /** Promote as CP Member */
                promoteAsCPMember(hazelcastInstance)
                hazelcastInstance
            }
            is ClientConfig -> {
                joinedClient = true
                HazelcastClient.newHazelcastClient(configuration)
            }
            is ClusterInfo -> {
                System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id)
                System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)

                /** The node id attribute is what [hazelcastApplicationMembers] matches members on */
                val memberAttributeConfig = MemberAttributeConfig()
                memberAttributeConfig.setAttribute(
                    BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
                    configuration.nodeId
                )

                /** Check file exists */
                val clusterConfigFile = normalizedFile(configuration.configFile)
                check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
                    "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
                }
                check(clusterConfigFile.exists()) {
                    "couldn't find cluster configuration file(${clusterConfigFile.absolutePath})"
                }
                log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")

                if (configuration.joinAsClient) {
                    /** Hazelcast Client from config file */
                    /** Set the configuration file to system properties, so that Hazelcast will read automatically */
                    System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
                    joinedClient = true
                    val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
                    hazelcastClientConfiguration.properties = configuration.properties
                    HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
                } else {
                    /** Hazelcast Server from config file (the already normalized file is reused) */
                    val hazelcastServerConfiguration = FileSystemYamlConfig(clusterConfigFile)
                    hazelcastServerConfiguration.properties = configuration.properties
                    hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
                    joinedLite = hazelcastServerConfiguration.isLiteMember
                    val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
                    /** Promote as CP Member */
                    promoteAsCPMember(hazelcastInstance)
                    hazelcastInstance
                }
            }
            else -> throw BluePrintProcessorException("couldn't understand the cluster configuration")
        }

        /** Add the Membership Listeners */
        hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener(this))
        log.info(
            "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
        )
    }

    /** @return true when this node joined the cluster as a Hazelcast client. */
    override fun isClient(): Boolean {
        return joinedClient
    }

    /** @return true when this node joined the cluster as a lite member. */
    override fun isLiteMember(): Boolean {
        return joinedLite
    }

    /**
     * @return true when the Hazelcast instance has been created and its lifecycle service reports it running;
     * false (instead of an uninitialized-property failure) when [startCluster] has not assigned it yet.
     */
    override fun clusterJoined(): Boolean {
        return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning
    }

    /**
     * Returns the first entry of the cluster member set as the master member.
     * [partitionGroup] is not consulted by this implementation.
     */
    override suspend fun masterMember(partitionGroup: String): ClusterMember {
        return startedHazelcast().cluster.members.first().toClusterMember()
    }

    /** @return every member currently reported by the cluster. */
    override suspend fun allMembers(): Set<ClusterMember> {
        return startedHazelcast().cluster.members.map { it.toClusterMember() }.toSet()
    }

    /** @return the members whose node id attribute starts with [appName] (case-insensitive). */
    override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
        startedHazelcast()
        return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet()
    }

    /** @return the Hazelcast distributed map registered under [name]. */
    override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
        return startedHazelcast().getMap<String, T>(name)
    }

    /**
     * The DistributedLock is a distributed implementation of Java's Lock.
     * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
     * determine ordering of multiple concurrent lock holders.
     * DistributedLocks are designed to account for failures within the cluster.
     * When a lock holder crashes or becomes disconnected from the partition by which the lock's state is controlled,
     * the lock will be released and granted to the next waiting process.
     */
    override suspend fun clusterLock(name: String): ClusterLock {
        return ClusterLockImpl(startedHazelcast(), name)
    }

    /** Return interface may change and it will be included in BluePrintClusterService */
    suspend fun clusterScheduler(name: String): IScheduledExecutorService {
        return startedHazelcast().getScheduledExecutorService(name)
    }

    /**
     * Waits for [duration] and then hands the instance to [HazlecastClusterUtils.terminate].
     * Does nothing when the cluster was never started or is no longer running.
     */
    override suspend fun shutDown(duration: Duration) {
        if (::hazelcast.isInitialized && clusterJoined()) {
            delay(duration.toMillis())
            HazlecastClusterUtils.terminate(hazelcast)
        }
    }

    /** Promotes [hazelcastInstance] as CP member, unless this node joined as a client or as a lite member. */
    suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
        if (!joinedClient && !joinedLite) {
            HazlecastClusterUtils.promoteAsCPMember(hazelcastInstance)
        }
    }

    /**
     * Members belonging to the application named by [ClusterUtils.applicationName], keyed by node id.
     *
     * @throws IllegalStateException when the cluster is not started or this node is a cluster client.
     */
    suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
        startedHazelcast()
        check(!isClient()) { "not supported for cluster client members." }
        return hazelcastApplicationMembers(ClusterUtils.applicationName())
    }

    /**
     * Members whose node id attribute starts with [appName] (case-insensitive), keyed by that node id.
     * Members that do not carry the node id attribute are skipped.
     */
    suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
        val applicationMembers: MutableMap<String, Member> = hashMapOf()
        for (member in startedHazelcast().cluster.members) {
            // getAttribute is a Java API that yields null for an undefined key, so keep the type nullable.
            val memberName: String? = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
            if (memberName != null && memberName.startsWith(appName, true)) {
                applicationMembers[memberName] = member
            }
        }
        return applicationMembers
    }

    /** Returns [hazelcast], failing with the shared "not started" error when it has not been assigned yet. */
    private fun startedHazelcast(): HazelcastInstance {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return hazelcast
    }
}
/**
 * Membership listener registered on the cluster; it logs every member added / removed event at info level.
 */
open class BlueprintsClusterMembershipListener(val hazlecastClusterService: HazlecastClusterService) :
    MembershipListener {

    private val log = logger(BlueprintsClusterMembershipListener::class)

    /** Logs the event raised when a member leaves the cluster. */
    override fun memberRemoved(membershipEvent: MembershipEvent) {
        report(membershipEvent)
    }

    /** Logs the event raised when a member joins the cluster. */
    override fun memberAdded(membershipEvent: MembershipEvent) {
        report(membershipEvent)
    }

    /** Single place that formats and writes the membership log line. */
    private fun report(event: MembershipEvent) {
        log.info("MembershipEvent: $event")
    }
}
223 open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
224 private val log = logger(ClusterLockImpl::class)
226 lateinit var distributedLock: FencedLock
/**
 * Returns the name of this lock.
 *
 * When a lock handle has already been stored in `distributedLock` (done by lock(), tryLock(), fenceLock()
 * and tryFenceLock()), the name reported by that handle is returned. Before that, the name this object was
 * constructed with is returned instead of failing on the uninitialized lateinit property.
 */
override fun name(): String {
    return if (::distributedLock.isInitialized) distributedLock.name else name
}
/**
 * Resolves the CP subsystem lock registered under this object's name, remembers it in `distributedLock`
 * and acquires it.
 */
override suspend fun lock() {
    val fencedLock = hazelcast.cpSubsystem.getLock(name)
    // Store the handle before acquiring, so it is kept even when the acquire call fails.
    distributedLock = fencedLock
    fencedLock.lock()
    log.trace("Cluster lock($name) created..")
}
/**
 * Resolves the CP subsystem lock registered under this object's name, remembers it in `distributedLock`
 * and tries to acquire it.
 *
 * @param timeout maximum time to wait, in milliseconds.
 * @return the result reported by the underlying tryLock call.
 */
override suspend fun tryLock(timeout: Long): Boolean {
    val fencedLock = hazelcast.cpSubsystem.getLock(name)
    distributedLock = fencedLock
    val acquired = fencedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
    return acquired
}
/**
 * Releases the lock handle currently stored in `distributedLock` and traces the released lock name.
 */
override suspend fun unLock() {
    val heldLock = distributedLock
    heldLock.unlock()
    // The name is resolved only after the release, through name().
    val releasedName = name()
    log.trace("Cluster unlock($releasedName) successfully..")
}
/**
 * Reports whether the lock is currently locked.
 *
 * Uses the handle stored in `distributedLock` when one exists. When no lock call has been made through this
 * object yet, the lock is looked up by name in the CP subsystem instead of failing on the uninitialized
 * lateinit property; that lookup is not stored.
 */
override fun isLocked(): Boolean {
    val fencedLock = if (::distributedLock.isInitialized) {
        distributedLock
    } else {
        hazelcast.cpSubsystem.getLock(name)
    }
    return fencedLock.isLocked
}
/**
 * Resolves the CP subsystem lock registered under this object's name, remembers it in `distributedLock`,
 * acquires it and returns the fence obtained from the acquire call.
 *
 * @return the fence value rendered as a string.
 */
override suspend fun fenceLock(): String {
    val fencedLock = hazelcast.cpSubsystem.getLock(name)
    distributedLock = fencedLock
    val token = fencedLock.lockAndGetFence()
    log.trace("Cluster lock($name) fence($token) created..")
    return token.toString()
}
/**
 * Resolves the CP subsystem lock registered under this object's name, remembers it in `distributedLock`
 * and tries to acquire it within the given time.
 *
 * @param timeout maximum time to wait, in milliseconds.
 * @return the value returned by the underlying tryLockAndGetFence call, rendered as a string.
 */
override suspend fun tryFenceLock(timeout: Long): String {
    val fencedLock = hazelcast.cpSubsystem.getLock(name)
    distributedLock = fencedLock
    val token = fencedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS)
    return token.toString()
}
264 override fun close() {