da5564235dc8033dad322e628f9f62c6c1d7225d
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / processor-core / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / core / cluster / HazelcastClusterServiceTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.hazelcast.client.config.YamlClientConfigBuilder
21 import com.hazelcast.cluster.Member
22 import com.hazelcast.config.FileSystemYamlConfig
23 import com.hazelcast.instance.impl.HazelcastInstanceFactory
24 import com.hazelcast.map.IMap
25 import kotlinx.coroutines.Dispatchers
26 import kotlinx.coroutines.async
27 import kotlinx.coroutines.awaitAll
28 import kotlinx.coroutines.delay
29 import kotlinx.coroutines.newSingleThreadContext
30 import kotlinx.coroutines.runBlocking
31 import kotlinx.coroutines.withContext
32 import org.junit.After
33 import org.junit.Before
34 import org.junit.Test
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
37 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
38 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
39 import org.onap.ccsdk.cds.controllerblueprints.core.logger
40 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
41 import java.io.Serializable
42 import java.util.Properties
43 import kotlin.test.assertEquals
44 import kotlin.test.assertNotNull
45 import kotlin.test.assertTrue
46
/**
 * Tests for [HazelcastClusterService]: YAML configuration loading for client and
 * server, and an embedded multi-node cluster that exercises member discovery,
 * the distributed map store and the distributed lock.
 */
class HazelcastClusterServiceTest {

    private val log = logger(HazelcastClusterServiceTest::class)

    /** Number of embedded nodes started by [testClusterJoin]. */
    private val clusterSize = 3

    /**
     * Calls [HazelcastInstanceFactory.terminateAll] before and after every test
     * so that instances started by one test are not carried over into the next.
     */
    @Before
    @After
    fun killAllHazelcastInstances() {
        HazelcastInstanceFactory.terminateAll()
    }

    /**
     * Builds the client configuration from `hazelcast-client.yaml` and checks that
     * the cluster name and instance name come from the cluster system properties.
     */
    @Test
    fun testClientFileSystemYamlConfig() {
        withSystemProperties(
            BluePrintConstants.PROPERTY_CLUSTER_ID to "test-cluster",
            BluePrintConstants.PROPERTY_CLUSTER_NODE_ID to "node-1234",
            "hazelcast.client.config" to
                normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath
        ) {
            val config = YamlClientConfigBuilder().build()
            assertNotNull(config)
            assertEquals("test-cluster", config.clusterName)
            assertEquals("node-1234", config.instanceName)
        }
    }

    /**
     * Builds the server configuration from `hazelcast.yaml` and checks that the
     * cluster name and instance name come from the cluster system properties.
     */
    @Test
    fun testServerFileSystemYamlConfig() {
        withSystemProperties(
            BluePrintConstants.PROPERTY_CLUSTER_ID to "test-cluster",
            BluePrintConstants.PROPERTY_CLUSTER_NODE_ID to "node-1234"
        ) {
            val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml")
            val config = FileSystemYamlConfig(configFile)
            assertNotNull(config)
            assertEquals("test-cluster", config.clusterName)
            assertEquals("node-1234", config.instanceName)
        }
    }

    /**
     * Starts [clusterSize] embedded nodes and runs the member, distributed store
     * and distributed lock checks against them.
     */
    @Test
    fun testClusterJoin() {
        runBlocking {
            // Node ids are derived from clusterSize so the member-count assertion
            // in printReachableMembers always matches what was started.
            val bluePrintClusterServices = createCluster((1..clusterSize).toList())
            printReachableMembers(bluePrintClusterServices)
            testDistributedStore(bluePrintClusterServices)
            testDistributedLock(bluePrintClusterServices)
        }
    }

    /**
     * Runs [block] with the given system properties set, then restores every
     * touched key to the value it had before (or clears it when it was unset).
     *
     * System properties are JVM-wide, so leaving them set would leak this test's
     * configuration into whatever runs next in the same JVM.
     *
     * @param overrides key/value pairs to set for the duration of [block]
     * @param block code to execute while the overrides are in place
     * @return whatever [block] returns
     */
    private fun <T> withSystemProperties(vararg overrides: Pair<String, String>, block: () -> T): T {
        val previous: Map<String, String?> = overrides.associate { (key, _) -> key to System.getProperty(key) }
        overrides.forEach { (key, value) -> System.setProperty(key, value) }
        return try {
            block()
        } finally {
            previous.forEach { (key, value) ->
                if (value == null) System.clearProperty(key) else System.setProperty(key, value)
            }
        }
    }

