Include correlationId in group lock. 68/100068/1
authorBrinda Santh <bs2796@att.com>
Tue, 7 Jan 2020 19:56:53 +0000 (14:56 -0500)
committerBrinda Santh <bs2796@att.com>
Tue, 7 Jan 2020 19:59:07 +0000 (14:59 -0500)
Message prioritization optimization by checking and including  correlation Id, so that non related correlation message won't  get locked.

Optimized  Atomix Junit Test cases.

Issue-ID: CCSDK-2011
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I090ed4c193c7f9af4014cfeee4c6208c12b542c1

ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt

index a37089f..020038c 100644 (file)
@@ -9,7 +9,9 @@ services:
     ports:
       - "3306:3306"
     volumes:
-      - ~/vm_mysql:/var/lib/mysql
+      - target: /var/lib/mysql
+        type: volume
+        source: mysql-data
     restart: always
     environment:
       MYSQL_ROOT_PASSWORD: sdnctl
@@ -45,8 +47,8 @@ services:
         type: volume
         source: blueprints-deploy
       - target: /opt/app/onap/config
-        type: bind
-        source: ./config
+        type: volume
+        source: controller-config
     environment:
       # Same as hostname and container name
       CLUSTER_ENABLED: "true"
@@ -79,8 +81,8 @@ services:
         type: volume
         source: blueprints-deploy
       - target: /opt/app/onap/config
-        type: bind
-        source: ./config
+        type: volume
+        source: resource-resolution-config
     environment:
       CLUSTER_ENABLED: "true"
       CLUSTER_ID: cds-cluster
@@ -94,8 +96,60 @@ services:
       APP_CONFIG_HOME: /opt/app/onap/config
       STICKYSELECTORKEY:
       ENVCONTEXT: dev
+  py-executor-0:
+    depends_on:
+      - db
+      - nats
+    image: onap/ccsdk-py-executor
+    container_name: py-executor-0
+    hostname: py-executor-0
+    networks:
+      - cds-network
+    ports:
+      - "50052:50052"
+    restart: always
+    volumes:
+      - target: /opt/app/onap/blueprints/deploy
+        type: volume
+        source: blueprints-deploy
+    environment:
+      CLUSTER_ID: cds-cluster
+      CLUSTER_NODE_ID: py-executor-0
+      CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0,py-executor-0
+      NATS_HOSTS: nats://nats:4222
+      APPLICATIONNAME: py-executor
+      BUNDLEVERSION: 1.0.0
+      APP_CONFIG_HOME: /opt/app/onap/config
+      STICKYSELECTORKEY:
+      ENVCONTEXT: dev
+      APP_PORT: 50052
+      AUTH_TYPE: tls-auth
+      LOG_FILE: /opt/app/onap/logs/application.log
 volumes:
+  mysql-data:
+    driver: local
+    driver_opts:
+      type: none
+      device: /opt/app/cds/mysql/data
+      o: bind
   blueprints-deploy:
+    driver: local
+    driver_opts:
+      type: none
+      device: /opt/app/cds/blueprints/deploy
+      o: bind
+  controller-config:
+    driver: local
+    driver_opts:
+      type: none
+      device: /opt/app/cds/cds-controller/config
+      o: bind
+  resource-resolution-config:
+    driver: local
+    driver_opts:
+      type: none
+      device: /opt/app/cds/resource-resolution/config
+      o: bind
 
 networks:
   cds-network:
index d877702..20b17bc 100755 (executable)
@@ -9,7 +9,9 @@ services:
     ports:
       - "3306:3306"
     volumes:
-      - ~/vm_mysql:/var/lib/mysql
+      - target: /var/lib/mysql
+        type: volume
+        source: mysql-data
     restart: always
     environment:
       MYSQL_ROOT_PASSWORD: sdnctl
@@ -29,7 +31,12 @@ services:
       - "9111:9111"
     restart: always
     volumes:
