a2a0d390295147c163534b11fcb67dd394556920
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / atomix-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / atomix / utils / AtomixLibUtils.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.fasterxml.jackson.databind.node.ArrayNode
21 import com.fasterxml.jackson.databind.node.MissingNode
22 import com.fasterxml.jackson.databind.node.NullNode
23 import com.fasterxml.jackson.databind.node.ObjectNode
24 import io.atomix.core.Atomix
25 import io.atomix.core.lock.DistributedLock
26 import io.atomix.core.map.DistributedMap
27 import io.atomix.protocols.backup.MultiPrimaryProtocol
28 import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup
29 import io.atomix.protocols.raft.partition.RaftPartitionGroup
30 import io.atomix.utils.net.Address
31 import org.jsoup.nodes.TextNode
32 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
33 import org.onap.ccsdk.cds.controllerblueprints.core.logger
34 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
35
/**
 * Factory helpers for building [Atomix] cluster instances and the distributed
 * primitives (maps and locks) created on top of them.
 */
object AtomixLibUtils {
    private val log = logger(AtomixLibUtils::class)

    /**
     * Builds an [Atomix] instance from a configuration file.
     *
     * @param filePath path of the Atomix configuration file; it is resolved through
     * [normalizedFile] and passed to the builder as an absolute path.
     * @return the built [Atomix] instance. This function does not start it.
     */
    fun configAtomix(filePath: String): Atomix {
        val configFile = normalizedFile(filePath)
        return Atomix.builder(configFile.absolutePath).build()
    }

    /**
     * Builds an [Atomix] instance with multicast enabled, a Raft management group
     * named "system" and a primary-backup partition group named "data".
     *
     * @param clusterInfo supplies the node id, node address, cluster members and the
     * storage path under which the Raft data directory (`data-<nodeId>`) is placed.
     * @param raftPartitions number of partitions of the Raft management group.
     * @param primaryBackupPartitions number of partitions of the primary-backup group.
     * @return the built [Atomix] instance. This function does not start it.
     */
    fun defaultMulticastAtomix(
        clusterInfo: ClusterInfo,
        raftPartitions: Int = 1,
        primaryBackupPartitions: Int = 32
    ): Atomix {
        val nodeId = clusterInfo.nodeId

        // Management group: Raft partitions persisted under a per-node data directory.
        val raftPartitionGroup = RaftPartitionGroup.builder("system")
            .withNumPartitions(raftPartitions)
            .withMembers(clusterInfo.clusterMembers)
            .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId"))
            .build()

        // Data group used by the primitives created through this object.
        val primaryBackupGroup = PrimaryBackupPartitionGroup.builder("data")
            .withNumPartitions(primaryBackupPartitions)
            .build()

        return Atomix.builder()
            .withMemberId(nodeId)
            .withAddress(Address.from(clusterInfo.nodeAddress))
            .withManagementGroup(raftPartitionGroup)
            .withPartitionGroups(primaryBackupGroup)
            .withMulticastEnabled()
            .build()
    }

    /**
     * Creates a cache-enabled distributed map keyed by [String].
     *
     * The value type registered with the map is always [JsonNode], independent of [T],
     * and the Jackson node subtypes listed below are registered as extra types.
     *
     * @param atomix cluster instance; it must already be running.
     * @param storeName name of the distributed map.
     * @param numBackups number of backups configured on the multi-primary protocol.
     * @return the built distributed map.
     * @throws IllegalStateException if [atomix] is not running.
     */
    fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> {
        check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" }

        return atomix.mapBuilder<String, T>(storeName)
            .withProtocol(multiPrimaryProtocol(numBackups))
            .withCacheEnabled()
            .withValueType(JsonNode::class.java)
            .withExtraTypes(
                JsonNode::class.java,
                // Fully qualified on purpose: the simple name TextNode imported by this file
                // is org.jsoup.nodes.TextNode, which is not a JsonNode. The type that has to
                // be registered next to the other JSON node types is the Jackson one.
                com.fasterxml.jackson.databind.node.TextNode::class.java,
                ObjectNode::class.java,
                ArrayNode::class.java,
                NullNode::class.java,
                MissingNode::class.java
            )
            .build()
    }

    /**
     * Creates a distributed lock.
     *
     * @param atomix cluster instance; it must already be running.
     * @param lockName name of the distributed lock.
     * @param numBackups number of backups configured on the multi-primary protocol.
     * @return the built distributed lock.
     * @throws IllegalStateException if [atomix] is not running.
     */
    fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock {
        check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" }

        return atomix.lockBuilder(lockName)
            .withProtocol(multiPrimaryProtocol(numBackups))
            .build()
    }

    /** Builds the multi-primary protocol shared by the map and lock factories. */
    private fun multiPrimaryProtocol(numBackups: Int): MultiPrimaryProtocol =
        MultiPrimaryProtocol.builder()
            .withBackups(numBackups)
            .build()
}