2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.hazelcast.client.config.YamlClientConfigBuilder
21 import com.hazelcast.cluster.Member
22 import com.hazelcast.config.FileSystemYamlConfig
23 import com.hazelcast.instance.impl.HazelcastInstanceFactory
24 import com.hazelcast.map.IMap
25 import kotlinx.coroutines.Dispatchers
26 import kotlinx.coroutines.async
27 import kotlinx.coroutines.awaitAll
28 import kotlinx.coroutines.delay
29 import kotlinx.coroutines.newSingleThreadContext
30 import kotlinx.coroutines.runBlocking
31 import kotlinx.coroutines.withContext
32 import org.junit.After
33 import org.junit.Before
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
37 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
38 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
39 import org.onap.ccsdk.cds.controllerblueprints.core.logger
40 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
41 import java.io.Serializable
42 import java.util.Properties
43 import kotlin.test.assertEquals
44 import kotlin.test.assertNotNull
45 import kotlin.test.assertTrue
/**
 * Tests around [HazelcastClusterService]: loading client/server YAML configuration,
 * starting several cluster nodes, and exercising the distributed map store and
 * the distributed lock exposed through [BluePrintClusterService].
 */
47 class HazelcastClusterServiceTest {
// Logger shared by all test helpers in this class.
49 private val log = logger(HazelcastClusterServiceTest::class)
// Expected number of application members; compared against applicationMembers("node-") in printReachableMembers.
50 private val clusterSize = 3
/**
 * Terminates every Hazelcast instance by calling HazelcastInstanceFactory.terminateAll().
 *
 * NOTE(review): presumably a JUnit set-up/tear-down hook (org.junit.Before and org.junit.After
 * are imported), but the annotation is not visible in this view — confirm.
 */
54 fun killAllHazelcastInstances() {
55 HazelcastInstanceFactory.terminateAll()
/**
 * Sets the cluster id and node id system properties, builds a Hazelcast client configuration
 * with YamlClientConfigBuilder and asserts that the resulting clusterName and instanceName
 * equal the values that were set.
 */
59 fun testClientFileSystemYamlConfig() {
// Values the assertions below expect to find in the built configuration.
60 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
61 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
// Key / value pair pointing at the client YAML test resource (absolute path).
// NOTE(review): the opening line of the enclosing call is not visible here — presumably System.setProperty(; confirm.
63 "hazelcast.client.config",
64 normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath
// The builder is created without arguments; presumably it locates the YAML through the property above — TODO confirm.
66 val config = YamlClientConfigBuilder().build()
68 assertEquals("test-cluster", config.clusterName)
69 assertEquals("node-1234", config.instanceName)
/**
 * Sets the cluster id and node id system properties, loads the server-side YAML test resource
 * through FileSystemYamlConfig and asserts that clusterName and instanceName equal the values
 * that were set.
 */
73 fun testServerFileSystemYamlConfig() {
// Values the assertions below expect to find in the loaded configuration.
74 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
75 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
// Unlike the client test, the configuration file is handed to the config class directly.
76 val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml")
77 val config = FileSystemYamlConfig(configFile)
79 assertEquals("test-cluster", config.clusterName)
80 assertEquals("node-1234", config.instanceName)
/**
 * Starts cluster services for ids 1, 2 and 3 via createCluster, then runs the member listing,
 * distributed store and distributed lock checks against them, in that order.
 *
 * NOTE(review): the helpers called here are suspend functions, so the body presumably runs
 * inside runBlocking (imported); the wrapper line is not visible in this view — confirm.
 */
84 fun testClusterJoin() {
// joinAsClient is left at its default (false) for createCluster.
86 val bluePrintClusterServiceOne =
87 createCluster(arrayListOf(1, 2, 3)).toMutableList()
88 printReachableMembers(bluePrintClusterServiceOne)
89 testDistributedStore(bluePrintClusterServiceOne)
90 testDistributedLock(bluePrintClusterServiceOne)
/**
 * Starts one HazelcastClusterService per entry of `ids` and returns them as
 * a list of BluePrintClusterService.
 *
 * Each service is started in its own async(Dispatchers.IO) coroutine with node id "node-<id>",
 * cluster id "test-cluster" and the "hazelcast.logging.type" property set to "slf4j".
 *
 * `ids` is referenced in the body; its declaration line is not visible in this view.
 * `joinAsClient` selects between the hazelcast-client.yaml and hazelcast-cluster.yaml
 * test resources and is forwarded as the joinAsClient value of the cluster info.
 *
 * NOTE(review): how the deferred results are awaited is not visible here — presumably
 * awaitAll (imported); confirm.
 */
94 private suspend fun createCluster(
// NOTE(review): declared nullable but dereferenced with `!!` below, so passing null throws; a non-null Boolean would express the same contract.
96 joinAsClient: Boolean? = false
97 ): List<BluePrintClusterService> {
99 return withContext(Dispatchers.Default) {
// One coroutine per node id.
100 val deferred = ids.map { id ->
101 async(Dispatchers.IO) {
102 val nodeId = "node-$id"
103 log.info("********** Starting ($nodeId)")
104 val properties = Properties()
105 properties["hazelcast.logging.type"] = "slf4j"
// Client branch: uses the client YAML. The constructor call line itself is not visible in this view.
107 if (joinAsClient!!) {
109 id = "test-cluster", nodeId = nodeId, joinAsClient = true,
110 configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml",
111 properties = properties
// Presumably the non-client (else) branch: same ids, cluster YAML instead of client YAML.
115 id = "test-cluster", nodeId = nodeId, joinAsClient = false,
116 configFile = "./src/test/resources/hazelcast/hazelcast-cluster.yaml",
117 properties = properties
// clusterInfo is assigned on lines not visible here; presumably the result of the branches above — confirm.
120 val hazelcastClusterService = HazelcastClusterService()
121 hazelcastClusterService.startCluster(clusterInfo)
// The started service is the value of the async block.
122 hazelcastClusterService
/**
 * Exercises the cluster map store: for store ids 0 and 1 it obtains the store named
 * "blueprint-runtime-<storeId>" from the first service, asserts it is not null and writes
 * JSON primitive values into it, then obtains the same-named store from the second service.
 *
 * NOTE(review): the loop producing `it` for the keys, and what is done with `store1`
 * (presumably an entry listener that logs map events), are not visible in this view — confirm.
 */
129 private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
130 /** Test Distributed store creation */
131 repeat(2) { storeId ->
132 val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
133 "blueprint-runtime-$storeId"
135 assertNotNull(store, "failed to get store")
// Entry key combines the store id with the (not visible) inner loop value.
137 store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
// Same store name, requested through a different cluster service.
140 val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
141 "blueprint-runtime-$storeId"
145 log.trace("Received map event : $it")
/**
 * Runs executeLock three times concurrently, once per cluster service (indices 0, 1, 2),
 * all against the same lock name "sample-lock".
 *
 * Each run is started with async inside withContext(Dispatchers.IO) and creates its own
 * single-thread context ("first", "second", "third") that is closed by `use`.
 *
 * NOTE(review): whether executeLock is switched onto the single-thread context, and how the
 * three deferred values are awaited, is not visible in this view — confirm.
 */
152 private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
153 val lockName = "sample-lock"
154 withContext(Dispatchers.IO) {
155 val deferred = async {
156 newSingleThreadContext("first").use {
158 executeLock(bluePrintClusterServices[0], "first", lockName)
162 val deferred2 = async {
163 newSingleThreadContext("second").use {
165 executeLock(bluePrintClusterServices[1], "second", lockName)
169 val deferred3 = async {
170 newSingleThreadContext("third").use {
172 executeLock(bluePrintClusterServices[2], "third", lockName)
/**
 * Obtains the cluster lock named `lockName` from the given service, asserts it was created,
 * acquires it, asserts it reports as locked, then unlocks and closes it, logging each step
 * with `lockId` as the label.
 *
 * `lockId` and `lockName` are referenced in the body; their declaration lines are not visible
 * in this view. Statements between the lock and unlock calls are also partly not visible.
 */
182 private suspend fun executeLock(
183 bluePrintClusterService: BluePrintClusterService,
187 log.info("initialising $lockId lock...")
188 val distributedLock = bluePrintClusterService.clusterLock(lockName)
189 assertNotNull(distributedLock, "failed to create distributed $lockId lock")
190 distributedLock.lock()
// Verifies the lock state right after acquisition.
191 assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
193 log.info("locked $lockId process for 5mSec")
196 distributedLock.unLock()
197 log.info("$lockId lock released")
199 distributedLock.close()
/**
 * Reads the "member-name-map" cluster store, asserts it holds 3 entries, logs each entry and
 * obtains the "cleanup" cluster scheduler. All scheduling calls are currently commented out,
 * so nothing is scheduled.
 *
 * No caller of this helper is visible in this view.
 */
202 private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
203 log.info("initialising ...")
// clusterScheduler is called on the concrete Hazelcast implementation, hence the cast.
204 val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
206 val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
207 assertEquals(3, memberNameMap.size, "failed to match member size")
208 memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") }
// Obtained but not used by any active statement visible below.
209 val scheduler = hazelcastClusterService.clusterScheduler("cleanup")
210 // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
211 // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
212 // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
213 // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS)
/**
 * Logs, for every given cluster service, its local member (null when the service is a client),
 * the address of the master member and the addresses of all members. Afterwards asserts that
 * the first service reports `clusterSize` application members for the "node-" prefix and logs them.
 */
216 private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
217 bluePrintClusterServices.forEach { bluePrintClusterService ->
// The hazelcast instance, masterMember and allMembers are accessed on the concrete implementation.
218 val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
219 val hazelcast = hazelcastClusterService.hazelcast
// The local member is only read when the service is not a client.
220 val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null
221 val master = hazelcastClusterService.masterMember("system").memberAddress
222 val members = hazelcastClusterService.allMembers().map { it.memberAddress }
223 log.info("Cluster Members for($self): master($master) Members($members)")
// "node-" matches the node id prefix used by createCluster.
226 val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-")
227 assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size")
228 log.info("Cluster applicationMembers ($applicationMembers)")
/**
 * Sample task type implementing Runnable and Serializable; it is referenced only from the
 * commented-out scheduler calls in executeScheduler.
 *
 * NOTE(review): the log statement below presumably sits inside the run() override, whose
 * declaration line is not visible in this view — confirm.
 */
232 open class SampleSchedulerTask : Runnable, Serializable {
234 private val log = logger(SampleSchedulerTask::class)
236 log.info("I am scheduler action")