Included and fixed clustered env properties and utils.
Fixed docker compose instance sequence numbers.
Issue-ID: CCSDK-2011
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: Ie28935ae7cb3de8c77cd7110993304eb49799b6c
image: nats-streaming:latest
container_name: nats
hostname: nats
- command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-1 --cluster_node_id nats-1"
+ command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-0 --cluster_node_id nats-0"
networks:
- cds-network
ports:
- "8222:8222"
- "4222:4222"
restart: always
- cds-controller-1:
+ cds-controller-0:
depends_on:
- db
- nats
image: onap/ccsdk-blueprintsprocessor:latest
- container_name: cds-controller-1
- hostname: cds-controller-1
+ container_name: cds-controller-0
+ hostname: cds-controller-0
networks:
- cds-network
ports:
source: ./config
environment:
# Same as hostname and container name
+ CLUSTER_ENABLED: "true"
CLUSTER_ID: cds-cluster
- CLUSTER_NODE_ID: cds-controller-1
- CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
+ CLUSTER_NODE_ID: cds-controller-0
+ CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0
CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
#CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
NATS_HOSTS: nats://nats:4222
APP_CONFIG_HOME: /opt/app/onap/config
STICKYSELECTORKEY:
ENVCONTEXT: dev
- resource-resolution-1:
+ resource-resolution-0:
depends_on:
- db
- nats
image: onap/ccsdk-blueprintsprocessor:latest
- container_name: resource-resolution-1
- hostname: resource-resolution-1
+ container_name: resource-resolution-0
+ hostname: resource-resolution-0
networks:
- cds-network
ports:
type: bind
source: ./config
environment:
+ CLUSTER_ENABLED: "true"
CLUSTER_ID: cds-cluster
- CLUSTER_NODE_ID: resource-resolution-1
- CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
+ CLUSTER_NODE_ID: resource-resolution-0
+ CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0
CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
#CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
NATS_HOSTS: nats://nats:4222
-Djava.security.egd=file:/dev/./urandom \
-DAPPNAME=${APPLICATIONNAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \
-Dspring.config.location=${APP_CONFIG_HOME}/ \
--DCLUSTER_ID=${CLUSTER_ID} \
--DCLUSTER_NODE_ID=${CLUSTER_NODE_ID} \
--DCLUSTER_NODE_ADDRESS=${CLUSTER_NODE_ID} \
--DCLUSTER_MEMBERS=${CLUSTER_MEMBERS} \
--DCLUSTER_STORAGE_PATH=${CLUSTER_STORAGE_PATH} \
--DCLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} \
org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component
* CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
* CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
* 4. Cluster will be enabled only all the above properties present in the environments.
- * if CLUSTER_ID is present, then it will try to create cluster.
+ * if CLUSTER_ENABLED is present, then it will try to create cluster.
*/
@Component
open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePrintClusterService) {
@EventListener(ApplicationReadyEvent::class)
fun startAndJoinCluster() = runBlocking {
- val clusterId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID)
- if (!clusterId.isNullOrEmpty()) {
+ if (BluePrintConstants.CLUSTER_ENABLED) {
- val nodeId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
- ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ID}")
+ val clusterId = ClusterUtils.clusterId()
+ val nodeId = ClusterUtils.clusterNodeId()
+ val nodeAddress = ClusterUtils.clusterNodeAddress()
- val nodeAddress = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS)
- ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS}")
-
- val clusterMembers = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_MEMBERS)
+ val clusterMembers = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_MEMBERS)
?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_MEMBERS}")
- val clusterMemberList = clusterMembers.split(",").map { it.trim() }.toList()
+ val clusterMemberList = clusterMembers.splitCommaAsList()
- val clusterStorage = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH)
+ val clusterStorage = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH)
?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH}")
- val clusterConfigFile = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_CONFIG_FILE)
+ val clusterConfigFile = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_CONFIG_FILE)
val clusterInfo = ClusterInfo(
id = clusterId, nodeId = nodeId,
)
bluePrintClusterService.startCluster(clusterInfo)
} else {
- log.info(
- "Cluster is disabled, to enable cluster set the environment " +
- "properties[CLUSTER_ID,CLUSTER_NODE_ID, CLUSTER_NODE_ADDRESS, CLUSTER_MEMBERS,CLUSTER_CONFIG_FILE]"
- )
+ log.info("Cluster is disabled, to enable cluster set the environment CLUSTER_* properties.")
}
}
val APP_NAME = System.getProperty("APPLICATION_NAME")
?: System.getProperty("APP_NAME")
?: System.getProperty("APPNAME")
- ?: "cds-controller-default"
+ ?: "cds-controller"
const val DATE_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean()
/** Cluster Properties */
+ val CLUSTER_ENABLED = (System.getenv("CLUSTER_ENABLED") ?: "false").toBoolean()
const val PROPERTY_CLUSTER_ID = "CLUSTER_ID"
const val PROPERTY_CLUSTER_NODE_ID = "CLUSTER_NODE_ID"
const val PROPERTY_CLUSTER_NODE_ADDRESS = "CLUSTER_NODE_ADDRESS"
}
fun clusterId(): String {
- return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster"
+ return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster"
}
fun clusterNodeId(): String {
- return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: "cds-controller"
+ return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: "cds-controller-0"
+ }
+
+ fun clusterNodeAddress(): String {
+ return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS)
+ ?: clusterNodeId()
}
}
import io.atomix.cluster.ClusterMembershipEvent
import io.atomix.core.Atomix
+import io.atomix.core.lock.DistributedLock
import kotlinx.coroutines.delay
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.stereotype.Service
lateinit var atomix: Atomix
- private var joined = false
-
override suspend fun startCluster(clusterInfo: ClusterInfo) {
log.info(
"Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
/** Listen for the member chaneg events */
atomix.membershipService.addListener { membershipEvent ->
when (membershipEvent.type()) {
- ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added")
- ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed")
- ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed")
- else -> log.info("***** Member event unknown")
+ ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}")
+ ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}")
+ ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}")
+ else -> log.info("Member event unknown")
}
}
atomix.start().join()
"ping",
"ping from node(${clusterInfo.nodeId})"
)
- joined = true
}
override fun clusterJoined(): Boolean {
- return joined
+ return atomix.isRunning
}
override suspend fun allMembers(): Set<ClusterMember> {
check(::atomix.isInitialized) { "failed to start and join cluster" }
check(atomix.isRunning) { "cluster is not running" }
+
return atomix.membershipService.members.map {
ClusterMember(
id = it.id().id(),
}
override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
+ check(::atomix.isInitialized) { "failed to start and join cluster" }
return AtomixLibUtils.distributedMapStore<T>(atomix, name)
}
+ /** The DistributedLock is a distributed implementation of Java’s Lock.
+ * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
+ * determine ordering of multiple concurrent lock holders.
+ * DistributedLocks are designed to account for failures within the cluster.
+ * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
+ * the lock will be released and granted to the next waiting process. *
+ */
+ override suspend fun clusterLock(name: String): ClusterLock {
+ check(::atomix.isInitialized) { "failed to start and join cluster" }
+ return ClusterLockImpl(atomix, name)
+ }
+
override suspend fun shutDown(duration: Duration) {
- val shutDownMilli = duration.toMillis()
- log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
- delay(shutDownMilli)
- atomix.stop()
+ if (::atomix.isInitialized) {
+ val shutDownMilli = duration.toMillis()
+ log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
+ delay(shutDownMilli)
+ atomix.stop()
+ }
+ }
+}
+
+open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock {
+
+ lateinit var distributedLock: DistributedLock
+
+ override suspend fun lock() {
+ distributedLock = AtomixLibUtils.distributedLock(atomix, name)
+ distributedLock.lock()
+ }
+
+ override suspend fun tryLock(timeout: Long): Boolean {
+ distributedLock = AtomixLibUtils.distributedLock(atomix, name)
+ return distributedLock.tryLock(Duration.ofMillis(timeout))
+ }
+
+ override suspend fun unLock() {
+ distributedLock.unlock()
+ }
+
+ override fun isLocked(): Boolean {
+ return distributedLock.isLocked
}
}
import com.fasterxml.jackson.databind.node.NullNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.atomix.core.Atomix
+import io.atomix.core.lock.DistributedLock
import io.atomix.core.map.DistributedMap
import io.atomix.protocols.backup.MultiPrimaryProtocol
import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup
import io.atomix.utils.net.Address
import org.jsoup.nodes.TextNode
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
object AtomixLibUtils {
+ private val log = logger(AtomixLibUtils::class)
fun configAtomix(filePath: String): Atomix {
val configFile = normalizedFile(filePath)
return Atomix.builder(configFile.absolutePath).build()
}
- fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix {
+ fun defaultMulticastAtomix(
+ clusterInfo: ClusterInfo,
+ raftPartitions: Int = 1,
+ primaryBackupPartitions: Int = 32
+ ): Atomix {
val nodeId = clusterInfo.nodeId
val raftPartitionGroup = RaftPartitionGroup.builder("system")
- .withNumPartitions(7)
+ .withNumPartitions(raftPartitions)
.withMembers(clusterInfo.clusterMembers)
.withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId"))
.build()
val primaryBackupGroup =
PrimaryBackupPartitionGroup.builder("data")
- .withNumPartitions(31)
+ .withNumPartitions(primaryBackupPartitions)
.build()
return Atomix.builder()
.build()
}
- fun <T> distributedMapStore(atomix: Atomix, storeName: String): DistributedMap<String, T> {
+ fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> {
check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" }
val protocol = MultiPrimaryProtocol.builder()
- .withBackups(2)
+ .withBackups(numBackups)
.build()
return atomix.mapBuilder<String, T>(storeName)
)
.build()
}
+
+ fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock {
+ check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" }
+
+ val protocol = MultiPrimaryProtocol.builder()
+ .withBackups(numBackups)
+ .build()
+
+ val lock = atomix.lockBuilder(lockName)
+ .withProtocol(protocol)
+ .build()
+ return lock
+ }
}
package org.onap.ccsdk.cds.blueprintsprocessor.atomix
import com.fasterxml.jackson.databind.JsonNode
+import io.atomix.core.Atomix
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
import org.junit.Before
import org.junit.Test
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
class AtomixBluePrintClusterServiceTest {
val log = logger(AtomixBluePrintClusterServiceTest::class)
atomixClusterService.atomix
}
}
- val atomix = deferred.awaitAll()
- /** Test Distributed store creation */
- repeat(2) { storeId ->
- val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
- assertNotNull(store, "failed to get store")
- val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
- store1.addListener {
- log.info("Received map event : $it")
- }
- repeat(10) {
- store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
- }
- delay(100)
- store.close()
+ val atomixs = deferred.awaitAll()
+ testDistributedStore(atomixs)
+ testDistributedLock(atomixs)
+ }
+ }
+
+ private suspend fun testDistributedStore(atomix: List<Atomix>) {
+ /** Test Distributed store creation */
+ repeat(2) { storeId ->
+ val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
+ assertNotNull(store, "failed to get store")
+ val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
+ store1.addListener {
+ log.info("Received map event : $it")
+ }
+ repeat(10) {
+ store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
}
+ delay(100)
+ store.close()
+ }
+ }
+
+ private suspend fun testDistributedLock(atomix: List<Atomix>) {
+ val lockName = "sample-lock"
+ withContext(Dispatchers.IO) {
+ val deferred = async {
+ executeLock(atomix.get(0), "first", lockName)
+ }
+ val deferred2 = async {
+ executeLock(atomix.get(1), "second", lockName)
+ }
+ val deferred3 = async {
+ executeLock(atomix.get(1), "third", lockName)
+ }
+ deferred.start()
+ deferred2.start()
+ deferred3.start()
+ }
+ }
+
+ private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) {
+ log.info("initialising $lockId lock...")
+ val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName)
+ assertNotNull(distributedLock, "failed to create distributed $lockId lock")
+ distributedLock.lock()
+ assertTrue(distributedLock.isLocked, "failed to lock $lockId")
+ try {
+ log.info("locked $lockId process for 5mSec")
+ delay(5)
+ } finally {
+ distributedLock.unlock()
+ log.info("$lockId lock released")
}
+ distributedLock.close()
}
}
import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
import kotlin.test.assertNotNull
testMultiPublish(natsService)
testLoadBalance(natsService)
+ testLimitSubscription(natsService)
testRequestReply(natsService)
testMultiRequestReply(natsService)
delay(1000)
val lbMessageHandler2 =
MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") }
- natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
- natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
+ val sub1 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
+ val sub2 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
repeat(5) {
natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
}
+ sub1.unsubscribe()
+ sub2.unsubscribe()
+ }
+ }
+
+ private fun testLimitSubscription(natsService: BluePrintNatsService) {
+ runBlocking {
+ /** Load balance Publish Message Test **/
+ val lbMessageHandler1 =
+ MessageHandler { message ->
+ runBlocking {
+ println("LB Publish Message Handler 1: ${message.strData()}")
+ message.ack()
+ }
+ }
+ val lbMessageHandler2 =
+ MessageHandler { message ->
+ runBlocking {
+ println("LB Publish Message Handler 2: ${message.strData()}")
+ message.ack()
+ }
+ }
+
+ val sub1 = natsService.loadBalanceSubscribe(
+ "lb-publish", "lb-group", lbMessageHandler1,
+ SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+ )
+ val sub2 = natsService.loadBalanceSubscribe(
+ "lb-publish", "lb-group", lbMessageHandler2,
+ SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+ )
+
+ repeat(10) {
+ natsService.publish("lb-publish", "lb limit message-$it".toByteArray())
+ }
+ sub1.unsubscribe()
+ sub2.unsubscribe()
}
}
/** Create and get or get the distributed data map store with [name] */
suspend fun <T> clusterMapStore(name: String): MutableMap<String, T>
+ /** Create and get the distributed lock with [name] */
+ suspend fun clusterLock(name: String): ClusterLock
+
/** Shut down the cluster with [duration] */
suspend fun shutDown(duration: Duration)
}
var storagePath: String
)
-data class ClusterMember(val id: String, val memberAddress: String?)
+data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null)
+
+interface ClusterLock {
+ suspend fun lock()
+ suspend fun tryLock(timeout: Long): Boolean
+ suspend fun unLock()
+ fun isLocked(): Boolean
+}