Merge "fix issue for deleting files from definition section"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / processor-core / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / core / cluster / HazelcastClusterServiceTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.hazelcast.client.config.YamlClientConfigBuilder
21 import com.hazelcast.cluster.Member
22 import com.hazelcast.config.FileSystemYamlConfig
23 import com.hazelcast.instance.impl.HazelcastInstanceFactory
24 import com.hazelcast.map.IMap
25 import kotlinx.coroutines.Dispatchers
26 import kotlinx.coroutines.async
27 import kotlinx.coroutines.awaitAll
28 import kotlinx.coroutines.delay
29 import kotlinx.coroutines.newSingleThreadContext
30 import kotlinx.coroutines.runBlocking
31 import kotlinx.coroutines.withContext
32 import org.junit.After
33 import org.junit.Before
34 import org.junit.Test
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
37 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
38 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
39 import org.onap.ccsdk.cds.controllerblueprints.core.logger
40 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
41 import java.io.Serializable
42 import java.util.Properties
43 import kotlin.test.assertEquals
44 import kotlin.test.assertNotNull
45 import kotlin.test.assertTrue
46
47 class HazelcastClusterServiceTest {
48     private val log = logger(HazelcastClusterServiceTest::class)
49     private val clusterSize = 3
50
51     @Before
52     @After
53     fun killAllHazelcastInstances() {
54         HazelcastInstanceFactory.terminateAll()
55     }
56
57     @Test
58     fun testClientFileSystemYamlConfig() {
59         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
60         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
61         System.setProperty(
62             "hazelcast.client.config",
63             normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath
64         )
65         val config = YamlClientConfigBuilder().build()
66         assertNotNull(config)
67         assertEquals("test-cluster", config.clusterName)
68         assertEquals("node-1234", config.instanceName)
69     }
70
71     @Test
72     fun testServerFileSystemYamlConfig() {
73         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
74         System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
75         val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml")
76         val config = FileSystemYamlConfig(configFile)
77         assertNotNull(config)
78         assertEquals("test-cluster", config.clusterName)
79         assertEquals("node-1234", config.instanceName)
80     }
81
82     @Test
83     fun testClusterJoin() {
84         runBlocking {
85             val bluePrintClusterServiceOne =
86                 createCluster(arrayListOf(1, 2, 3)).toMutableList()
87             printReachableMembers(bluePrintClusterServiceOne)
88             testDistributedStore(bluePrintClusterServiceOne)
89             testDistributedLock(bluePrintClusterServiceOne)
90         }
91     }
92
93     private suspend fun createCluster(
94         ids: List<Int>,
95         joinAsClient: Boolean? = false
96     ): List<BluePrintClusterService> {
97
98         return withContext(Dispatchers.Default) {
99             val deferred = ids.map { id ->
100                 async(Dispatchers.IO) {
101                     val nodeId = "node-$id"
102                     log.info("********** Starting ($nodeId)")
103                     val properties = Properties()
104                     properties["hazelcast.logging.type"] = "slf4j"
105                     val clusterInfo =
106                         if (joinAsClient!!) {
107                             ClusterInfo(
108                                 id = "test-cluster", nodeId = nodeId, joinAsClient = true,
109                                 configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml",
110                                 properties = properties
111                             )
112                         } else {
113                             ClusterInfo(
114                                 id = "test-cluster", nodeId = nodeId, joinAsClient = false,
115                                 configFile = "./src/test/resources/hazelcast/hazelcast-cluster.yaml",
116                                 properties = properties
117                             )
118                         }
119                     val hazelcastClusterService = HazelcastClusterService()
120                     hazelcastClusterService.startCluster(clusterInfo)
121                     hazelcastClusterService
122                 }
123             }
124             deferred.awaitAll()
125         }
126     }
127
128     private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
129         /** Test Distributed store creation */
130         repeat(2) { storeId ->
131             val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
132                 "blueprint-runtime-$storeId"
133             ) as IMap
134             assertNotNull(store, "failed to get store")
135             repeat(5) {
136                 store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
137             }
138
139             val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
140                 "blueprint-runtime-$storeId"
141             ) as IMap
142
143             store1.values.map {
144                 log.trace("Received map event : $it")
145             }
146             delay(5)
147             store.clear()
148         }
149     }
150
151     private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
152         val lockName = "sample-lock"
153         withContext(Dispatchers.IO) {
154             val deferred = async {
155                 newSingleThreadContext("first").use {
156                     withContext(it) {
157                         executeLock(bluePrintClusterServices[0], "first", lockName)
158                     }
159                 }
160             }
161             val deferred2 = async {
162                 newSingleThreadContext("second").use {
163                     withContext(it) {
164                         executeLock(bluePrintClusterServices[1], "second", lockName)
165                     }
166                 }
167             }
168             val deferred3 = async {
169                 newSingleThreadContext("third").use {
170                     withContext(it) {
171                         executeLock(bluePrintClusterServices[2], "third", lockName)
172                     }
173                 }
174             }
175             deferred.start()
176             deferred2.start()
177             deferred3.start()
178         }
179     }
180
181     private suspend fun executeLock(
182         bluePrintClusterService: BluePrintClusterService,
183         lockId: String,
184         lockName: String
185     ) {
186         log.info("initialising $lockId lock...")
187         val distributedLock = bluePrintClusterService.clusterLock(lockName)
188         assertNotNull(distributedLock, "failed to create distributed $lockId lock")
189         distributedLock.lock()
190         assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
191         try {
192             log.info("locked $lockId process for 5mSec")
193             delay(5)
194         } finally {
195             distributedLock.unLock()
196             log.info("$lockId lock released")
197         }
198         distributedLock.close()
199     }
200
201     private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
202         log.info("initialising ...")
203         val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
204
205         val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
206         assertEquals(3, memberNameMap.size, "failed to match member size")
207         memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") }
208         val scheduler = hazelcastClusterService.clusterScheduler("cleanup")
209         // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
210         // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
211         // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
212         // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS)
213     }
214
215     private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
216         bluePrintClusterServices.forEach { bluePrintClusterService ->
217             val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
218             val hazelcast = hazelcastClusterService.hazelcast
219             val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null
220             val master = hazelcastClusterService.masterMember("system").memberAddress
221             val members = hazelcastClusterService.allMembers().map { it.memberAddress }
222             log.info("Cluster Members for($self): master($master) Members($members)")
223         }
224
225         val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-")
226         assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size")
227         log.info("Cluster applicationMembers ($applicationMembers)")
228     }
229 }
230
231 open class SampleSchedulerTask : Runnable, Serializable {
232     private val log = logger(SampleSchedulerTask::class)
233     override fun run() {
234         log.info("I am scheduler action")
235     }
236 }