2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.hazelcast.client.config.YamlClientConfigBuilder
21 import com.hazelcast.cluster.Member
22 import com.hazelcast.config.FileSystemYamlConfig
23 import com.hazelcast.instance.impl.HazelcastInstanceFactory
24 import com.hazelcast.map.IMap
25 import kotlinx.coroutines.Dispatchers
26 import kotlinx.coroutines.async
27 import kotlinx.coroutines.awaitAll
28 import kotlinx.coroutines.delay
29 import kotlinx.coroutines.newSingleThreadContext
30 import kotlinx.coroutines.runBlocking
31 import kotlinx.coroutines.withContext
32 import org.junit.After
33 import org.junit.Before
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
37 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
38 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
39 import org.onap.ccsdk.cds.controllerblueprints.core.logger
40 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
41 import java.io.Serializable
42 import java.util.Properties
43 import kotlin.test.assertEquals
44 import kotlin.test.assertNotNull
45 import kotlin.test.assertTrue
47 class HazelcastClusterServiceTest {
48 private val log = logger(HazelcastClusterServiceTest::class)
49 private val clusterSize = 3
53 fun killAllHazelcastInstances() {
54 HazelcastInstanceFactory.terminateAll()
58 fun testClientFileSystemYamlConfig() {
59 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
60 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
62 "hazelcast.client.config",
63 normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath
65 val config = YamlClientConfigBuilder().build()
67 assertEquals("test-cluster", config.clusterName)
68 assertEquals("node-1234", config.instanceName)
72 fun testServerFileSystemYamlConfig() {
73 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
74 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
75 val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml")
76 val config = FileSystemYamlConfig(configFile)
78 assertEquals("test-cluster", config.clusterName)
79 assertEquals("node-1234", config.instanceName)
83 fun testClusterJoin() {
85 val bluePrintClusterServiceOne =
86 createCluster(arrayListOf(1, 2, 3)).toMutableList()
87 printReachableMembers(bluePrintClusterServiceOne)
88 testDistributedStore(bluePrintClusterServiceOne)
89 testDistributedLock(bluePrintClusterServiceOne)
93 private suspend fun createCluster(
95 joinAsClient: Boolean? = false
96 ): List<BluePrintClusterService> {
98 return withContext(Dispatchers.Default) {
99 val deferred = ids.map { id ->
100 async(Dispatchers.IO) {
101 val nodeId = "node-$id"
102 log.info("********** Starting ($nodeId)")
103 val properties = Properties()
104 properties["hazelcast.logging.type"] = "slf4j"
106 if (joinAsClient!!) {
108 id = "test-cluster", nodeId = nodeId, joinAsClient = true,
109 configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml",
110 properties = properties
114 id = "test-cluster", nodeId = nodeId, joinAsClient = false,
115 configFile = "./src/test/resources/hazelcast/hazelcast-cluster.yaml",
116 properties = properties
119 val hazelcastClusterService = HazelcastClusterService()
120 hazelcastClusterService.startCluster(clusterInfo)
121 hazelcastClusterService
128 private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
129 /** Test Distributed store creation */
130 repeat(2) { storeId ->
131 val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
132 "blueprint-runtime-$storeId"
134 assertNotNull(store, "failed to get store")
136 store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
139 val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
140 "blueprint-runtime-$storeId"
144 log.trace("Received map event : $it")
151 private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
152 val lockName = "sample-lock"
153 withContext(Dispatchers.IO) {
154 val deferred = async {
155 newSingleThreadContext("first").use {
157 executeLock(bluePrintClusterServices[0], "first", lockName)
161 val deferred2 = async {
162 newSingleThreadContext("second").use {
164 executeLock(bluePrintClusterServices[1], "second", lockName)
168 val deferred3 = async {
169 newSingleThreadContext("third").use {
171 executeLock(bluePrintClusterServices[2], "third", lockName)
181 private suspend fun executeLock(
182 bluePrintClusterService: BluePrintClusterService,
186 log.info("initialising $lockId lock...")
187 val distributedLock = bluePrintClusterService.clusterLock(lockName)
188 assertNotNull(distributedLock, "failed to create distributed $lockId lock")
189 distributedLock.lock()
190 assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
192 log.info("locked $lockId process for 5mSec")
195 distributedLock.unLock()
196 log.info("$lockId lock released")
198 distributedLock.close()
201 private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
202 log.info("initialising ...")
203 val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
205 val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
206 assertEquals(3, memberNameMap.size, "failed to match member size")
207 memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") }
208 val scheduler = hazelcastClusterService.clusterScheduler("cleanup")
209 // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
210 // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
211 // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
212 // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS)
215 private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
216 bluePrintClusterServices.forEach { bluePrintClusterService ->
217 val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
218 val hazelcast = hazelcastClusterService.hazelcast
219 val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null
220 val master = hazelcastClusterService.masterMember("system").memberAddress
221 val members = hazelcastClusterService.allMembers().map { it.memberAddress }
222 log.info("Cluster Members for($self): master($master) Members($members)")
225 val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-")
226 assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size")
227 log.info("Cluster applicationMembers ($applicationMembers)")
231 open class SampleSchedulerTask : Runnable, Serializable {
232 private val log = logger(SampleSchedulerTask::class)
234 log.info("I am scheduler action")