2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.atomix
19 import com.fasterxml.jackson.databind.JsonNode
20 import kotlinx.coroutines.Dispatchers
21 import kotlinx.coroutines.async
22 import kotlinx.coroutines.awaitAll
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.runBlocking
25 import kotlinx.coroutines.withContext
26 import org.junit.Before
28 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
31 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
32 import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
33 import org.onap.ccsdk.cds.controllerblueprints.core.logger
34 import kotlin.test.assertNotNull
35 import kotlin.test.assertTrue
/**
 * Tests for [AtomixBluePrintClusterService]: nodes are started on localhost ports and the
 * map store and lock obtained from the started services are exercised.
 */
37 class AtomixBluePrintClusterServiceTest {
// Logger used throughout the test to trace node start-up, membership and lock progress.
38 val log = logger(AtomixBluePrintClusterServiceTest::class)
// Deletes the "target/cluster" directory, the same path this test passes as ClusterInfo.storagePath.
// NOTE(review): the function that encloses this call is not visible in this view
// (presumably a @Before setup, since org.junit.Before is imported) — confirm.
43 deleteNBDir("target/cluster")
47 /** Tests a two-node cluster with distributed map store creation. This is a time-consuming test case that takes around 10s. **/
// Starts the cluster services, logs the membership reported by the first one, then runs the
// distributed store and distributed lock checks against the started services.
// NOTE(review): createCluster, testDistributedStore and testDistributedLock are suspend functions,
// so these calls have to run inside a coroutine; the wrapper (presumably runBlocking, which the
// file imports) and the test annotation are not visible in this view — confirm.
49 fun testClusterJoin() {
// One service is started per port; each is configured with cluster id "test-cluster" by createCluster.
51 val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680))
52 // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682))
// Membership and master information is queried from the first started service only.
53 val bluePrintClusterService = bluePrintClusterServiceOne.get(0)
54 log.info("Members : ${bluePrintClusterService.allMembers()}")
55 log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
56 log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
// Both checks receive the full list of started services.
57 testDistributedStore(bluePrintClusterServiceOne)
58 testDistributedLock(bluePrintClusterServiceOne)
/**
 * Starts one [AtomixBluePrintClusterService] per entry in [ports].
 *
 * Each node is named "node-<port>", addressed as "localhost:<port>", given the full list of
 * node names as its cluster members, and uses "target/cluster" as its storage path.
 *
 * @param ports ports on localhost to start a node on, one node per port.
 * @return declared as the list of cluster services.
 * NOTE(review): the lines that produce each async result and collect them are not visible in
 * this view (presumably the service is returned from async and gathered with awaitAll) — confirm.
 */
62 private suspend fun createCluster(ports: List<Int>): List<BluePrintClusterService> {
63 return withContext(Dispatchers.Default) {
// Node names for every port, computed up front so each node is told about all members.
64 val members = ports.map { "node-$it" }
// One async coroutine on Dispatchers.IO per port, so node start-ups do not have to run one after another.
65 val deferred = ports.map { port ->
66 async(Dispatchers.IO) {
67 val nodeId = "node-$port"
68 log.info("********** Starting node($nodeId) on port($port)")
69 val clusterInfo = ClusterInfo(
70 id = "test-cluster", nodeId = nodeId,
71 clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster"
// A fresh service instance is created for each node and started with that node's ClusterInfo.
73 val atomixClusterService = AtomixBluePrintClusterService()
74 atomixClusterService.startCluster(clusterInfo)
/**
 * Obtains map stores named "blueprint-runtime-<storeId>" from the cluster service and writes
 * JSON primitive values into them.
 *
 * @param bluePrintClusterServices the started cluster services; only index 0 is used in the visible lines.
 */
82 private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
83 /** Test Distributed store creation */
// Runs the check for store ids 0 and 1.
84 repeat(2) { storeId ->
85 val store = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
86 "blueprint-runtime-$storeId"
88 assertNotNull(store, "failed to get store")
// A second handle to the same store name is requested.
// NOTE(review): it comes from the same service (index 0) as `store`, so the visible lines do not
// involve the second node — confirm whether index 1 was intended.
89 val store1 = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
90 "blueprint-runtime-$storeId"
// NOTE(review): the construct that supplies `it` here is not visible in this view
// (presumably a listener registered on store1) — confirm.
94 log.info("Received map event : $it")
// Writes an entry keyed by store id and an index; the loop that supplies `it` is not visible here.
97 store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
/**
 * Launches three coroutines that each run [executeLock] against the same lock name, "sample-lock".
 *
 * @param bluePrintClusterServices the started cluster services; only index 0 is used in the visible lines.
 */
104 private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
105 val lockName = "sample-lock"
// The three contenders are started as async coroutines inside a Dispatchers.IO context.
// NOTE(review): the lines that await deferred, deferred2 and deferred3 are not visible in this view — confirm.
106 withContext(Dispatchers.IO) {
107 val deferred = async {
108 executeLock(bluePrintClusterServices.get(0), "first", lockName)
110 val deferred2 = async {
111 executeLock(bluePrintClusterServices.get(0), "second", lockName)
// NOTE(review): all three contenders use the service at index 0, so the lock is never requested
// through the second node in the visible lines — confirm this is intended.
113 val deferred3 = async {
114 executeLock(bluePrintClusterServices.get(0), "third", lockName)
/**
 * Obtains the cluster lock for `lockName` from the given service, locks it, asserts that it is
 * locked, then unlocks and closes it, logging each step with `lockId` as the label.
 *
 * @param bluePrintClusterService service the lock is obtained from.
 * NOTE(review): the `lockId` and `lockName` parameter declarations are not visible in this view;
 * the call sites pass a String label and the lock name — confirm.
 */
122 private suspend fun executeLock(
123 bluePrintClusterService: BluePrintClusterService,
127 log.info("initialising $lockId lock...")
128 val distributedLock = bluePrintClusterService.clusterLock(lockName)
129 assertNotNull(distributedLock, "failed to create distributed $lockId lock")
130 distributedLock.lock()
131 assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
// NOTE(review): the statements between this log line and the unlock are not visible in this view
// (presumably a short delay, as the message and the delay import suggest). Confirm the unlock
// below runs in a finally block so a failure while holding the lock cannot leave it locked.
133 log.info("locked $lockId process for 5mSec")
136 distributedLock.unLock()
137 log.info("$lockId lock released")
// The lock handle is closed after it has been released.
139 distributedLock.close()