2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
19 import io.atomix.cluster.ClusterMembershipEvent
20 import io.atomix.core.Atomix
21 import io.atomix.core.lock.DistributedLock
22 import kotlinx.coroutines.delay
23 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
24 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
25 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
26 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
28 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29 import org.springframework.stereotype.Service
30 import java.time.Duration
31 import java.util.concurrent.CompletableFuture
34 open class AtomixBluePrintClusterService : BluePrintClusterService {
36 private val log = logger(AtomixBluePrintClusterService::class)
38 lateinit var atomix: Atomix
40 override suspend fun startCluster(clusterInfo: ClusterInfo) {
42 "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
43 "starting with members(${clusterInfo.clusterMembers})"
46 /** Create Atomix cluster either from config file or default multi-cast cluster*/
47 atomix = if (!clusterInfo.configFile.isNullOrEmpty()) {
48 AtomixLibUtils.configAtomix(clusterInfo.configFile!!)
50 AtomixLibUtils.defaultMulticastAtomix(clusterInfo)
53 /** Listen for the member chaneg events */
54 atomix.membershipService.addListener { membershipEvent ->
55 when (membershipEvent.type()) {
56 ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}")
57 ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}")
58 ClusterMembershipEvent.Type.REACHABILITY_CHANGED -> log.info("Reachability Changed : ${membershipEvent.subject()}")
59 ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}")
60 else -> log.info("Member event unknown")
63 /** Start and Join the Cluster */
66 "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
67 "created successfully...."
70 /** Receive ping from network */
71 val pingHandler = { message: String ->
72 log.info("####### ping message received : $message")
73 CompletableFuture.completedFuture(message)
75 atomix.communicationService.subscribe("ping", pingHandler)
77 /** Ping the network */
78 atomix.communicationService.broadcast(
80 "ping from node(${clusterInfo.nodeId})"
84 override fun clusterJoined(): Boolean {
85 return atomix.isRunning
88 override suspend fun masterMember(partitionGroup: String): ClusterMember {
89 check(::atomix.isInitialized) { "failed to start and join cluster" }
90 check(atomix.isRunning) { "cluster is not running" }
91 val masterId = atomix.partitionService
92 .getPartitionGroup(partitionGroup)
93 .getPartition("1").primary()
94 val masterMember = atomix.membershipService.getMember(masterId)
96 id = masterMember.id().id(),
97 memberAddress = masterMember.address().toString()
101 override suspend fun allMembers(): Set<ClusterMember> {
102 check(::atomix.isInitialized) { "failed to start and join cluster" }
103 check(atomix.isRunning) { "cluster is not running" }
105 return atomix.membershipService.members.map {
108 memberAddress = it.address().toString()
113 override suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> {
114 check(::atomix.isInitialized) { "failed to start and join cluster" }
115 check(atomix.isRunning) { "cluster is not running" }
117 return atomix.membershipService.members.filter {
118 it.id().id().startsWith(memberPrefix, true)
119 }.map { ClusterMember(id = it.id().id(), memberAddress = it.host()) }
123 override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
124 check(::atomix.isInitialized) { "failed to start and join cluster" }
125 return AtomixLibUtils.distributedMapStore<T>(atomix, name)
128 /** The DistributedLock is a distributed implementation of Java’s Lock.
129 * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
130 * determine ordering of multiple concurrent lock holders.
131 * DistributedLocks are designed to account for failures within the cluster.
132 * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
133 * the lock will be released and granted to the next waiting process. *
135 override suspend fun clusterLock(name: String): ClusterLock {
136 check(::atomix.isInitialized) { "failed to start and join cluster" }
137 return ClusterLockImpl(atomix, name)
140 override suspend fun shutDown(duration: Duration) {
141 if (::atomix.isInitialized) {
142 val shutDownMilli = duration.toMillis()
143 log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
150 open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock {
151 val log = logger(ClusterLockImpl::class)
153 lateinit var distributedLock: DistributedLock
155 override fun name(): String {
156 return distributedLock.name()
159 override suspend fun lock() {
160 distributedLock = AtomixLibUtils.distributedLock(atomix, name)
161 distributedLock.lock()
162 log.debug("Cluster lock($name) created..")
165 override suspend fun tryLock(timeout: Long): Boolean {
166 distributedLock = AtomixLibUtils.distributedLock(atomix, name)
167 return distributedLock.tryLock(Duration.ofMillis(timeout))
170 override suspend fun unLock() {
171 distributedLock.unlock()
172 log.debug("Cluster unlock(${name()}) successfully..")
175 override fun isLocked(): Boolean {
176 return distributedLock.isLocked
179 override fun close() {
180 if (::distributedLock.isInitialized) {
181 distributedLock.close()