Cluster co-ordination with Hazelcast.
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / processor-core / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / core / cluster / HazlecastClusterServiceTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.hazelcast.client.config.YamlClientConfigBuilder
21 import com.hazelcast.cluster.Member
22 import com.hazelcast.config.FileSystemYamlConfig
23 import com.hazelcast.map.IMap
24 import kotlinx.coroutines.Dispatchers
25 import kotlinx.coroutines.async
26 import kotlinx.coroutines.awaitAll
27 import kotlinx.coroutines.delay
28 import kotlinx.coroutines.runBlocking
29 import kotlinx.coroutines.withContext
30 import org.junit.Test
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
32 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
33 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
34 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
35 import org.onap.ccsdk.cds.controllerblueprints.core.logger
36 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
37 import java.io.Serializable
38 import java.time.Duration
39 import java.util.Properties
40 import kotlin.test.assertEquals
41 import kotlin.test.assertNotNull
42 import kotlin.test.assertTrue
43
44 class HazlecastClusterServiceTest {
45     private val log = logger(HazlecastClusterServiceTest::class)
46     private val clusterSize = 3
47
48     @Test
49     fun testClientFileSystemYamlConfig() {
50         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
51         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
52         System.setProperty(
53             "hazelcast.client.config",
54             normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath
55         )
56         val config = YamlClientConfigBuilder().build()
57         assertNotNull(config)
58         assertEquals("test-cluster", config.clusterName)
59         assertEquals("node-1234", config.instanceName)
60     }
61
62     @Test
63     fun testServerFileSystemYamlConfig() {
64         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
65         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
66         val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml")
67         val config = FileSystemYamlConfig(configFile)
68         assertNotNull(config)
69         assertEquals("test-cluster", config.clusterName)
70         assertEquals("node-1234", config.instanceName)
71     }
72
73     @Test
74     fun testClusterJoin() {
75         runBlocking {
76             val bluePrintClusterServiceOne =
77                 createCluster(arrayListOf(5679, 5680, 5681)).toMutableList()
78             // delay(1000)
79             // Join as Hazlecast Management Node
80             // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true)
81             // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false)
82             // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
83             printReachableMembers(bluePrintClusterServiceOne)
84             testDistributedStore(bluePrintClusterServiceOne)
85             testDistributedLock(bluePrintClusterServiceOne)
86
87             // executeScheduler(bluePrintClusterServiceOne[0])
88             // delay(1000)
89             // Shutdown
90             shutdown(bluePrintClusterServiceOne)
91         }
92     }
93
94     private suspend fun createCluster(
95         ports: List<Int>,
96         joinAsClient: Boolean? = false
97     ): List<BluePrintClusterService> {
98
99         return withContext(Dispatchers.Default) {
100             val deferred = ports.map { port ->
101                 async(Dispatchers.IO) {
102                     val nodeId = "node-$port"
103                     log.info("********** Starting node($nodeId) on port($port)")
104                     val properties = Properties()
105                     properties["hazelcast.logging.type"] = "slf4j"
106                     val clusterInfo =
107                         if (joinAsClient!!) {
108                             ClusterInfo(
109                                 id = "test-cluster", nodeId = nodeId, joinAsClient = true,
110                                 configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml",
111                                 properties = properties
112                             )
113                         } else {
114                             ClusterInfo(
115                                 id = "test-cluster", nodeId = nodeId, joinAsClient = false,
116                                 configFile = "./src/test/resources/hazelcast/hazelcast-$port.yaml",
117                                 properties = properties
118                             )
119                         }
120                     val hazlecastClusterService = HazlecastClusterService()
121                     hazlecastClusterService.startCluster(clusterInfo)
122                     hazlecastClusterService
123                 }
124             }
125             deferred.awaitAll()
126         }
127     }
128
129     private suspend fun shutdown(bluePrintClusterServices: List<BluePrintClusterService>) {
130         bluePrintClusterServices.forEach { bluePrintClusterService ->
131             bluePrintClusterService.shutDown(Duration.ofMillis(10))
132         }
133     }
134
135     private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
136         /** Test Distributed store creation */
137         repeat(2) { storeId ->
138             val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
139                 "blueprint-runtime-$storeId"
140             ) as IMap
141             assertNotNull(store, "failed to get store")
142             repeat(5) {
143                 store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
144             }
145
146             val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
147                 "blueprint-runtime-$storeId"
148             ) as IMap
149
150             store1.values.map {
151                 log.trace("Received map event : $it")
152             }
153             delay(5)
154             store.clear()
155         }
156     }
157
158     private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
159         val lockName = "sample-lock"
160         withContext(Dispatchers.IO) {
161             val deferred = async {
162                 executeLock(bluePrintClusterServices[0], "first", lockName)
163             }
164             val deferred2 = async {
165                 executeLock(bluePrintClusterServices[0], "second", lockName)
166             }
167             val deferred3 = async {
168                 executeLock(bluePrintClusterServices[2], "third", lockName)
169             }
170             deferred.start()
171             deferred2.start()
172             deferred3.start()
173         }
174     }
175
176     private suspend fun executeLock(
177         bluePrintClusterService: BluePrintClusterService,
178         lockId: String,
179         lockName: String
180     ) {
181         log.info("initialising $lockId lock...")
182         val distributedLock = bluePrintClusterService.clusterLock(lockName)
183         assertNotNull(distributedLock, "failed to create distributed $lockId lock")
184         distributedLock.lock()
185         assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
186         try {
187             log.info("locked $lockId process for 5mSec")
188             delay(5)
189         } finally {
190             distributedLock.unLock()
191             log.info("$lockId lock released")
192         }
193         distributedLock.close()
194     }
195
196     private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
197         log.info("initialising ...")
198         val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService
199
200         val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
201         assertEquals(3, memberNameMap.size, "failed to match member size")
202         memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") }
203         val scheduler = hazlecastClusterService.clusterScheduler("cleanup")
204         // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
205         // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
206         // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
207         // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS)
208     }
209
210     private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
211         bluePrintClusterServices.forEach { bluePrintClusterService ->
212             val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService
213             val hazelcast = hazlecastClusterService.hazelcast
214             val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null
215             val master = hazlecastClusterService.masterMember("system").memberAddress
216             val members = hazlecastClusterService.allMembers().map { it.memberAddress }
217             log.info("Cluster Members for($self): master($master) Members($members)")
218         }
219
220         val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-56")
221         assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size")
222         log.info("Cluster applicationMembers ($applicationMembers)")
223     }
224 }
225
226 open class SampleSchedulerTask : Runnable, Serializable {
227     private val log = logger(SampleSchedulerTask::class)
228     override fun run() {
229         log.info("I am scheduler action")
230     }
231 }