-      - blueprints-deploy:/opt/app/onap/blueprints/deploy
+      - target: /opt/app/onap/blueprints/deploy
+        type: volume
+        source: blueprints-deploy
+      - target: /opt/app/onap/config
+        type: volume
+        source: controller-config
     environment:
       APPLICATIONNAME: cds-controller
       BUNDLEVERSION: 1.0.0
@@ -47,7 +54,9 @@ services:
       - "50051:50051"
     restart: always
     volumes:
-      - blueprints-deploy:/opt/app/onap/blueprints/deploy
+      - target: /opt/app/onap/blueprints/deploy
+        type: volume
+        source: blueprints-deploy
   py-executor-default:
     depends_on:
       - db
@@ -60,7 +69,9 @@ services:
       - "50052:50052"
     restart: always
     volumes:
-      - blueprints-deploy:/opt/app/onap/blueprints/deploy
+      - target: /opt/app/onap/blueprints/deploy
+        type: volume
+        source: blueprints-deploy
     environment:
       APPLICATIONNAME: py-executor
       BUNDLEVERSION: 1.0.0
@@ -72,7 +83,24 @@ services:
       LOG_FILE: /opt/app/onap/logs/application.log
 
 volumes:
+  mysql-data:
+    driver: local
+    driver_opts:
+      type: none
+      device: /opt/app/cds/mysql/data
+      o: bind
   blueprints-deploy:
+    driver: local
+    driver_opts:
+      type: none
+      device: /opt/app/cds/blueprints/deploy
+      o: bind
+  controller-config:
+    driver: local
+    driver_opts:
+      type: none
+      device: /opt/app/cds/cds-controller/config
+      o: bind
 
 networks:
   cds-network:
index 39d0814..bef7a7b 100644 (file)
@@ -36,14 +36,18 @@ fun AbstractComponentFunction.messagePrioritizationStateService() =
 /**
  * MessagePrioritization correlation extensions
  */
+
+/**
+ * Arrange comma separated correlation keys in ascending order.
+ */
 fun MessagePrioritization.toFormatedCorrelation(): String {
-    val ascendingKey = this.correlationId!!.split(",")
+    return this.correlationId!!.split(",")
         .map { it.trim() }.sorted().joinToString(",")
-    return ascendingKey
 }
 
+/**
+ * Used to group the correlation with respect to types.
+ */
 fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey {
-    val ascendingKey = this.correlationId!!.split(",")
-        .map { it.trim() }.sorted().joinToString(",")
-    return TypeCorrelationKey(this.type, ascendingKey)
+    return TypeCorrelationKey(this.type, this.toFormatedCorrelation())
 }
