2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
19 import io.atomix.cluster.ClusterMembershipEvent
20 import io.atomix.core.Atomix
21 import kotlinx.coroutines.delay
22 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
24 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
25 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
26 import org.onap.ccsdk.cds.controllerblueprints.core.logger
27 import org.springframework.stereotype.Service
28 import java.time.Duration
29 import java.util.concurrent.CompletableFuture
32 open class AtomixBluePrintClusterService : BluePrintClusterService {
34 private val log = logger(AtomixBluePrintClusterService::class)
36 lateinit var atomix: Atomix
38 private var joined = false
/**
 * Creates the Atomix instance for this node, starts it and marks the node as joined.
 *
 * The instance is built from [ClusterInfo.configFile] when one is supplied, otherwise from the
 * default multicast configuration. After start-up a "ping" subscriber is registered and a ping
 * message is broadcast on the cluster communication service.
 *
 * @param clusterInfo cluster id, node id, node address, member list and optional config file.
 */
override suspend fun startCluster(clusterInfo: ClusterInfo) {
    log.info(
        "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
            "starting with members(${clusterInfo.clusterMembers})"
    )

    // Create the Atomix cluster either from the config file or as the default multicast cluster.
    // The property is read once into a local so the null check smart-casts and no `!!` is needed.
    val configFile = clusterInfo.configFile
    atomix = if (!configFile.isNullOrEmpty()) {
        AtomixLibUtils.configAtomix(configFile)
    } else {
        AtomixLibUtils.defaultMulticastAtomix(clusterInfo)
    }

    // Listen for the member change events.
    atomix.membershipService.addListener { membershipEvent ->
        when (membershipEvent.type()) {
            ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added")
            ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed")
            ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed")
            // Any other event type is only reported as unknown.
            else -> log.info("***** Member event unknown")
        }
    }

    // join() waits for the start future to complete before the success message is logged.
    atomix.start().join()
    log.info(
        "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
            "created successfully...."
    )

    // Receive ping from network: log the message and echo it back as the reply.
    val pingHandler = { message: String ->
        log.info("####### ping message received : $message")
        CompletableFuture.completedFuture(message)
    }
    atomix.communicationService.subscribe("ping", pingHandler)

    // Ping the network.
    atomix.communicationService.broadcast(
        "ping",
        "ping from node(${clusterInfo.nodeId})"
    )
    joined = true
}
/**
 * Reports the current value of the `joined` flag.
 *
 * @return the value of `joined`, which is initialised to `false`.
 */
override fun clusterJoined(): Boolean = joined
/**
 * Lists every member currently known to the Atomix membership service.
 *
 * @return one [ClusterMember] (member id and host) per Atomix member.
 * @throws IllegalStateException if Atomix was never initialised or is not running.
 */
override suspend fun allMembers(): Set<ClusterMember> {
    check(::atomix.isInitialized) { "failed to start and join cluster" }
    check(atomix.isRunning) { "cluster is not running" }

    val knownMembers = atomix.membershipService.members
    return knownMembers
        .map { member -> ClusterMember(id = member.id().id(), memberAddress = member.host()) }
        .toSet()
}
/**
 * Lists the cluster members whose id starts with [memberPrefix], ignoring case.
 *
 * @param memberPrefix prefix matched case-insensitively against each member id.
 * @return one [ClusterMember] (member id and host) per matching Atomix member.
 * @throws IllegalStateException if Atomix was never initialised or is not running.
 */
override suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> {
    check(::atomix.isInitialized) { "failed to start and join cluster" }
    check(atomix.isRunning) { "cluster is not running" }

    val matchingMembers = atomix.membershipService.members.filter { member ->
        member.id().id().startsWith(memberPrefix, ignoreCase = true)
    }
    return matchingMembers
        .map { member -> ClusterMember(id = member.id().id(), memberAddress = member.host()) }
        .toSet()
}
/**
 * Returns the distributed map store registered under [name].
 *
 * @param name name of the distributed map.
 * @return the map obtained from [AtomixLibUtils.distributedMapStore] for this Atomix instance.
 * @throws IllegalStateException if Atomix was never initialised.
 */
override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
    // Same guard as the member queries, so a call before start-up fails with a clear message
    // instead of an UninitializedPropertyAccessException from the lateinit property.
    check(::atomix.isInitialized) { "failed to start and join cluster" }
    return AtomixLibUtils.distributedMapStore<T>(atomix, name)
}
112 override suspend fun shutDown(duration: Duration) {
113 val shutDownMilli = duration.toMillis()
114 log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")