2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.fasterxml.jackson.databind.node.ArrayNode
21 import com.fasterxml.jackson.databind.node.MissingNode
22 import com.fasterxml.jackson.databind.node.NullNode
23 import com.fasterxml.jackson.databind.node.ObjectNode
24 import io.atomix.core.Atomix
25 import io.atomix.core.lock.AtomicLock
26 import io.atomix.core.lock.DistributedLock
27 import io.atomix.core.map.DistributedMap
28 import io.atomix.protocols.backup.MultiPrimaryProtocol
29 import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup
30 import io.atomix.protocols.raft.partition.RaftPartitionGroup
31 import io.atomix.utils.net.Address
32 import org.jsoup.nodes.TextNode
33 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
34 import org.onap.ccsdk.cds.controllerblueprints.core.logger
35 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
37 object AtomixLibUtils {
38 private val log = logger(AtomixLibUtils::class)
/**
 * Builds an [Atomix] instance from a configuration file.
 *
 * The given path is resolved through [normalizedFile] and its absolute path is handed to
 * the Atomix builder. The instance is only built here; this function does not start it.
 *
 * @param filePath location of the Atomix configuration file.
 * @return the built [Atomix] instance.
 */
fun configAtomix(filePath: String): Atomix =
    Atomix.builder(normalizedFile(filePath).absolutePath).build()
/**
 * Builds an [Atomix] instance with multicast enabled, using the identity and
 * membership information carried by [clusterInfo].
 *
 * Two partition groups are configured:
 * - a Raft group named "system", used as the management group, whose data directory is
 *   `<storagePath>/data-<nodeId>`;
 * - a primary-backup group named "data", used as the partition group.
 *
 * The instance is only built here; this function does not start it.
 *
 * @param clusterInfo supplies the node id, node address, cluster members and storage path.
 * @param raftPartitions number of partitions of the "system" Raft group.
 * @param primaryBackupPartitions number of partitions of the "data" primary-backup group.
 * @return the built [Atomix] instance.
 */
fun defaultMulticastAtomix(
    clusterInfo: ClusterInfo,
    raftPartitions: Int = 1,
    primaryBackupPartitions: Int = 32
): Atomix {
    val memberId = clusterInfo.nodeId

    // Management ("system") group: Raft based, persisted per node under the storage path.
    val managementGroup = RaftPartitionGroup.builder("system")
        .withNumPartitions(raftPartitions)
        .withMembers(clusterInfo.clusterMembers)
        .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$memberId"))
        .build()

    // Data group: primary-backup partitions.
    val dataGroup = PrimaryBackupPartitionGroup.builder("data")
        .withNumPartitions(primaryBackupPartitions)
        .build()

    val nodeAddress = Address.from(clusterInfo.nodeAddress)
    return Atomix.builder()
        .withMemberId(memberId)
        .withAddress(nodeAddress)
        .withManagementGroup(managementGroup)
        .withPartitionGroups(dataGroup)
        .withMulticastEnabled()
        .build()
}
/**
 * Creates (or looks up) the distributed map named [storeName] on a running cluster.
 *
 * The map is built with a multi-primary protocol using [numBackups] backups, with caching
 * enabled, and with [JsonNode] as the value type. The Jackson node implementations are
 * registered as extra types so the map serializer knows about them.
 *
 * @param atomix the cluster instance; it must already be running.
 * @param storeName name of the distributed map.
 * @param numBackups number of backups configured on the multi-primary protocol.
 * @return the distributed map keyed by [String].
 * @throws IllegalStateException if [atomix] is not running.
 */
fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> {
    check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" }

    val protocol = MultiPrimaryProtocol.builder()
        .withBackups(numBackups)
        .build()

    return atomix.mapBuilder<String, T>(storeName)
        .withProtocol(protocol)
        .withCacheEnabled()
        .withValueType(JsonNode::class.java)
        .withExtraTypes(
            JsonNode::class.java,
            // Fully qualified on purpose: the unqualified name TextNode resolves to
            // org.jsoup.nodes.TextNode in this file, which is an HTML DOM class and not the
            // Jackson text node that is actually stored as a map value.
            com.fasterxml.jackson.databind.node.TextNode::class.java,
            ObjectNode::class.java,
            ArrayNode::class.java,
            NullNode::class.java,
            MissingNode::class.java
        )
        .build()
}
/**
 * Creates (or looks up) the distributed lock named [lockName] on a running cluster.
 *
 * @param atomix the cluster instance; it must already be running.
 * @param lockName name of the lock.
 * @param numBackups number of backups configured on the multi-primary protocol.
 * @return the distributed lock.
 * @throws IllegalStateException if [atomix] is not running.
 */
fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock {
    // Guard: a lock primitive can only be created on a running cluster.
    if (!atomix.isRunning) {
        error("Cluster is not running, couldn't create distributed lock($lockName)")
    }

    val backupProtocol = MultiPrimaryProtocol.builder()
        .withBackups(numBackups)
        .build()

    val lockBuilder = atomix.lockBuilder(lockName).withProtocol(backupProtocol)
    return lockBuilder.build()
}
/**
 * Creates (or looks up) the atomic distributed lock named [lockName] on a running cluster.
 * Use this variant when the lock fence information is needed.
 *
 * @param atomix the cluster instance; it must already be running.
 * @param lockName name of the lock.
 * @param numBackups number of backups configured on the multi-primary protocol.
 * @return the atomic lock.
 * @throws IllegalStateException if [atomix] is not running.
 */
fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock {
    // Guard: a lock primitive can only be created on a running cluster.
    if (!atomix.isRunning) {
        error("Cluster is not running, couldn't create atomic lock($lockName)")
    }

    val backupProtocol = MultiPrimaryProtocol.builder()
        .withBackups(numBackups)
        .build()

    val lockBuilder = atomix.atomicLockBuilder(lockName).withProtocol(backupProtocol)
    return lockBuilder.build()
}