2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.hazelcast.client.config.YamlClientConfigBuilder
21 import com.hazelcast.cluster.Member
22 import com.hazelcast.config.FileSystemYamlConfig
23 import com.hazelcast.map.IMap
24 import kotlinx.coroutines.Dispatchers
25 import kotlinx.coroutines.async
26 import kotlinx.coroutines.awaitAll
27 import kotlinx.coroutines.delay
28 import kotlinx.coroutines.runBlocking
29 import kotlinx.coroutines.withContext
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
32 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
33 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
34 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
35 import org.onap.ccsdk.cds.controllerblueprints.core.logger
36 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
37 import java.io.Serializable
38 import java.time.Duration
39 import java.util.Properties
40 import kotlin.test.assertEquals
41 import kotlin.test.assertNotNull
42 import kotlin.test.assertTrue
/**
 * Tests around [HazlecastClusterService]: loading of the client and server YAML
 * configuration, starting a multi-node cluster, and exercising the distributed
 * map store and the distributed lock on that cluster.
 */
44 class HazlecastClusterServiceTest {
45 private val log = logger(HazlecastClusterServiceTest::class)
// Expected number of application members; compared against the result of
// applicationMembers(...) in printReachableMembers.
46 private val clusterSize = 3
/**
 * Builds the Hazelcast client configuration with [YamlClientConfigBuilder] and asserts
 * that the resulting cluster name is "test-cluster" and the instance name is "node-1234".
 */
49 fun testClientFileSystemYamlConfig() {
// The cluster id and node id are published as system properties before the
// configuration is built.
50 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
51 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
// NOTE(review): the call that receives the next two arguments is not visible in this
// listing; presumably System.setProperty("hazelcast.client.config", <absolute path>),
// which would point the no-arg builder below at the client YAML file. Confirm.
53 "hazelcast.client.config",
54 normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath
56 val config = YamlClientConfigBuilder().build()
58 assertEquals("test-cluster", config.clusterName)
59 assertEquals("node-1234", config.instanceName)
/**
 * Reads the server-side Hazelcast YAML file from the test resources through
 * [FileSystemYamlConfig] and asserts that the loaded configuration reports the
 * cluster name "test-cluster" and the instance name "node-1234".
 */
fun testServerFileSystemYamlConfig() {
    // Publish the cluster id and node id as system properties before the file is parsed.
    System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
    System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")

    val serverConfig = FileSystemYamlConfig(
        normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml")
    )

    assertEquals("test-cluster", serverConfig.clusterName)
    assertEquals("node-1234", serverConfig.instanceName)
}
/**
 * End-to-end cluster scenario: starts nodes on ports 5679, 5680 and 5681, logs the
 * reachable members, runs the distributed store and distributed lock checks, and
 * finally shuts the nodes down.
 */
74 fun testClusterJoin() {
// NOTE(review): the helpers called below are suspend functions, so a coroutine builder
// (presumably runBlocking) has to wrap these calls; its line is not visible in this
// listing. Confirm against the full file.
76 val bluePrintClusterServiceOne =
77 createCluster(arrayListOf(5679, 5680, 5681)).toMutableList()
79 // Join as Hazelcast Management Node
80 // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true)
81 // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false)
82 // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
83 printReachableMembers(bluePrintClusterServiceOne)
84 testDistributedStore(bluePrintClusterServiceOne)
85 testDistributedLock(bluePrintClusterServiceOne)
87 // executeScheduler(bluePrintClusterServiceOne[0])
// Stop every node that was started above.
90 shutdown(bluePrintClusterServiceOne)
/**
 * Starts one [HazlecastClusterService] per entry of `ports`. Each start runs in its own
 * `async(Dispatchers.IO)` coroutine, with cluster id "test-cluster" and node id
 * "node-<port>".
 *
 * @param joinAsClient when true the node is described with the client YAML file
 * (hazelcast-client.yaml), otherwise with the cluster YAML file (hazelcast-cluster.yaml).
 * @return a list of [BluePrintClusterService]; presumably the services started here once
 * the deferred values are awaited (the awaiting line is not visible in this listing).
 */
94 private suspend fun createCluster(
// NOTE(review): the declaration of `ports`, which is used below, is not visible in this listing.
96 joinAsClient: Boolean? = false
97 ): List<BluePrintClusterService> {
99 return withContext(Dispatchers.Default) {
100 val deferred = ports.map { port ->
101 async(Dispatchers.IO) {
102 val nodeId = "node-$port"
103 log.info("********** Starting node($nodeId) on port($port)")
// Route Hazelcast logging through slf4j.
104 val properties = Properties()
105 properties["hazelcast.logging.type"] = "slf4j"
// NOTE(review): `!!` on a parameter typed Boolean? throws if a caller passes null
// explicitly; `joinAsClient == true` would be the null-safe form.
107 if (joinAsClient!!) {
// NOTE(review): the named arguments below presumably belong to a ClusterInfo(...)
// constructor call whose opening line is not visible here; `clusterInfo`, used further
// down, would be assigned from this if/else. Confirm.
109 id = "test-cluster", nodeId = nodeId, joinAsClient = true,
110 configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml",
111 properties = properties
115 id = "test-cluster", nodeId = nodeId, joinAsClient = false,
116 configFile = "./src/test/resources/hazelcast/hazelcast-cluster.yaml",
117 properties = properties
// The started service is the value produced by the async block.
120 val hazlecastClusterService = HazlecastClusterService()
121 hazlecastClusterService.startCluster(clusterInfo)
122 hazlecastClusterService
/**
 * Shuts down every given cluster service in list order, handing each
 * `shutDown` call a duration of 10 milliseconds.
 */
private suspend fun shutdown(bluePrintClusterServices: List<BluePrintClusterService>) {
    for (clusterService in bluePrintClusterServices) {
        val shutdownDuration = Duration.ofMillis(10)
        clusterService.shutDown(shutdownDuration)
    }
}
/**
 * Exercises the distributed map store. For each of two store ids it obtains the map
 * named "blueprint-runtime-<storeId>" from the first service, writes JSON primitive
 * values into it, and obtains the same-named map from the second service.
 */
135 private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
136 /** Test Distributed store creation */
137 repeat(2) { storeId ->
138 val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
139 "blueprint-runtime-$storeId"
141 assertNotNull(store, "failed to get store")
// NOTE(review): `it` below is bound by an enclosing lambda whose opening line is not
// visible in this listing (presumably an inner repeat); confirm.
143 store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
// Same store name, but looked up through the second cluster service.
146 val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
147 "blueprint-runtime-$storeId"
// NOTE(review): `it` here is the element of an iteration whose opening line is not
// visible; presumably over the contents of store1. Confirm.
151 log.trace("Received map event : $it")
/**
 * Starts three `async` coroutines inside `withContext(Dispatchers.IO)`, each calling
 * [executeLock] against a different cluster service (indexes 0, 1 and 2) with the same
 * lock name "sample-lock" and the ids "first", "second" and "third".
 */
158 private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
159 val lockName = "sample-lock"
160 withContext(Dispatchers.IO) {
161 val deferred = async {
162 executeLock(bluePrintClusterServices[0], "first", lockName)
164 val deferred2 = async {
165 executeLock(bluePrintClusterServices[1], "second", lockName)
167 val deferred3 = async {
168 executeLock(bluePrintClusterServices[2], "third", lockName)
// NOTE(review): how the three deferred values are awaited is not visible in this listing.
/**
 * Obtains the cluster lock named `lockName` from the given service, locks it, asserts
 * that it reports itself as locked, then unlocks and closes it. Each step is logged
 * with `lockId`.
 */
176 private suspend fun executeLock(
177 bluePrintClusterService: BluePrintClusterService,
// NOTE(review): `lockId` and `lockName` are used below and are passed by the callers as
// the second and third arguments, but their declarations are not visible in this listing.
181 log.info("initialising $lockId lock...")
182 val distributedLock = bluePrintClusterService.clusterLock(lockName)
183 assertNotNull(distributedLock, "failed to create distributed $lockId lock")
184 distributedLock.lock()
185 assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
// NOTE(review): the message mentions holding the lock for 5 ms; the statement that
// would do so (presumably a delay) is not visible here.
187 log.info("locked $lockId process for 5mSec")
// NOTE(review): in the visible lines unLock() is not guarded by try/finally, so a failed
// assertion above would leave the lock held. Confirm against the full file.
190 distributedLock.unLock()
191 log.info("$lockId lock released")
193 distributedLock.close()
/**
 * Scheduler smoke check: reads the "member-name-map" store, asserts that it holds three
 * entries, logs every entry, and then requests the cluster scheduler named "cleanup".
 * The scheduling calls themselves are kept commented out.
 */
private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
    log.info("initialising ...")
    val clusterService = bluePrintClusterService as HazlecastClusterService

    val memberNames = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
    assertEquals(3, memberNames.size, "failed to match member size")
    for ((key, value) in memberNames) {
        log.info("nodeId($key), Member($value)")
    }

    // The scheduler is only obtained here; the calls that would use it are disabled below.
    val scheduler = clusterService.clusterScheduler("cleanup")
    // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
    // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
    // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
    // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS)
}
/**
 * Logs, for every given service, its local member (null when the service is a client),
 * the address of the "system" master member and the addresses of all members. Afterwards
 * asserts that the first service reports `clusterSize` application members for the
 * prefix "node-56" and logs them.
 */
private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
    for (clusterService in bluePrintClusterServices) {
        val hazlecastService = clusterService as HazlecastClusterService
        val hazelcastInstance = hazlecastService.hazelcast
        // A client has no local cluster member, so nothing is reported for it.
        val self = when {
            clusterService.isClient() -> null
            else -> hazelcastInstance.cluster.localMember
        }
        val master = hazlecastService.masterMember("system").memberAddress
        val members = hazlecastService.allMembers().map { member -> member.memberAddress }
        log.info("Cluster Members for($self): master($master) Members($members)")
    }

    val matchedMembers = bluePrintClusterServices[0].applicationMembers("node-56")
    assertEquals(clusterSize, matchedMembers.size, "failed to match applications member size")
    log.info("Cluster applicationMembers ($matchedMembers)")
}
/**
 * Serializable [Runnable] that logs a fixed message; referenced only by the
 * commented-out scheduler calls in this file.
 */
226 open class SampleSchedulerTask : Runnable, Serializable {
227 private val log = logger(SampleSchedulerTask::class)
// NOTE(review): the method declaration enclosing this statement is not visible in this
// listing; presumably the Runnable run() override. Confirm.
229 log.info("I am scheduler action")