Cluster distributed data store
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / atomix-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / atomix / service / AtomixBluePrintClusterService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
18
19 import io.atomix.cluster.ClusterMembershipEvent
20 import io.atomix.core.Atomix
21 import kotlinx.coroutines.delay
22 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
24 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
25 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
26 import org.onap.ccsdk.cds.controllerblueprints.core.logger
27 import org.springframework.stereotype.Service
28 import java.time.Duration
29 import java.util.concurrent.CompletableFuture
30
/**
 * [BluePrintClusterService] implementation backed by an [Atomix] instance.
 *
 * The Atomix instance is created and started by [startCluster]; every other operation
 * requires that call to have completed first.
 */
@Service
open class AtomixBluePrintClusterService : BluePrintClusterService {

    private val log = logger(AtomixBluePrintClusterService::class)

    /** Atomix instance assigned by [startCluster]; it is uninitialized until then. */
    lateinit var atomix: Atomix

    /** Set to true at the end of [startCluster] and cleared again by [shutDown]. */
    private var joined = false

    /**
     * Creates the Atomix instance described by [clusterInfo], starts it and announces this node
     * to the cluster with a "ping" broadcast.
     *
     * The call blocks the current thread until the Atomix start future completes.
     *
     * @param clusterInfo cluster id, node id, node address, members and optional config file.
     */
    override suspend fun startCluster(clusterInfo: ClusterInfo) {
        log.info(
            "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
                "starting with members(${clusterInfo.clusterMembers})"
        )

        /** Create Atomix cluster either from config file or default multi-cast cluster */
        // Reading the property once into a local lets the null check smart-cast, so no `!!` is needed.
        val configFile = clusterInfo.configFile
        atomix = if (configFile != null && configFile.isNotEmpty()) {
            AtomixLibUtils.configAtomix(configFile)
        } else {
            AtomixLibUtils.defaultMulticastAtomix(clusterInfo)
        }

        /** Listen for the member change events */
        atomix.membershipService.addListener { membershipEvent ->
            when (membershipEvent.type()) {
                ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added")
                ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed")
                ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed")
                else -> log.info("***** Member event unknown")
            }
        }
        atomix.start().join()
        log.info(
            "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
                "created successfully...."
        )

        /** Receive ping from network */
        val pingHandler = { message: String ->
            log.info("####### ping message received : $message")
            CompletableFuture.completedFuture(message)
        }
        atomix.communicationService.subscribe("ping", pingHandler)

        /** Ping the network */
        atomix.communicationService.broadcast(
            "ping",
            "ping from node(${clusterInfo.nodeId})"
        )
        joined = true
    }

    /** Returns true once [startCluster] has completed and [shutDown] has not stopped the cluster since. */
    override fun clusterJoined(): Boolean {
        return joined
    }

    /**
     * Returns every member currently reported by the Atomix membership service.
     *
     * @throws IllegalStateException if the cluster was never started or is not running.
     */
    override suspend fun allMembers(): Set<ClusterMember> = membersMatching { true }

    /**
     * Returns the members whose id starts with [memberPrefix], compared ignoring case.
     *
     * @throws IllegalStateException if the cluster was never started or is not running.
     */
    override suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> =
        membersMatching { memberId -> memberId.startsWith(memberPrefix, true) }

    /**
     * Returns the distributed map store registered under [name].
     *
     * @throws IllegalStateException if the cluster was never started or is not running.
     */
    override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
        return AtomixLibUtils.distributedMapStore<T>(runningAtomix(), name)
    }

    /**
     * Waits for [duration] and then stops the Atomix instance.
     *
     * Does nothing when the cluster was never started. The stop itself is asynchronous: this
     * function returns once the stop has been requested, and the outcome is logged on completion.
     */
    override suspend fun shutDown(duration: Duration) {
        // Guard the lateinit property: a shutdown request may arrive before startCluster ever ran.
        if (!::atomix.isInitialized) {
            log.info("Received cluster shutdown request, but cluster was never started")
            return
        }
        val shutDownMilli = duration.toMillis()
        log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
        delay(shutDownMilli)
        // Keep clusterJoined() consistent with the running state of the instance.
        joined = false
        atomix.stop().whenComplete { _, error ->
            if (error != null) {
                log.error("Failed to stop cluster", error)
            } else {
                log.info("Cluster stopped")
            }
        }
    }

    /** Returns [atomix] after verifying that it has been created and is running. */
    private fun runningAtomix(): Atomix {
        check(::atomix.isInitialized) { "failed to start and join cluster" }
        check(atomix.isRunning) { "cluster is not running" }
        return atomix
    }

    /** Maps the current cluster members whose id satisfies [idMatches] to [ClusterMember]s. */
    private fun membersMatching(idMatches: (String) -> Boolean): Set<ClusterMember> =
        runningAtomix().membershipService.members
            .filter { idMatches(it.id().id()) }
            .map { ClusterMember(id = it.id().id(), memberAddress = it.host()) }
            .toSet()
}