Cluster partition master API 22/99922/2
authorBrinda Santh <bs2796@att.com>
Thu, 26 Dec 2019 20:15:27 +0000 (15:15 -0500)
committerKAPIL SINGAL <ks220y@att.com>
Mon, 30 Dec 2019 17:49:04 +0000 (17:49 +0000)
Added cluster lock close method.

Added NATS connection name and default services.

Junit test cases improvements.

Issue-ID: CCSDK-2011
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I44c7c1aaeae2ddbf768903152fb00bc160172fc1

ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt
ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt

index 696d728..17d2436 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.atomix
 
+import com.fasterxml.jackson.databind.JsonNode
 import io.atomix.core.map.DistributedMap
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 
-fun <T : Map<*, *>> T.toDistributedMap(): DistributedMap<*, *> {
-    return if (this != null && this is DistributedMap<*, *>) this
+fun <T : Map<String, JsonNode>> T.toDistributedMap(): DistributedMap<String, JsonNode> {
+    return if (this != null && this is DistributedMap<*, *>) this as DistributedMap<String, JsonNode>
     else throw BluePrintProcessorException("map is not of type DistributedMap")
 }
index b5ec6dd..0690eb8 100644 (file)
@@ -55,10 +55,12 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
             when (membershipEvent.type()) {
                 ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}")
                 ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}")
+                ClusterMembershipEvent.Type.REACHABILITY_CHANGED -> log.info("Reachability Changed : ${membershipEvent.subject()}")
                 ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}")
                 else -> log.info("Member event unknown")
             }
         }
+        /** Start and Join the Cluster */
         atomix.start().join()
         log.info(
             "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
@@ -83,6 +85,19 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
         return atomix.isRunning
     }
 
+    override suspend fun masterMember(partitionGroup: String): ClusterMember {
+        check(::atomix.isInitialized) { "failed to start and join cluster" }
+        check(atomix.isRunning) { "cluster is not running" }
+        val masterId = atomix.partitionService
+            .getPartitionGroup(partitionGroup)
+            .getPartition("1").primary()
+        val masterMember = atomix.membershipService.getMember(masterId)
+        return ClusterMember(
+            id = masterMember.id().id(),
+            memberAddress = masterMember.address().toString()
+        )
+    }
+
     override suspend fun allMembers(): Set<ClusterMember> {
         check(::atomix.isInitialized) { "failed to start and join cluster" }
         check(atomix.isRunning) { "cluster is not running" }
@@ -90,7 +105,7 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
         return atomix.membershipService.members.map {
             ClusterMember(
                 id = it.id().id(),
-                memberAddress = it.host()
+                memberAddress = it.address().toString()
             )
         }.toSet()
     }
@@ -153,4 +168,10 @@ open class ClusterLockImpl(private val atomix: Atomix, private val name: String)
     override fun isLocked(): Boolean {
         return distributedLock.isLocked
     }
+
+    override fun close() {
+        if (::distributedLock.isInitialized) {
+            distributedLock.close()
+        }
+    }
 }
index b257069..39453fc 100644 (file)
@@ -17,7 +17,6 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.atomix
 
 import com.fasterxml.jackson.databind.JsonNode
-import io.atomix.core.Atomix
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.async
 import kotlinx.coroutines.awaitAll
@@ -27,7 +26,7 @@ import kotlinx.coroutines.withContext
 import org.junit.Before
 import org.junit.Test
 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
@@ -45,12 +44,25 @@ class AtomixBluePrintClusterServiceTest {
         }
     }
 
-    /** Testing two cluster with distributed map store creation, This is time consuming test casetake around 10s **/
+    /** Testing two cluster with distributed map store creation, This is time consuming test case, taks around 10s **/
     @Test
     fun testClusterJoin() {
         runBlocking {
-            val members = arrayListOf("node-5679", "node-5680")
-            val deferred = arrayListOf(5679, 5680).map { port ->
+            val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680))
+            // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682))
+            val bluePrintClusterService = bluePrintClusterServiceOne.get(0)
+            log.info("Members : ${bluePrintClusterService.allMembers()}")
+            log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
+            log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
+            testDistributedStore(bluePrintClusterServiceOne)
+            testDistributedLock(bluePrintClusterServiceOne)
+        }
+    }
+
+    private suspend fun createCluster(ports: List<Int>): List<BluePrintClusterService> {
+        return withContext(Dispatchers.Default) {
+            val members = ports.map { "node-$it" }
+            val deferred = ports.map { port ->
                 async(Dispatchers.IO) {
                     val nodeId = "node-$port"
                     log.info("********** Starting node($nodeId) on port($port)")
@@ -60,43 +72,46 @@ class AtomixBluePrintClusterServiceTest {
                     )
                     val atomixClusterService = AtomixBluePrintClusterService()
                     atomixClusterService.startCluster(clusterInfo)
-                    atomixClusterService.atomix
+                    atomixClusterService
                 }
             }
-            val atomixs = deferred.awaitAll()
-            testDistributedStore(atomixs)
-            testDistributedLock(atomixs)
+            deferred.awaitAll()
         }
     }
 
