Cluster distributed lock service.
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / atomix-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / atomix / AtomixBluePrintClusterServiceTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.atomix
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import io.atomix.core.Atomix
21 import kotlinx.coroutines.Dispatchers
22 import kotlinx.coroutines.async
23 import kotlinx.coroutines.awaitAll
24 import kotlinx.coroutines.delay
25 import kotlinx.coroutines.runBlocking
26 import kotlinx.coroutines.withContext
27 import org.junit.Before
28 import org.junit.Test
29 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
30 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
32 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
33 import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
34 import org.onap.ccsdk.cds.controllerblueprints.core.logger
35 import kotlin.test.assertNotNull
36 import kotlin.test.assertTrue
37
38 class AtomixBluePrintClusterServiceTest {
39     val log = logger(AtomixBluePrintClusterServiceTest::class)
40
41     @Before
42     fun init() {
43         runBlocking {
44             deleteNBDir("target/cluster")
45         }
46     }
47
48     /** Testing two cluster with distributed map store creation, This is time consuming test casetake around 10s **/
49     @Test
50     fun testClusterJoin() {
51         runBlocking {
52             val members = arrayListOf("node-5679", "node-5680")
53             val deferred = arrayListOf(5679, 5680).map { port ->
54                 async(Dispatchers.IO) {
55                     val nodeId = "node-$port"
56                     log.info("********** Starting node($nodeId) on port($port)")
57                     val clusterInfo = ClusterInfo(
58                         id = "test-cluster", nodeId = nodeId,
59                         clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster"
60                     )
61                     val atomixClusterService = AtomixBluePrintClusterService()
62                     atomixClusterService.startCluster(clusterInfo)
63                     atomixClusterService.atomix
64                 }
65             }
66             val atomixs = deferred.awaitAll()
67             testDistributedStore(atomixs)
68             testDistributedLock(atomixs)
69         }
70     }
71
72     private suspend fun testDistributedStore(atomix: List<Atomix>) {
73         /** Test Distributed store creation */
74         repeat(2) { storeId ->
75             val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
76             assertNotNull(store, "failed to get store")
77             val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
78             store1.addListener {
79                 log.info("Received map event : $it")
80             }
81             repeat(10) {
82                 store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
83             }
84             delay(100)
85             store.close()
86         }
87     }
88
89     private suspend fun testDistributedLock(atomix: List<Atomix>) {
90         val lockName = "sample-lock"
91         withContext(Dispatchers.IO) {
92             val deferred = async {
93                 executeLock(atomix.get(0), "first", lockName)
94             }
95             val deferred2 = async {
96                 executeLock(atomix.get(1), "second", lockName)
97             }
98             val deferred3 = async {
99                 executeLock(atomix.get(1), "third", lockName)
100             }
101             deferred.start()
102             deferred2.start()
103             deferred3.start()
104         }
105     }
106
107     private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) {
108         log.info("initialising $lockId lock...")
109         val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName)
110         assertNotNull(distributedLock, "failed to create distributed $lockId lock")
111         distributedLock.lock()
112         assertTrue(distributedLock.isLocked, "failed to lock $lockId")
113         try {
114             log.info("locked $lockId process for 5mSec")
115             delay(5)
116         } finally {
117             distributedLock.unlock()
118             log.info("$lockId lock released")
119         }
120         distributedLock.close()
121     }
122 }