Merge "Reorder Create Tabs"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / atomix-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / atomix / service / AtomixBluePrintClusterService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
18
19 import io.atomix.cluster.ClusterMembershipEvent
20 import io.atomix.core.Atomix
21 import io.atomix.core.lock.DistributedLock
22 import kotlinx.coroutines.delay
23 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
24 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
25 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
26 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
28 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29 import org.springframework.stereotype.Service
30 import java.time.Duration
31 import java.util.concurrent.CompletableFuture
32
33 @Service
34 open class AtomixBluePrintClusterService : BluePrintClusterService {
35
36     private val log = logger(AtomixBluePrintClusterService::class)
37
38     lateinit var atomix: Atomix
39
40     override suspend fun startCluster(clusterInfo: ClusterInfo) {
41         log.info(
42             "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
43                 "starting with members(${clusterInfo.clusterMembers})"
44         )
45
46         /** Create Atomix cluster either from config file or default multi-cast cluster*/
47         atomix = if (!clusterInfo.configFile.isNullOrEmpty()) {
48             AtomixLibUtils.configAtomix(clusterInfo.configFile!!)
49         } else {
50             AtomixLibUtils.defaultMulticastAtomix(clusterInfo)
51         }
52
53         /** Listen for the member chaneg events */
54         atomix.membershipService.addListener { membershipEvent ->
55             when (membershipEvent.type()) {
56                 ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}")
57                 ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}")
58                 ClusterMembershipEvent.Type.REACHABILITY_CHANGED -> log.info("Reachability Changed : ${membershipEvent.subject()}")
59                 ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}")
60                 else -> log.info("Member event unknown")
61             }
62         }
63         /** Start and Join the Cluster */
64         atomix.start().join()
65         log.info(
66             "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
67                 "created successfully...."
68         )
69
70         /** Receive ping from network */
71         val pingHandler = { message: String ->
72             log.info("####### ping message received : $message")
73             CompletableFuture.completedFuture(message)
74         }
75         atomix.communicationService.subscribe("ping", pingHandler)
76
77         /** Ping the network */
78         atomix.communicationService.broadcast(
79             "ping",
80             "ping from node(${clusterInfo.nodeId})"
81         )
82     }
83
84     override fun clusterJoined(): Boolean {
85         return atomix.isRunning
86     }
87
88     override suspend fun masterMember(partitionGroup: String): ClusterMember {
89         check(::atomix.isInitialized) { "failed to start and join cluster" }
90         check(atomix.isRunning) { "cluster is not running" }
91         val masterId = atomix.partitionService
92             .getPartitionGroup(partitionGroup)
93             .getPartition("1").primary()
94         val masterMember = atomix.membershipService.getMember(masterId)
95         return ClusterMember(
96             id = masterMember.id().id(),
97             memberAddress = masterMember.address().toString()
98         )
99     }
100
101     override suspend fun allMembers(): Set<ClusterMember> {
102         check(::atomix.isInitialized) { "failed to start and join cluster" }
103         check(atomix.isRunning) { "cluster is not running" }
104
105         return atomix.membershipService.members.map {
106             ClusterMember(
107                 id = it.id().id(),
108                 memberAddress = it.address().toString()
109             )
110         }.toSet()
111     }
112
113     override suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> {
114         check(::atomix.isInitialized) { "failed to start and join cluster" }
115         check(atomix.isRunning) { "cluster is not running" }
116
117         return atomix.membershipService.members.filter {
118             it.id().id().startsWith(memberPrefix, true)
119         }.map { ClusterMember(id = it.id().id(), memberAddress = it.host()) }
120             .toSet()
121     }
122
123     override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
124         check(::atomix.isInitialized) { "failed to start and join cluster" }
125         return AtomixLibUtils.distributedMapStore<T>(atomix, name)
126     }
127
128     /** The DistributedLock is a distributed implementation of Java’s Lock.
129      * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
130      * determine ordering of multiple concurrent lock holders.
131      * DistributedLocks are designed to account for failures within the cluster.
132      * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
133      * the lock will be released and granted to the next waiting process.     *
134      */
135     override suspend fun clusterLock(name: String): ClusterLock {
136         check(::atomix.isInitialized) { "failed to start and join cluster" }
137         return ClusterLockImpl(atomix, name)
138     }
139
140     override suspend fun shutDown(duration: Duration) {
141         if (::atomix.isInitialized) {
142             val shutDownMilli = duration.toMillis()
143             log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
144             delay(shutDownMilli)
145             atomix.stop()
146         }
147     }
148 }
149
150 open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock {
151     val log = logger(ClusterLockImpl::class)
152
153     lateinit var distributedLock: DistributedLock
154
155     override fun name(): String {
156         return distributedLock.name()
157     }
158
159     override suspend fun lock() {
160         distributedLock = AtomixLibUtils.distributedLock(atomix, name)
161         distributedLock.lock()
162         log.debug("Cluster lock($name) created..")
163     }
164
165     override suspend fun tryLock(timeout: Long): Boolean {
166         distributedLock = AtomixLibUtils.distributedLock(atomix, name)
167         return distributedLock.tryLock(Duration.ofMillis(timeout))
168     }
169
170     override suspend fun unLock() {
171         distributedLock.unlock()
172         log.debug("Cluster unlock(${name()}) successfully..")
173     }
174
175     override fun isLocked(): Boolean {
176         return distributedLock.isLocked
177     }
178
179     override fun close() {
180         if (::distributedLock.isInitialized) {
181             distributedLock.close()
182         }
183     }
184 }