index d1f38f4..49230b6 100644 (file)
@@ -22,6 +22,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 
@@ -32,8 +33,12 @@ object MessageProcessorUtils {
         clusterService: BluePrintClusterService?,
         messagePrioritization: MessagePrioritization
     ): ClusterLock? {
-        return if (clusterService != null && clusterService.clusterJoined()) {
-            val lockName = "prioritization-${messagePrioritization.group}"
+        return if (clusterService != null && clusterService.clusterJoined() &&
+            !messagePrioritization.correlationId.isNullOrBlank()
+        ) {
+            // Get the correlation key in ascending order, even it it is misplaced
+            val correlationId = messagePrioritization.toFormatedCorrelation()
+            val lockName = "prioritization-${messagePrioritization.group}-$correlationId"
             val clusterLock = clusterService.clusterLock(lockName)
             clusterLock.lock()
             if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
index a2a0d39..9be15f2 100644 (file)
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode
 import com.fasterxml.jackson.databind.node.NullNode
 import com.fasterxml.jackson.databind.node.ObjectNode
 import io.atomix.core.Atomix
+import io.atomix.core.lock.AtomicLock
 import io.atomix.core.lock.DistributedLock
 import io.atomix.core.map.DistributedMap
 import io.atomix.protocols.backup.MultiPrimaryProtocol
@@ -90,13 +91,24 @@ object AtomixLibUtils {
     fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock {
         check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" }
 
+        val protocol = MultiPrimaryProtocol.builder()
+            .withBackups(numBackups)
+            .build()
+        return atomix.lockBuilder(lockName)
+            .withProtocol(protocol)
+            .build()
+    }
+
+    /** get Atomic distributed lock, to get lock fence information */
+    fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock {
+        check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" }
+
         val protocol = MultiPrimaryProtocol.builder()
             .withBackups(numBackups)
             .build()
 
-        val lock = atomix.lockBuilder(lockName)
+        return atomix.atomicLockBuilder(lockName)
             .withProtocol(protocol)
             .build()
-        return lock
     }
 }
index 39453fc..67bf4ca 100644 (file)
@@ -35,7 +35,7 @@ import kotlin.test.assertNotNull
 import kotlin.test.assertTrue
 
 class AtomixBluePrintClusterServiceTest {
-    val log = logger(AtomixBluePrintClusterServiceTest::class)
+    private val log = logger(AtomixBluePrintClusterServiceTest::class)
 
     @Before
     fun init() {
@@ -48,9 +48,11 @@ class AtomixBluePrintClusterServiceTest {
     @Test
     fun testClusterJoin() {
         runBlocking {
-            val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680))
-            // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682))
-            val bluePrintClusterService = bluePrintClusterServiceOne.get(0)
+            val bluePrintClusterServiceOne =
+                createCluster(arrayListOf(5679, 5680)).toMutableList()
+            // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680))
+            // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
+            val bluePrintClusterService = bluePrintClusterServiceOne[0]
             log.info("Members : ${bluePrintClusterService.allMembers()}")
             log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
             log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
@@ -59,16 +61,25 @@ class AtomixBluePrintClusterServiceTest {
         }
     }
 
-    private suspend fun createCluster(ports: List<Int>): List<BluePrintClusterService> {
+    private suspend fun createCluster(
+        ports: List<Int>,
+        otherClusterPorts: List<Int>? = null
+    ): List<BluePrintClusterService> {
+
         return withContext(Dispatchers.Default) {
-            val members = ports.map { "node-$it" }
+            val clusterMembers = ports.map { "node-$it" }.toMutableList()
+            /** Add the other cluster as members */
+            if (!otherClusterPorts.isNullOrEmpty()) {
+                val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList()
+                clusterMembers.addAll(otherClusterMembers)
+            }
             val deferred = ports.map { port ->
                 async(Dispatchers.IO) {
                     val nodeId = "node-$port"
                     log.info("********** Starting node($nodeId) on port($port)")
                     val clusterInfo = ClusterInfo(
                         id = "test-cluster", nodeId = nodeId,
-                        clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster"
+                        clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster"
                     )
                     val atomixClusterService = AtomixBluePrintClusterService()
                     atomixClusterService.startCluster(clusterInfo)
@@ -82,11 +93,11 @@ class AtomixBluePrintClusterServiceTest {
     private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
         /** Test Distributed store creation */
         repeat(2) { storeId ->
-            val store = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+            val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
                 "blueprint-runtime-$storeId"
             ).toDistributedMap()
             assertNotNull(store, "failed to get store")
-            val store1 = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+            val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
                 "blueprint-runtime-$storeId"
             ).toDistributedMap()
 
@@ -105,13 +116,13 @@ class AtomixBluePrintClusterServiceTest {
         val lockName = "sample-lock"
         withContext(Dispatchers.IO) {
             val deferred = async {
-                executeLock(bluePrintClusterServices.get(0), "first", lockName)
+                executeLock(bluePrintClusterServices[0], "first", lockName)
             }
             val deferred2 = async {
-                executeLock(bluePrintClusterServices.get(0), "second", lockName)
+                executeLock(bluePrintClusterServices[0], "second", lockName)
             }
             val deferred3 = async {
-                executeLock(bluePrintClusterServices.get(0), "third", lockName)
+                executeLock(bluePrintClusterServices[1], "third", lockName)
             }
             deferred.start()
             deferred2.start()