2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.hazelcast.client.config.YamlClientConfigBuilder
21 import com.hazelcast.cluster.Member
22 import com.hazelcast.config.FileSystemYamlConfig
23 import com.hazelcast.instance.impl.HazelcastInstanceFactory
24 import com.hazelcast.map.IMap
26 import kotlinx.coroutines.Dispatchers
27 import kotlinx.coroutines.async
28 import kotlinx.coroutines.awaitAll
29 import kotlinx.coroutines.delay
30 import kotlinx.coroutines.newSingleThreadContext
31 import kotlinx.coroutines.runBlocking
32 import kotlinx.coroutines.withContext
33 import org.junit.After
34 import org.junit.Before
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
37 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
38 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
39 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
40 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
41 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
42 import org.onap.ccsdk.cds.controllerblueprints.core.logger
43 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
44 import java.io.Serializable
45 import java.util.Properties
46 import kotlin.test.assertEquals
47 import kotlin.test.assertFalse
48 import kotlin.test.assertNotNull
49 import kotlin.test.assertTrue
/**
 * Tests for [HazelcastClusterService]: YAML configuration loading for client and server,
 * cluster start-up, the distributed map store, the distributed lock and topic messaging.
 */
51 class HazelcastClusterServiceTest {
// Logger shared by the tests and helper functions of this class.
53 private val log = logger(HazelcastClusterServiceTest::class)
// Expected number of application members; printReachableMembers compares it with
// the size of applicationMembers("node-").
54 private val clusterSize = 3
/**
 * Calls [HazelcastInstanceFactory.terminateAll]; going by its name this shuts down every
 * Hazelcast instance started in this JVM (confirm against the Hazelcast documentation).
 *
 * NOTE(review): `org.junit.Before` and `org.junit.After` are imported, so this is presumably
 * registered as a per-test setup/teardown hook, but no annotation is visible in this view.
 */
58 fun killAllHazelcastInstances() {
59 HazelcastInstanceFactory.terminateAll()
/**
 * Builds a Hazelcast client configuration with [YamlClientConfigBuilder] and asserts that its
 * cluster name and instance name equal the values published as system properties below.
 *
 * NOTE(review): how the YAML picks those properties up (presumably placeholders in
 * hazelcast-client.yaml) cannot be seen from this file.
 */
63 fun testClientFileSystemYamlConfig() {
// Publish the expected cluster id and node id before the configuration is built.
64 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
65 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
// NOTE(review): the two lines below are the arguments of a call whose opening line is not
// visible in this view; presumably System.setProperty(...) pointing Hazelcast at the client
// YAML file. Confirm.
67 "hazelcast.client.config",
68 normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath
// The builder is created without an explicit file argument.
70 val config = YamlClientConfigBuilder().build()
// The built configuration must report the same ids that were set above.
72 assertEquals("test-cluster", config.clusterName)
73 assertEquals("node-1234", config.instanceName)
/**
 * Loads the server-side configuration from hazelcast.yaml through [FileSystemYamlConfig] and
 * asserts that its cluster name and instance name equal the system properties set below.
 *
 * NOTE(review): the substitution mechanism inside hazelcast.yaml is not visible from this file.
 */
77 fun testServerFileSystemYamlConfig() {
// Publish the expected cluster id and node id before the file is parsed.
78 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
79 System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
80 val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml")
81 val config = FileSystemYamlConfig(configFile)
// The parsed configuration must report the same ids that were set above.
83 assertEquals("test-cluster", config.clusterName)
84 assertEquals("node-1234", config.instanceName)
/**
 * Starts cluster nodes for ids 1, 2 and 3, then runs the member listing, distributed store
 * and distributed lock checks against the resulting services.
 *
 * NOTE(review): every helper called here is a `suspend` function, so the body must run inside
 * a coroutine builder (`runBlocking` is imported); that wrapper is not visible in this view.
 */
88 fun testClusterJoin() {
// One service per node id, copied into a mutable list.
90 val bluePrintClusterServiceOne =
91 createCluster(arrayListOf(1, 2, 3)).toMutableList()
// Logs the members seen by each node and asserts the application member count.
92 printReachableMembers(bluePrintClusterServiceOne)
93 testDistributedStore(bluePrintClusterServiceOne)
94 testDistributedLock(bluePrintClusterServiceOne)
/**
 * Starts cluster nodes for ids 1, 2 and 3 and verifies topic messaging between two of them
 * through testMessageReceived.
 *
 * NOTE(review): the helpers called here are `suspend` functions; the coroutine builder that
 * must wrap this body (presumably `runBlocking`) is not visible in this view.
 */
99 fun testClusterMessaging() {
// One service per node id, copied into a mutable list.
101 val bluePrintClusterServiceOne =
102 createCluster(arrayListOf(1, 2, 3)).toMutableList()
103 printReachableMembers(bluePrintClusterServiceOne)
104 testMessageReceived(bluePrintClusterServiceOne)
/**
 * Sends a string message on the BLUEPRINT_CLEAN_COMPILER_CACHE topic from the first service and
 * asserts that a listener registered on the second service saw exactly that payload, then checks
 * that the listener can be removed once and only once.
 *
 * @param bluePrintClusterServices started services; indices 0 and 1 are used and both are cast
 * to [HazelcastClusterService].
 */
108 private suspend fun testMessageReceived(bluePrintClusterServices: List<BluePrintClusterService>) {
109 val sender = bluePrintClusterServices[0] as HazelcastClusterService
110 val receiver = bluePrintClusterServices[1] as HazelcastClusterService
111 val messageSent = "hello world"
// Set by the listener below when the received payload equals messageSent.
112 var isMessageReceived = false
// The returned id is used further down to remove the listener again.
113 val uuid = receiver.addBlueprintClusterMessageListener(
114 BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE,
115 object : BlueprintClusterMessageListener<String> {
116 override fun onMessage(message: BluePrintClusterMessage<String>?) {
117 log.info("Message received - ${message?.payload}")
118 isMessageReceived = messageSent == message?.payload
124 sender.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, messageSent)
// NOTE(review): the flag is written from the listener callback. Whether the test waits between
// sendMessage and this assertion (`delay` is imported) is not visible in this view; without a
// wait the assertion could race with message delivery. Confirm.
126 assertTrue(isMessageReceived)
// Removing the listener succeeds the first time and is expected to report false the second time.
128 assertTrue(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
129 assertFalse(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
/**
 * Starts one [HazelcastClusterService] per node id, each inside its own
 * `async(Dispatchers.IO)` coroutine, and returns the started services.
 *
 * NOTE(review): `ids` is used below but its parameter declaration is not visible in this view;
 * callers pass a list of Int.
 *
 * @param joinAsClient selects the client configuration (hazelcast-client.yaml) instead of the
 * cluster configuration (hazelcast-cluster.yaml); defaults to false.
 * @return the services returned by the async blocks (the awaiting code is not visible here;
 * `awaitAll` is imported).
 */
132 private suspend fun createCluster(
// NOTE(review): declared nullable but force-unwrapped with `!!` further down; a plain
// `Boolean = false` would remove the need for the unwrap. Confirm no caller passes null.
134 joinAsClient: Boolean? = false
135 ): List<BluePrintClusterService> {
137 return withContext(Dispatchers.Default) {
138 val deferred = ids.map { id ->
139 async(Dispatchers.IO) {
// Node ids take the form "node-<id>"; printReachableMembers later looks up the "node-" prefix.
140 val nodeId = "node-$id"
141 log.info("********** Starting ($nodeId)")
142 val properties = Properties()
143 properties["hazelcast.logging.type"] = "slf4j"
// NOTE(review): the heads of the two argument lists below are not visible; they are presumably
// ClusterInfo(...) constructor calls assigned to `clusterInfo`, which is used further down.
145 if (joinAsClient!!) {
147 id = "test-cluster", nodeId = nodeId, joinAsClient = true,
148 configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml",
149 properties = properties
153 id = "test-cluster", nodeId = nodeId, joinAsClient = false,
154 configFile = "./src/test/resources/hazelcast/hazelcast-cluster.yaml",
155 properties = properties
// The service is built with a relaxed mock as its only constructor argument.
// NOTE(review): no import for `mockk` is visible in this view.
158 val hazelcastClusterService = HazelcastClusterService(mockk(relaxed = true))
159 hazelcastClusterService.startCluster(clusterInfo)
// Result of the async block: the started service.
160 hazelcastClusterService
/**
 * Exercises the distributed map store: for two store ids it obtains the store
 * "blueprint-runtime-<storeId>" from the first service, writes JSON primitive values into it,
 * and obtains the same-named store from the second service.
 *
 * @param bluePrintClusterServices started services; indices 0 and 1 are used.
 */
167 private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
168 /** Test Distributed store creation */
169 repeat(2) { storeId ->
170 val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
171 "blueprint-runtime-$storeId"
173 assertNotNull(store, "failed to get store")
// NOTE(review): `it` belongs to an enclosing lambda whose opening line is not visible in this
// view (presumably an inner repeat over entry indices). Confirm.
175 store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
// Same store name, requested through the second service.
178 val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
179 "blueprint-runtime-$storeId"
// NOTE(review): the lambda that supplies `it` here is not visible in this view.
183 log.trace("Received map event : $it")
/**
 * Runs executeLock for the same lock name from three coroutines, one per service, each creating
 * its own single-thread context ("first", "second", "third") that is closed by `use`.
 *
 * NOTE(review): whether executeLock is switched onto the single-thread context, and how the
 * three deferred values are awaited, is not visible in this view.
 *
 * @param bluePrintClusterServices started services; indices 0, 1 and 2 are used.
 */
190 private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
// All three coroutines use this same lock name.
191 val lockName = "sample-lock"
192 withContext(Dispatchers.IO) {
193 val deferred = async {
194 newSingleThreadContext("first").use {
196 executeLock(bluePrintClusterServices[0], "first", lockName)
200 val deferred2 = async {
201 newSingleThreadContext("second").use {
203 executeLock(bluePrintClusterServices[1], "second", lockName)
207 val deferred3 = async {
208 newSingleThreadContext("third").use {
210 executeLock(bluePrintClusterServices[2], "third", lockName)
/**
 * Obtains the cluster lock `lockName` from the given service, locks it, asserts that it reports
 * locked, then unlocks and closes it, logging each step with `lockId`.
 *
 * NOTE(review): `lockId` and `lockName` are used below but their parameter declarations are not
 * visible in this view; callers pass two strings after the service.
 *
 * @param bluePrintClusterService service the lock is requested from.
 */
220 private suspend fun executeLock(
221 bluePrintClusterService: BluePrintClusterService,
225 log.info("initialising $lockId lock...")
226 val distributedLock = bluePrintClusterService.clusterLock(lockName)
227 assertNotNull(distributedLock, "failed to create distributed $lockId lock")
228 distributedLock.lock()
229 assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
// NOTE(review): the message mentions a 5 ms hold, but the wait itself is not visible in this
// view. No try/finally around the locked section is visible either; if one is missing, a failure
// between lock() and unLock() would leave the lock held. Confirm.
231 log.info("locked $lockId process for 5mSec")
234 distributedLock.unLock()
235 log.info("$lockId lock released")
237 distributedLock.close()
/**
 * Reads the "member-name-map" store, asserts it holds three entries, logs them and obtains the
 * "cleanup" cluster scheduler. All scheduling calls are commented out, so nothing is scheduled.
 *
 * NOTE(review): no caller of this function is visible in this view.
 *
 * @param bluePrintClusterService service to inspect; cast to [HazelcastClusterService].
 */
240 private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
241 log.info("initialising ...")
242 val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
// The store is cast to Hazelcast's IMap so that its size and entries can be read.
244 val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
// NOTE(review): the literal 3 duplicates the clusterSize property of the test class.
245 assertEquals(3, memberNameMap.size, "failed to match member size")
246 memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") }
// `scheduler` is only referenced by the commented-out calls below.
247 val scheduler = hazelcastClusterService.clusterScheduler("cleanup")
248 // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
249 // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
250 // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
251 // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS)
/**
 * Logs, for every service, its local member, the master member address and the addresses of all
 * members, then asserts that the first service reports `clusterSize` application members whose
 * names match the "node-" prefix.
 *
 * @param bluePrintClusterServices started services; each is cast to [HazelcastClusterService].
 */
254 private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
255 bluePrintClusterServices.forEach { bluePrintClusterService ->
256 val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
257 val hazelcast = hazelcastClusterService.hazelcast
// The local member is only read when the service is not a client; otherwise null is logged.
258 val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null
259 val master = hazelcastClusterService.masterMember("system").memberAddress
260 val members = hazelcastClusterService.allMembers().map { it.memberAddress }
261 log.info("Cluster Members for($self): master($master) Members($members)")
// "node-" is the prefix createCluster uses when it builds node ids.
264 val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-")
265 assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size")
266 log.info("Cluster applicationMembers ($applicationMembers)")
/**
 * Sample task type implementing [Runnable] and [Serializable]; in this view it is referenced
 * only from the commented-out scheduler calls in executeScheduler.
 */
270 open class SampleSchedulerTask : Runnable, Serializable {
272 private val log = logger(SampleSchedulerTask::class)
// NOTE(review): the header of the enclosing function (presumably `override fun run()`) is not
// visible in this view; the only visible action is this log statement.
274 log.info("I am scheduler action")