-    private suspend fun testDistributedStore(atomix: List<Atomix>) {
+    private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
         /** Test Distributed store creation */
         repeat(2) { storeId ->
-            val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
+            val store = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+                "blueprint-runtime-$storeId"
+            ).toDistributedMap()
             assertNotNull(store, "failed to get store")
-            val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
+            val store1 = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+                "blueprint-runtime-$storeId"
+            ).toDistributedMap()
+
             store1.addListener {
                 log.info("Received map event : $it")
             }
-            repeat(10) {
+            repeat(5) {
                 store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
             }
-            delay(100)
+            delay(10)
             store.close()
         }
     }
 
-    private suspend fun testDistributedLock(atomix: List<Atomix>) {
+    private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
         val lockName = "sample-lock"
         withContext(Dispatchers.IO) {
             val deferred = async {
-                executeLock(atomix.get(0), "first", lockName)
+                executeLock(bluePrintClusterServices.get(0), "first", lockName)
             }
             val deferred2 = async {
-                executeLock(atomix.get(1), "second", lockName)
+                executeLock(bluePrintClusterServices.get(0), "second", lockName)
             }
             val deferred3 = async {
-                executeLock(atomix.get(1), "third", lockName)
+                executeLock(bluePrintClusterServices.get(0), "third", lockName)
             }
             deferred.start()
             deferred2.start()
@@ -104,17 +119,21 @@ class AtomixBluePrintClusterServiceTest {
         }
     }
 
-    private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) {
+    private suspend fun executeLock(
+        bluePrintClusterService: BluePrintClusterService,
+        lockId: String,
+        lockName: String
+    ) {
         log.info("initialising $lockId lock...")
-        val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName)
+        val distributedLock = bluePrintClusterService.clusterLock(lockName)
         assertNotNull(distributedLock, "failed to create distributed $lockId lock")
         distributedLock.lock()
-        assertTrue(distributedLock.isLocked, "failed to lock $lockId")
+        assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
         try {
             log.info("locked $lockId process for 5mSec")
             delay(5)
         } finally {
-            distributedLock.unlock()
+            distributedLock.unLock()
             log.info("$lockId lock released")
         }
         distributedLock.close()
index f1c625e..016d486 100644 (file)
@@ -19,7 +19,7 @@
         <!-- encoders are assigned the type
              ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
         <encoder>
-            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{100} - %msg%n</pattern>
         </encoder>
     </appender>
 
index 709ee7d..147d360 100644 (file)
@@ -17,6 +17,7 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.nats
 
 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 import org.springframework.context.annotation.ComponentScan
 import org.springframework.context.annotation.Configuration
@@ -31,9 +32,14 @@ open class BluePrintNatsLibConfiguration
 fun BluePrintDependencyService.natsLibPropertyService(): BluePrintNatsLibPropertyService =
     instance(NatsLibConstants.SERVICE_BLUEPRINT_NATS_LIB_PROPERTY)
 
+fun BluePrintDependencyService.controllerNatsService(): BluePrintNatsService {
+    return natsLibPropertyService().bluePrintNatsService(NatsLibConstants.DEFULT_NATS_SELECTOR)
+}
+
 class NatsLibConstants {
     companion object {
         const val SERVICE_BLUEPRINT_NATS_LIB_PROPERTY = "blueprint-nats-lib-property-service"
+        const val DEFULT_NATS_SELECTOR = "cds-controller"
         const val PROPERTY_NATS_PREFIX = "blueprintsprocessor.nats."
         const val TYPE_TOKEN_AUTH = "token-auth"
         const val TYPE_TLS_AUTH = "tls-auth"
index 3329ec2..9767ac2 100644 (file)
@@ -23,6 +23,8 @@ open class NatsConnectionProperties {
     var clusterId: String = ClusterUtils.clusterId()
     var clientId: String = ClusterUtils.clusterNodeId()
     lateinit var host: String
+    /** Rest endpoint selector to access Monitoring API */
+    var monitoringSelector: String? = null
 }
 
 open class TokenAuthNatsConnectionProperties : NatsConnectionProperties() {
index 3781fae..00a972e 100644 (file)
@@ -34,6 +34,7 @@ open class TLSAuthNatsService(private val natsConnectionProperties: TLSAuthNatsC
             val serverList = natsConnectionProperties.host.splitCommaAsList()
 
             val options = Options.Builder()
+                .connectionName(natsConnectionProperties.clientId)
                 .servers(serverList.toTypedArray())
                 // .sslContext(sslContext())
                 .build()
index 0da3022..60b7934 100644 (file)
@@ -33,6 +33,7 @@ open class TokenAuthNatsService(private val natsConnectionProperties: TokenAuthN
             val serverList = natsConnectionProperties.host.splitCommaAsList()
 
             val options = Options.Builder()
+                .connectionName(natsConnectionProperties.clientId)
                 .servers(serverList.toTypedArray())
                 .token(natsConnectionProperties.token.toCharArray())
                 .build()
index 6fd4436..21fcfc5 100644 (file)
@@ -26,6 +26,9 @@ interface BluePrintClusterService {
 
     fun clusterJoined(): Boolean
 
+    /** Returns [partitionGroup] master member */
+    suspend fun masterMember(partitionGroup: String): ClusterMember
+
     /** Returns all the data cluster members */
     suspend fun allMembers(): Set<ClusterMember>
 
@@ -58,4 +61,5 @@ interface ClusterLock {
     suspend fun tryLock(timeout: Long): Boolean
     suspend fun unLock()
     fun isLocked(): Boolean
+    fun close()
 }