Merge "Reorder Create Tabs"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / atomix-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / atomix / AtomixBluePrintClusterServiceTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.atomix
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import kotlinx.coroutines.Dispatchers
21 import kotlinx.coroutines.async
22 import kotlinx.coroutines.awaitAll
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.runBlocking
25 import kotlinx.coroutines.withContext
26 import org.junit.Before
27 import org.junit.Test
28 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
31 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
32 import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
33 import org.onap.ccsdk.cds.controllerblueprints.core.logger
34 import kotlin.test.assertNotNull
35 import kotlin.test.assertTrue
36
/**
 * Integration-style test that starts local Atomix-backed cluster nodes and exercises the
 * distributed map store and the distributed lock exposed by [BluePrintClusterService].
 */
class AtomixBluePrintClusterServiceTest {
    private val log = logger(AtomixBluePrintClusterServiceTest::class)

    /** Removes the storage directory used by the test nodes before every test. */
    @Before
    fun init() {
        runBlocking {
            deleteNBDir("target/cluster")
        }
    }

    /**
     * Tests a two-node cluster with distributed map store creation and distributed locking.
     * This is a time-consuming test case; it takes around 10 seconds.
     */
    @Test
    fun testClusterJoin() {
        runBlocking {
            val bluePrintClusterServiceOne =
                createCluster(arrayListOf(5679, 5680)).toMutableList()
            // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680))
            // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
            val bluePrintClusterService = bluePrintClusterServiceOne[0]
            log.info("Members : ${bluePrintClusterService.allMembers()}")
            log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
            log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
            testDistributedStore(bluePrintClusterServiceOne)
            testDistributedLock(bluePrintClusterServiceOne)
        }
    }

    /**
     * Starts one cluster node per entry in [ports], concurrently, and returns the started services
     * in the same order as [ports].
     *
     * @param ports local ports to start nodes on; each node is named `node-<port>`.
     * @param otherClusterPorts optional ports of nodes started elsewhere; they are only added to the
     * member list handed to every node, no node is started for them here.
     */
    private suspend fun createCluster(
        ports: List<Int>,
        otherClusterPorts: List<Int>? = null
    ): List<BluePrintClusterService> {

        return withContext(Dispatchers.Default) {
            val clusterMembers = ports.map { "node-$it" }.toMutableList()
            /** Add the other cluster as members */
            if (!otherClusterPorts.isNullOrEmpty()) {
                val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList()
                clusterMembers.addAll(otherClusterMembers)
            }
            // Nodes are started in parallel; every node receives the complete member list.
            val deferred = ports.map { port ->
                async(Dispatchers.IO) {
                    val nodeId = "node-$port"
                    log.info("********** Starting node($nodeId) on port($port)")
                    val clusterInfo = ClusterInfo(
                        id = "test-cluster", nodeId = nodeId,
                        clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster"
                    )
                    val atomixClusterService = AtomixBluePrintClusterService()
                    atomixClusterService.startCluster(clusterInfo)
                    atomixClusterService
                }
            }
            deferred.awaitAll()
        }
    }

    /**
     * Creates two distributed map stores, each opened from the first and the second cluster service,
     * registers a logging listener on the second handle and writes five entries through the first.
     *
     * Both handles are closed in `finally` blocks so they are released even when an assertion or a
     * write fails.
     */
    private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
        /** Test Distributed store creation */
        repeat(2) { storeId ->
            val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
                "blueprint-runtime-$storeId"
            ).toDistributedMap()
            try {
                assertNotNull(store, "failed to get store")
                val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
                    "blueprint-runtime-$storeId"
                ).toDistributedMap()
                try {
                    store1.addListener {
                        log.info("Received map event : $it")
                    }
                    repeat(5) {
                        store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
                    }
                    // Short pause before closing so the listener has a chance to log the events.
                    delay(10)
                } finally {
                    store1.close()
                }
            } finally {
                store.close()
            }
        }
    }

    /**
     * Runs three concurrent lock/unlock cycles against the same lock name: two through the first
     * cluster service and one through the second.
     */
    private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
        val lockName = "sample-lock"
        withContext(Dispatchers.IO) {
            // async starts eagerly by default, so no explicit start() is required; awaiting the
            // results surfaces a failure of any contender directly.
            listOf(
                async { executeLock(bluePrintClusterServices[0], "first", lockName) },
                async { executeLock(bluePrintClusterServices[0], "second", lockName) },
                async { executeLock(bluePrintClusterServices[1], "third", lockName) }
            ).awaitAll()
        }
    }

    /**
     * Obtains the cluster lock [lockName] from [bluePrintClusterService], acquires it, holds it
     * briefly and releases it.
     *
     * The lock is released and the lock handle closed in `finally` blocks, so a failing assertion
     * does not leave the lock held.
     *
     * @param lockId label used only in log and assertion messages to tell the contenders apart.
     */
    private suspend fun executeLock(
        bluePrintClusterService: BluePrintClusterService,
        lockId: String,
        lockName: String
    ) {
        log.info("initialising $lockId lock...")
        val distributedLock = bluePrintClusterService.clusterLock(lockName)
        assertNotNull(distributedLock, "failed to create distributed $lockId lock")
        try {
            distributedLock.lock()
            try {
                assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
                log.info("locked $lockId process for 5mSec")
                delay(5)
            } finally {
                distributedLock.unLock()
                log.info("$lockId lock released")
            }
        } finally {
            distributedLock.close()
        }
    }
}