2  * Copyright © 2018-2019 AT&T Intellectual Property.
 
   4  * Licensed under the Apache License, Version 2.0 (the "License");
 
   5  * you may not use this file except in compliance with the License.
 
   6  * You may obtain a copy of the License at
 
   8  *     http://www.apache.org/licenses/LICENSE-2.0
 
  10  * Unless required by applicable law or agreed to in writing, software
 
  11  * distributed under the License is distributed on an "AS IS" BASIS,
 
  12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  13  * See the License for the specific language governing permissions and
 
  14  * limitations under the License.
 
  17 package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils
 
  19 import com.fasterxml.jackson.databind.JsonNode
 
  20 import com.fasterxml.jackson.databind.node.ArrayNode
 
  21 import com.fasterxml.jackson.databind.node.MissingNode
 
  22 import com.fasterxml.jackson.databind.node.NullNode
 
  23 import com.fasterxml.jackson.databind.node.ObjectNode
 
  24 import io.atomix.core.Atomix
 
  25 import io.atomix.core.lock.DistributedLock
 
  26 import io.atomix.core.map.DistributedMap
 
  27 import io.atomix.protocols.backup.MultiPrimaryProtocol
 
  28 import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup
 
  29 import io.atomix.protocols.raft.partition.RaftPartitionGroup
 
  30 import io.atomix.utils.net.Address
 
  31 import org.jsoup.nodes.TextNode
 
  32 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
 
  33 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 
  34 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
 
  36 object AtomixLibUtils {
 
  37     private val log = logger(AtomixLibUtils::class)
 
  39     fun configAtomix(filePath: String): Atomix {
 
  40         val configFile = normalizedFile(filePath)
 
  41         return Atomix.builder(configFile.absolutePath).build()
 
  44     fun defaultMulticastAtomix(
 
  45         clusterInfo: ClusterInfo,
 
  46         raftPartitions: Int = 1,
 
  47         primaryBackupPartitions: Int = 32
 
  50         val nodeId = clusterInfo.nodeId
 
  52         val raftPartitionGroup = RaftPartitionGroup.builder("system")
 
  53             .withNumPartitions(raftPartitions)
 
  54             .withMembers(clusterInfo.clusterMembers)
 
  55             .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId"))
 
  58         val primaryBackupGroup =
 
  59             PrimaryBackupPartitionGroup.builder("data")
 
  60                 .withNumPartitions(primaryBackupPartitions)
 
  63         return Atomix.builder()
 
  65             .withAddress(Address.from(clusterInfo.nodeAddress))
 
  66             .withManagementGroup(raftPartitionGroup)
 
  67             .withPartitionGroups(primaryBackupGroup)
 
  68             .withMulticastEnabled()
 
  72     fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> {
 
  73         check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" }
 
  75         val protocol = MultiPrimaryProtocol.builder()
 
  76             .withBackups(numBackups)
 
  79         return atomix.mapBuilder<String, T>(storeName)
 
  80             .withProtocol(protocol)
 
  82             .withValueType(JsonNode::class.java)
 
  84                 JsonNode::class.java, TextNode::class.java, ObjectNode::class.java,
 
  85                 ArrayNode::class.java, NullNode::class.java, MissingNode::class.java
 
  90     fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock {
 
  91         check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" }
 
  93         val protocol = MultiPrimaryProtocol.builder()
 
  94             .withBackups(numBackups)
 
  97         val lock = atomix.lockBuilder(lockName)
 
  98             .withProtocol(protocol)