Revert "Renaming Files having BluePrint to have Blueprint"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / processor-core / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / core / cluster / HazelcastClusterServiceTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.hazelcast.client.config.YamlClientConfigBuilder
21 import com.hazelcast.cluster.Member
22 import com.hazelcast.config.FileSystemYamlConfig
23 import com.hazelcast.instance.impl.HazelcastInstanceFactory
24 import com.hazelcast.map.IMap
25 import io.mockk.mockk
26 import kotlinx.coroutines.Dispatchers
27 import kotlinx.coroutines.async
28 import kotlinx.coroutines.awaitAll
29 import kotlinx.coroutines.delay
30 import kotlinx.coroutines.newSingleThreadContext
31 import kotlinx.coroutines.runBlocking
32 import kotlinx.coroutines.withContext
33 import org.junit.After
34 import org.junit.Before
35 import org.junit.Test
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
37 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
38 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
39 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
40 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
41 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
42 import org.onap.ccsdk.cds.controllerblueprints.core.logger
43 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
44 import java.io.Serializable
45 import java.util.Properties
46 import kotlin.test.assertEquals
47 import kotlin.test.assertFalse
48 import kotlin.test.assertNotNull
49 import kotlin.test.assertTrue
50
51 class HazelcastClusterServiceTest {
52
53     private val log = logger(HazelcastClusterServiceTest::class)
54     private val clusterSize = 3
55
56     @Before
57     @After
58     fun killAllHazelcastInstances() {
59         HazelcastInstanceFactory.terminateAll()
60     }
61
62     @Test
63     fun testClientFileSystemYamlConfig() {
64         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
65         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
66         System.setProperty(
67             "hazelcast.client.config",
68             normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath
69         )
70         val config = YamlClientConfigBuilder().build()
71         assertNotNull(config)
72         assertEquals("test-cluster", config.clusterName)
73         assertEquals("node-1234", config.instanceName)
74     }
75
76     @Test
77     fun testServerFileSystemYamlConfig() {
78         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
79         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
80         val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml")
81         val config = FileSystemYamlConfig(configFile)
82         assertNotNull(config)
83         assertEquals("test-cluster", config.clusterName)
84         assertEquals("node-1234", config.instanceName)
85     }
86
87     @Test
88     fun testClusterJoin() {
89         runBlocking {
90             val bluePrintClusterServiceOne =
91                 createCluster(arrayListOf(1, 2, 3)).toMutableList()
92             printReachableMembers(bluePrintClusterServiceOne)
93             testDistributedStore(bluePrintClusterServiceOne)
94             testDistributedLock(bluePrintClusterServiceOne)
95         }
96     }
97
98     @Test
99     fun testClusterMessaging() {
100         runBlocking {
101             val bluePrintClusterServiceOne =
102                 createCluster(arrayListOf(1, 2, 3)).toMutableList()
103             printReachableMembers(bluePrintClusterServiceOne)
104             testMessageReceived(bluePrintClusterServiceOne)
105         }
106     }
107
108     private suspend fun testMessageReceived(bluePrintClusterServices: List<BluePrintClusterService>) {
109         val sender = bluePrintClusterServices[0] as HazelcastClusterService
110         val receiver = bluePrintClusterServices[1] as HazelcastClusterService
111         val messageSent = "hello world"
112         var isMessageReceived = false
113         val uuid = receiver.addBlueprintClusterMessageListener(
114             BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE,
115             object : BlueprintClusterMessageListener<String> {
116                 override fun onMessage(message: BluePrintClusterMessage<String>?) {
117                     log.info("Message received - ${message?.payload}")
118                     isMessageReceived = messageSent == message?.payload
119                 }
120             }
121         )
122
123         assertNotNull(uuid)
124         sender.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, messageSent)
125         delay(1000)
126         assertTrue(isMessageReceived)
127
128         assertTrue(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
129         assertFalse(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
130     }
131
132     private suspend fun createCluster(
133         ids: List<Int>,
134         joinAsClient: Boolean? = false
135     ): List<BluePrintClusterService> {
136
137         return withContext(Dispatchers.Default) {
138             val deferred = ids.map { id ->
139                 async(Dispatchers.IO) {
140                     val nodeId = "node-$id"
141                     log.info("********** Starting ($nodeId)")
142                     val properties = Properties()
143                     properties["hazelcast.logging.type"] = "slf4j"
144                     val clusterInfo =
145                         if (joinAsClient!!) {
146                             ClusterInfo(
147                                 id = "test-cluster", nodeId = nodeId, joinAsClient = true,
148                                 configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml",
149                                 properties = properties
150                             )
151                         } else {
152                             ClusterInfo(
153                                 id = "test-cluster", nodeId = nodeId, joinAsClient = false,
154                                 configFile = "./src/test/resources/hazelcast/hazelcast-cluster.yaml",
155                                 properties = properties
156                             )
157                         }
158                     val hazelcastClusterService = HazelcastClusterService(mockk(relaxed = true))
159                     hazelcastClusterService.startCluster(clusterInfo)
160                     hazelcastClusterService
161                 }
162             }
163             deferred.awaitAll()
164         }
165     }
166
167     private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
168         /** Test Distributed store creation */
169         repeat(2) { storeId ->
170             val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
171                 "blueprint-runtime-$storeId"
172             ) as IMap
173             assertNotNull(store, "failed to get store")
174             repeat(5) {
175                 store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
176             }
177
178             val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
179                 "blueprint-runtime-$storeId"
180             ) as IMap
181
182             store1.values.map {
183                 log.trace("Received map event : $it")
184             }
185             delay(5)
186             store.clear()
187         }
188     }
189
190     private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
191         val lockName = "sample-lock"
192         withContext(Dispatchers.IO) {
193             val deferred = async {
194                 newSingleThreadContext("first").use {
195                     withContext(it) {
196                         executeLock(bluePrintClusterServices[0], "first", lockName)
197                     }
198                 }
199             }
200             val deferred2 = async {
201                 newSingleThreadContext("second").use {
202                     withContext(it) {
203                         executeLock(bluePrintClusterServices[1], "second", lockName)
204                     }
205                 }
206             }
207             val deferred3 = async {
208                 newSingleThreadContext("third").use {
209                     withContext(it) {
210                         executeLock(bluePrintClusterServices[2], "third", lockName)
211                     }
212                 }
213             }
214             deferred.start()
215             deferred2.start()
216             deferred3.start()
217         }
218     }
219
220     private suspend fun executeLock(
221         bluePrintClusterService: BluePrintClusterService,
222         lockId: String,
223         lockName: String
224     ) {
225         log.info("initialising $lockId lock...")
226         val distributedLock = bluePrintClusterService.clusterLock(lockName)
227         assertNotNull(distributedLock, "failed to create distributed $lockId lock")
228         distributedLock.lock()
229         assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
230         try {
231             log.info("locked $lockId process for 5mSec")
232             delay(5)
233         } finally {
234             distributedLock.unLock()
235             log.info("$lockId lock released")
236         }
237         distributedLock.close()
238     }
239
240     private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
241         log.info("initialising ...")
242         val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
243
244         val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
245         assertEquals(3, memberNameMap.size, "failed to match member size")
246         memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") }
247         val scheduler = hazelcastClusterService.clusterScheduler("cleanup")
248         // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
249         // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
250         // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
251         // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS)
252     }
253
254     private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
255         bluePrintClusterServices.forEach { bluePrintClusterService ->
256             val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
257             val hazelcast = hazelcastClusterService.hazelcast
258             val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null
259             val master = hazelcastClusterService.masterMember("system").memberAddress
260             val members = hazelcastClusterService.allMembers().map { it.memberAddress }
261             log.info("Cluster Members for($self): master($master) Members($members)")
262         }
263
264         val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-")
265         assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size")
266         log.info("Cluster applicationMembers ($applicationMembers)")
267     }
268 }
269
270 open class SampleSchedulerTask : Runnable, Serializable {
271
272     private val log = logger(SampleSchedulerTask::class)
273     override fun run() {
274         log.info("I am scheduler action")
275     }
276 }