/*
 * Copyright © 2018-2019 AT&T Intellectual Property.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
19 import com.hazelcast.client.HazelcastClient
20 import com.hazelcast.client.config.ClientConfig
21 import com.hazelcast.client.config.YamlClientConfigBuilder
22 import com.hazelcast.cluster.Member
23 import com.hazelcast.cluster.MembershipEvent
24 import com.hazelcast.cluster.MembershipListener
25 import com.hazelcast.config.Config
26 import com.hazelcast.config.FileSystemYamlConfig
27 import com.hazelcast.config.MemberAttributeConfig
28 import com.hazelcast.core.Hazelcast
29 import com.hazelcast.core.HazelcastInstance
30 import com.hazelcast.cp.CPSubsystemManagementService
31 import com.hazelcast.cp.lock.FencedLock
32 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
33 import com.hazelcast.topic.Message
34 import com.hazelcast.topic.MessageListener
35 import kotlinx.coroutines.delay
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessage
37 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterService
38 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
39 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
40 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent
41 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
42 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
43 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
44 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
45 import org.onap.ccsdk.cds.controllerblueprints.core.logger
46 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
47 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
48 import org.springframework.context.ApplicationEventPublisher
49 import org.springframework.stereotype.Service
50 import java.time.Duration
53 import java.util.concurrent.TimeUnit
/**
 * Hazelcast-backed implementation of [BlueprintClusterService].
 *
 * The injected [applicationEventPublisher] is used to publish a [ClusterJoinedEvent] at the end of
 * the cluster start-up routine.
 */