    /**
     * Starts one [HazelcastClusterService] per entry in [ids], concurrently, and
     * returns them once every `startCluster` call has completed.
     *
     * @param ids numeric suffixes; each node is named `node-<id>`
     * @param joinAsClient `true` to join with the client YAML; `false` or `null`
     *   to start an embedded member with the cluster YAML
     * @return the started services, in the same order as [ids]
     */
    private suspend fun createCluster(
        ids: List<Int>,
        joinAsClient: Boolean? = false
    ): List<BluePrintClusterService> {
        // The parameter is nullable for callers; null is treated as "not a client"
        // instead of being force-unwrapped.
        val asClient = joinAsClient == true
        val configFile =
            if (asClient) "./src/test/resources/hazelcast/hazelcast-client.yaml"
            else "./src/test/resources/hazelcast/hazelcast-cluster.yaml"

        return withContext(Dispatchers.Default) {
            val startups = ids.map { id ->
                async(Dispatchers.IO) {
                    val nodeId = "node-$id"
                    log.info("********** Starting ($nodeId)")
                    val properties = Properties()
                    properties["hazelcast.logging.type"] = "slf4j"
                    val clusterInfo = ClusterInfo(
                        id = "test-cluster", nodeId = nodeId, joinAsClient = asClient,
                        configFile = configFile,
                        properties = properties
                    )
                    val hazelcastClusterService = HazelcastClusterService()
                    hazelcastClusterService.startCluster(clusterInfo)
                    hazelcastClusterService
                }
            }
            startups.awaitAll()
        }
    }

    /**
     * Creates two named map stores through the first node, writes five entries
     * into each, reads the same store through the second node and logs its
     * values, then clears the store.
     *
     * @param bluePrintClusterServices started services; at least two are required
     */
    private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
        /** Test Distributed store creation */
        repeat(2) { storeId ->
            val storeName = "blueprint-runtime-$storeId"
            val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(storeName) as IMap
            assertNotNull(store, "failed to get store")
            repeat(5) {
                store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
            }

            // Same store name, obtained through a different node.
            val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(storeName) as IMap

            store1.values.forEach {
                log.trace("Received map event : $it")
            }
            delay(5)
            store.clear()
        }
    }

    /**
     * Has three nodes contend for the same named lock, each from its own
     * single-threaded context, and waits for all of them to finish.
     *
     * @param bluePrintClusterServices started services; at least three are required
     */
    private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
        val lockName = "sample-lock"
        val lockIds = listOf("first", "second", "third")
        withContext(Dispatchers.IO) {
            val contenders = lockIds.mapIndexed { index, lockId ->
                async {
                    // One dedicated thread per contender; `use` closes it afterwards.
                    newSingleThreadContext(lockId).use { lockContext ->
                        withContext(lockContext) {
                            executeLock(bluePrintClusterServices[index], lockId, lockName)
                        }
                    }
                }
            }
            // async has already started the coroutines; await them so completion
            // and any failure are handled explicitly here.
            contenders.awaitAll()
        }
    }

    /**
     * Acquires the lock [lockName] through [bluePrintClusterService], holds it
     * briefly, releases it and closes the lock handle.
     *
     * @param bluePrintClusterService node through which the lock is obtained
     * @param lockId label used only in log and assertion messages
     * @param lockName name of the cluster-wide lock to acquire
     */
    private suspend fun executeLock(
        bluePrintClusterService: BluePrintClusterService,
        lockId: String,
        lockName: String
    ) {
        log.info("initialising $lockId lock...")
        val distributedLock = bluePrintClusterService.clusterLock(lockName)
        assertNotNull(distributedLock, "failed to create distributed $lockId lock")
        distributedLock.lock()
        assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
        try {
            log.info("locked $lockId process for 5mSec")
            delay(5)
        } finally {
            // Release even if the hold period is interrupted.
            distributedLock.unLock()
            log.info("$lockId lock released")
        }
        distributedLock.close()
    }

    /**
     * Checks the size of the `member-name-map` store and obtains the `cleanup`
     * scheduler. Not called by any test in this class; the scheduling calls
     * themselves are commented out.
     *
     * @param bluePrintClusterService node to query; must be a [HazelcastClusterService]
     */
    private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
        log.info("initialising ...")
        val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService

        val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
        assertEquals(clusterSize, memberNameMap.size, "failed to match member size")
        memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") }
        val scheduler = hazelcastClusterService.clusterScheduler("cleanup")
        // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
        // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
        // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
        // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS)
    }

    /**
     * Logs, for every node, its local member (when it is not a client), the master
     * of the `system` partition and all member addresses; then asserts that the
     * first node sees [clusterSize] application members with the `node-` prefix.
     *
     * @param bluePrintClusterServices started services; each must be a [HazelcastClusterService]
     */
    private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
        bluePrintClusterServices.forEach { bluePrintClusterService ->
            val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
            val hazelcast = hazelcastClusterService.hazelcast
            val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null
            val master = hazelcastClusterService.masterMember("system").memberAddress
            val members = hazelcastClusterService.allMembers().map { it.memberAddress }
            log.info("Cluster Members for($self): master($master) Members($members)")
        }

        val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-")
        assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size")
        log.info("Cluster applicationMembers ($applicationMembers)")
    }
}
231
/**
 * Minimal [Runnable] that is also [Serializable]; running it only writes a
 * single log line.
 */
open class SampleSchedulerTask : Runnable, Serializable {

    private val log = logger(SampleSchedulerTask::class)

    /** Writes a fixed marker message to the log. */
    override fun run() = log.info("I am scheduler action")
}