56 open class HazelcastClusterService(private val applicationEventPublisher: ApplicationEventPublisher) : BlueprintClusterService {
// Logger bound to this service class.
58 private val log = logger(HazelcastClusterService::class)
// Hazelcast instance used by every cluster operation; most methods verify `::hazelcast.isInitialized` before use.
59 lateinit var hazelcast: HazelcastInstance
// NOTE(review): no assignment to this property is visible in the reviewed listing — confirm where it is set.
60 lateinit var cpSubsystemManagementService: CPSubsystemManagementService
// Read when deciding on CP promotion; NOTE(review): no assignment is visible in the reviewed listing — confirm.
61 var joinedClient = false
// Set from the member configuration's `isLiteMember` during start-up; also read when deciding on CP promotion.
62 var joinedLite = false
/**
 * Starts or joins the Hazelcast cluster described by [configuration].
 *
 * From the visible lines, [configuration] is dispatched on through a `when`: one path creates a member with
 * `Hazelcast.newHazelcastInstance`, one creates a client with `HazelcastClient.newHazelcastClient`, and one
 * builds either of those from a YAML configuration file. A [BlueprintProcessorException] is thrown when the
 * configuration is not understood (per its message). Afterwards a membership listener is registered and a
 * [ClusterJoinedEvent] is published.
 *
 * NOTE(review): several lines (branch conditions, the assignment target, closing braces, some arguments) are
 * not visible in the reviewed listing; the comments below describe only what is shown — confirm against the
 * complete file.
 */
64 override suspend fun <T> startCluster(configuration: T) {
65 /** Get the Hazelcast Client or Server instance */
67 when (configuration) {
// Member path: remember whether the supplied configuration marks this node as a lite member.
69 joinedLite = configuration.isLiteMember
70 val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration)
71 /** Promote as CP Member */
72 promoteAsCPMember(hazelcastInstance)
// Client path: connect as a Hazelcast client using the supplied configuration.
77 HazelcastClient.newHazelcastClient(configuration)
// Expose the cluster id and node id as JVM system properties.
81 System.setProperty(BlueprintConstants.PROPERTY_CLUSTER_ID, configuration.id)
82 System.setProperty(BlueprintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)
// Member attribute keyed by PROPERTY_CLUSTER_NODE_ID; the value argument sits on a line not visible here.
84 val memberAttributeConfig = MemberAttributeConfig()
85 memberAttributeConfig.setAttribute(
86 BlueprintConstants.PROPERTY_CLUSTER_NODE_ID,
90 val configFile = configuration.configFile
92 /** Check file exists */
93 val clusterConfigFile = normalizedFile(configuration.configFile)
// Only paths ending in "yaml" (case-insensitive) are accepted.
94 check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
95 "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
97 check(clusterConfigFile.exists()) {
// NOTE(review): "couldn't file" in the message below presumably means "couldn't find".
98 "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})"
100 log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")
102 /** Hazelcast Client from config file */
103 if (configuration.joinAsClient) {
104 /** Set the configuration file to system properties, so that Hazelcast will read automatically */
105 System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
// The builder is created without arguments; the caller-supplied properties replace the built configuration's properties.
107 val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
108 hazelcastClientConfiguration.properties = configuration.properties
109 HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
111 /** Hazelcast Server from config file */
// Load the member configuration from the YAML file, then override cluster name, instance name,
// properties and member attributes with the values carried by [configuration].
112 val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile))
113 hazelcastServerConfiguration.clusterName = configuration.id
114 hazelcastServerConfiguration.instanceName = configuration.nodeId
115 hazelcastServerConfiguration.properties = configuration.properties
116 hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
// The lite-member flag is read back from the loaded configuration before the instance is created.
117 joinedLite = hazelcastServerConfiguration.isLiteMember
118 val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
119 /** Promote as CP Member */
120 promoteAsCPMember(hazelcastInstance)
125 throw BlueprintProcessorException("couldn't understand the cluster configuration")
129 /** Add the Membership Listeners */
130 hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener())
132 "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
// Notify the application that the cluster start-up routine has completed.
134 applicationEventPublisher.publishEvent(ClusterJoinedEvent(this))
/**
 * Boolean query declared by the cluster service contract.
 * NOTE(review): the body is not visible in the reviewed listing; presumably it returns `joinedClient` — confirm.
 */
137 override fun isClient(): Boolean {
/**
 * Boolean query declared by the cluster service contract.
 * NOTE(review): the body is not visible in the reviewed listing; presumably it returns `joinedLite` — confirm.
 */
141 override fun isLiteMember(): Boolean {
/**
 * Tells whether this node currently takes part in a cluster.
 *
 * @return `true` only when the Hazelcast instance has been created and its lifecycle service reports it
 * as running; `false` otherwise.
 */
override fun clusterJoined(): Boolean =
    ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning
/**
 * Returns the first member of the cluster's member set.
 *
 * NOTE(review): Hazelcast's `Cluster.getMembers` documentation describes the first entry as the oldest
 * member (the master) — confirm against the Hazelcast version in use.
 *
 * @param partitionGroup not used by this implementation.
 * @return the first cluster member converted to a [ClusterMember].
 * @throws IllegalStateException if the cluster has not been started.
 */
override suspend fun masterMember(partitionGroup: String): ClusterMember {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val currentMembers = hazelcast.cluster.members
    return currentMembers.first().toClusterMember()
}
/**
 * Lists every member currently known to the cluster.
 *
 * @return all cluster members converted to [ClusterMember].
 * @throws IllegalStateException if the cluster has not been started.
 */
override suspend fun allMembers(): Set<ClusterMember> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val converted = hazelcast.cluster.members.map { member -> member.toClusterMember() }
    return converted.toSet()
}
/**
 * Lists the cluster members that belong to the application [appName].
 *
 * Matching is delegated to [hazelcastApplicationMembers]; each match is converted to a [ClusterMember].
 *
 * @param appName application name used to select members.
 * @return the matching members.
 * @throws IllegalStateException if the cluster has not been started.
 */
override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val matchingMembers = hazelcastApplicationMembers(appName).values
    return matchingMembers.mapNotNull { member -> member.toClusterMember() }.toSet()
}
/**
 * Gives access to the Hazelcast map registered under [name].
 *
 * @param name name of the map.
 * @return the map obtained from the Hazelcast instance, keyed by [String].
 * @throws IllegalStateException if the cluster has not been started.
 */
override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val distributedMap: MutableMap<String, T> = hazelcast.getMap<String, T>(name)
    return distributedMap
}
/**
 * Provides a cluster-wide lock registered under [name].
 *
 * The distributed lock is a distributed implementation of Java's Lock. It provides monotonically increasing,
 * globally unique lock instance identifiers that can be used to determine the ordering of multiple concurrent
 * lock holders. Distributed locks are designed to account for failures within the cluster: when a lock holder
 * crashes or becomes disconnected from the partition by which the lock's state is controlled, the lock is
 * released and granted to the next waiting process.
 *
 * @param name name of the lock.
 * @return a [ClusterLock] created for this Hazelcast instance and [name].
 * @throws IllegalStateException if the cluster has not been started.
 */
override suspend fun clusterLock(name: String): ClusterLock {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val lock: ClusterLock = ClusterLockImpl(hazelcast, name)
    return lock
}
/**
 * Gives access to the Hazelcast scheduled executor service registered under [name].
 *
 * The return interface may change, and this function will then be included in BlueprintClusterService.
 *
 * @param name name of the scheduled executor service.
 * @throws IllegalStateException if the cluster has not been started.
 */
suspend fun clusterScheduler(name: String): IScheduledExecutorService {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val scheduler: IScheduledExecutorService = hazelcast.getScheduledExecutorService(name)
    return scheduler
}
/**
 * Terminates the Hazelcast instance after waiting for [duration].
 *
 * Nothing happens when the instance was never created or is not reported as joined.
 *
 * @param duration how long to wait before terminating.
 */
override suspend fun shutDown(duration: Duration) {
    // Nothing to stop unless an instance exists and is still part of the cluster.
    if (!::hazelcast.isInitialized || !clusterJoined()) return

    delay(duration.toMillis())
    HazelcastClusterUtils.terminate(hazelcast)
}
/**
 * Publishes [message] on the reliable topic named after [topic].
 *
 * @param topic topic whose name selects the Hazelcast reliable topic.
 * @param message payload to publish.
 * @throws IllegalStateException if the cluster has not been started.
 */
override suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T) {
    // Same guard as the other cluster operations, so an unstarted cluster fails with the
    // service's usual message instead of an UninitializedPropertyAccessException.
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    hazelcast.getReliableTopic<T>(topic.name).publish(message)
}
/**
 * Registers [listener] on the reliable topic named after [topic].
 *
 * The listener is wrapped in a [HazelcastMessageListenerAdapter] before it is handed to Hazelcast.
 *
 * @param topic topic whose name selects the Hazelcast reliable topic.
 * @param listener listener to register.
 * @return the registration id returned by Hazelcast; it is the id expected when removing the listener.
 * @throws IllegalStateException if the cluster has not been started.
 */
override fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID {
    // Same guard as the other cluster operations, so an unstarted cluster fails with the
    // service's usual message instead of an UninitializedPropertyAccessException.
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) listening to topic($topic)...")
    return hazelcast.getReliableTopic<T>(topic.name)
        .addMessageListener(HazelcastMessageListenerAdapter(listener))
}
/**
 * Removes the listener registered under [uuid] from the reliable topic named after [topic].
 *
 * The "stopped listening" message is logged before the removal is attempted, so it appears
 * regardless of the removal result.
 *
 * @param topic topic whose name selects the Hazelcast reliable topic.
 * @param uuid registration id of the listener to remove.
 * @return the result reported by Hazelcast's `removeMessageListener`.
 * @throws IllegalStateException if the cluster has not been started.
 */
override fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean {
    // Same guard as the other cluster operations, so an unstarted cluster fails with the
    // service's usual message instead of an UninitializedPropertyAccessException.
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) has stopped listening to topic($topic)...")
    return hazelcast.getReliableTopic<Any>(topic.name).removeMessageListener(uuid)
}
/**
 * Promotes [hazelcastInstance] to a CP member, unless this node joined as a client or as a lite member.
 *
 * @param hazelcastInstance instance to promote.
 */
suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
    // Clients and lite members are never promoted.
    if (joinedClient || joinedLite) return

    HazelcastClusterUtils.promoteAsCPMember(hazelcastInstance)
}
/**
 * Looks up the cluster members that belong to this node's own application, as named by
 * `ClusterUtils.applicationName()`.
 *
 * @return matching members keyed by their node-id attribute.
 * @throws IllegalStateException if the cluster has not been started, or if this node is a cluster client.
 */
suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    check(!isClient()) { "not supported for cluster client members." }

    val ownApplicationName = ClusterUtils.applicationName()
    return hazelcastApplicationMembers(ownApplicationName)
}
/**
 * Collects the cluster members whose node-id attribute starts with [appName] (case-insensitive).
 *
 * Members that do not carry the node-id attribute are skipped instead of failing the whole lookup.
 *
 * @param appName application name prefix to match against each member's node-id attribute.
 * @return matching members keyed by their node-id attribute; empty when nothing matches.
 * @throws IllegalStateException if the cluster has not been started.
 */
suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
    check(::hazelcast.isInitialized) { "failed to start and join cluster" }
    val applicationMembers: MutableMap<String, Member> = hashMapOf()
    for (member in hazelcast.cluster.members) {
        // getAttribute is a Java call; treat its result as nullable because a member may have been
        // started without the node-id attribute.
        val memberName: String? = member.getAttribute(BlueprintConstants.PROPERTY_CLUSTER_NODE_ID)
        if (memberName != null && memberName.startsWith(appName, true)) {
            applicationMembers[memberName] = member
        }
    }
    return applicationMembers
}
/**
 * Membership listener that logs every member-added and member-removed event at info level.
 */
open class BlueprintsClusterMembershipListener : MembershipListener {

    private val log = logger(BlueprintsClusterMembershipListener::class)

    /** Logs the removal event. */
    override fun memberRemoved(membershipEvent: MembershipEvent) {
        report(membershipEvent)
    }

    /** Logs the addition event. */
    override fun memberAdded(membershipEvent: MembershipEvent) {
        report(membershipEvent)
    }

    /** Writes [event] to the log; both callbacks use the same message format. */
    private fun report(event: MembershipEvent) {
        log.info("MembershipEvent: $event")
    }
}
/**
 * [ClusterLock] backed by the [FencedLock] that the Hazelcast CP subsystem provides for [name].
 *
 * Every operation delegates to that lock and records the outcome at trace level.
 */
open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {

    private val log = logger(ClusterLockImpl::class)

    private val distributedLock: FencedLock = hazelcast.cpSubsystem.getLock(name)

    /** Name reported by the underlying lock. */
    override fun name(): String = distributedLock.name

    /** Acquires the lock through the underlying lock's `lock()` call. */
    override suspend fun lock() {
        distributedLock.lock()
        log.trace("Cluster lock($name) created..")
    }

    /**
     * Attempts to acquire the lock, waiting at most [timeout] milliseconds.
     *
     * @return `true` when the lock was acquired, `false` otherwise.
     */
    override suspend fun tryLock(timeout: Long): Boolean {
        val acquired = distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
        if (acquired) {
            log.trace("Cluster lock acquired: $name")
        } else {
            log.trace("Failed to acquire Cluster lock $name within timeout $timeout")
        }
        return acquired
    }

    /** Releases the lock through the underlying lock's `unlock()` call. */
    override suspend fun unLock() {
        distributedLock.unlock()
        log.trace("Cluster unlock(${name()}) successfully..")
    }

    /** Reports the underlying lock's `isLocked` state. */
    override fun isLocked(): Boolean = distributedLock.isLocked

    /** Reports the underlying lock's `isLockedByCurrentThread` state. */
    override fun isLockedByCurrentThread(): Boolean = distributedLock.isLockedByCurrentThread

    /**
     * Acquires the lock and returns the fence handed out for this acquisition.
     *
     * @return the fence as a string.
     */
    override suspend fun fenceLock(): String {
        val fence = distributedLock.lockAndGetFence()
        log.trace("Cluster lock($name) fence($fence) created..")
        return fence.toString()
    }

    /**
     * Attempts to acquire the lock within [timeout] milliseconds and returns the resulting fence.
     *
     * NOTE(review): the value is passed through unchanged; Hazelcast presumably returns an
     * "invalid fence" value when the lock was not acquired — confirm and handle at the call site.
     *
     * @return the fence as a string.
     */
    override suspend fun tryFenceLock(timeout: Long): String {
        val fence = distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS)
        return fence.toString()
    }

    /**
     * NOTE(review): no body is visible for this method in the reviewed listing; kept as a no-op — confirm.
     */
    override fun close() {
    }
}
/**
 * Adapts a [BlueprintClusterMessageListener] to Hazelcast's [MessageListener] so it can be registered on a
 * Hazelcast topic: each incoming Hazelcast [Message] is converted to a [BlueprintClusterMessage] and handed
 * to [listener].
 */
301 class HazelcastMessageListenerAdapter<E>(val listener: BlueprintClusterMessageListener<E>) : MessageListener<E> {
// A null message skips the conversion below, but the trailing `.let` (not `?.let`) still runs,
// so the listener is then invoked with null.
302 override fun onMessage(message: Message<E>?) = message?.let {
303 BlueprintClusterMessage<E>(
// The message source is cast to String and resolved to a topic constant; `valueOf` throws for unknown names.
304 BlueprintClusterTopic.valueOf(it.source as String),
// NOTE(review): the constructor arguments between the topic and the publishing member are not visible
// in the reviewed listing — confirm against the complete file.
307 it.publishingMember.toClusterMember()
309 }.let { listener.onMessage(it) }