6be3334bb9ff3ed48ffcf0c3a44cdb4479eba39a
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / processor-core / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / core / cluster / HazlecastClusterService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
18
19 import com.hazelcast.client.HazelcastClient
20 import com.hazelcast.client.config.ClientConfig
21 import com.hazelcast.client.config.YamlClientConfigBuilder
22 import com.hazelcast.cluster.Member
23 import com.hazelcast.cluster.MembershipEvent
24 import com.hazelcast.cluster.MembershipListener
25 import com.hazelcast.config.Config
26 import com.hazelcast.config.FileSystemYamlConfig
27 import com.hazelcast.config.MemberAttributeConfig
28 import com.hazelcast.core.Hazelcast
29 import com.hazelcast.core.HazelcastInstance
30 import com.hazelcast.cp.CPSubsystemManagementService
31 import com.hazelcast.cp.lock.FencedLock
32 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
33 import kotlinx.coroutines.delay
34 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
37 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
38 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
39 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
40 import org.onap.ccsdk.cds.controllerblueprints.core.logger
41 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
42 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
43 import org.springframework.stereotype.Service
44 import java.time.Duration
45 import java.util.concurrent.TimeUnit
46
/**
 * Hazelcast backed implementation of [BluePrintClusterService].
 *
 * The service joins the cluster either as a Hazelcast member (optionally a lite member) or as a
 * Hazelcast client, depending on the configuration handed to [startCluster]. All accessors other than
 * [isClient], [isLiteMember], [clusterJoined] and [shutDown] require [startCluster] to have completed
 * and fail with [IllegalStateException] otherwise.
 */
@Service
open class HazlecastClusterService : BluePrintClusterService {

    private val log = logger(HazlecastClusterService::class)
    lateinit var hazelcast: HazelcastInstance
    lateinit var cpSubsystemManagementService: CPSubsystemManagementService
    var joinedClient = false
    var joinedLite = false

    /**
     * Starts (or connects to) the cluster and registers the membership listener.
     *
     * Supported [configuration] types:
     * - [Config]: starts a Hazelcast member from the given configuration.
     * - [ClientConfig]: connects as a Hazelcast client.
     * - [ClusterInfo]: builds the member/client configuration from the referenced yaml file.
     *
     * @throws BluePrintProcessorException if the configuration type is not one of the above.
     * @throws IllegalStateException if the [ClusterInfo] config file is not a yaml file or does not exist.
     */
    override suspend fun <T> startCluster(configuration: T) {
        /** Get the Hazelcast Client or Server instance */
        hazelcast =
            when (configuration) {
                is Config -> {
                    joinedLite = configuration.isLiteMember
                    val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration)
                    /** Promote as CP Member */
                    promoteAsCPMember(hazelcastInstance)
                    hazelcastInstance
                }
                is ClientConfig -> {
                    joinedClient = true
                    HazelcastClient.newHazelcastClient(configuration)
                }
                is ClusterInfo -> startFromClusterInfo(configuration)
                else -> {
                    throw BluePrintProcessorException("couldn't understand the cluster configuration")
                }
            }

        /** Add the Membership Listeners */
        hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener())
        log.info(
            "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
        )
    }

    /** Creates the Hazelcast client or member instance described by [configuration] and its yaml file. */
    private suspend fun startFromClusterInfo(configuration: ClusterInfo): HazelcastInstance {
        System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id)
        System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)

        /** Validate the configuration file before any instance is created */
        val clusterConfigFile = normalizedFile(configuration.configFile)
        check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
            "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
        }
        check(clusterConfigFile.exists()) {
            "couldn't find cluster configuration file(${clusterConfigFile.absolutePath})"
        }
        log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")

        return if (configuration.joinAsClient) {
            /** Hazelcast Client from config file */
            /** Set the configuration file to system properties, so that Hazelcast will read automatically */
            System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
            joinedClient = true
            val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
            hazelcastClientConfiguration.properties = configuration.properties
            HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
        } else {
            /** Hazelcast Server from config file */
            /** The node id is published as a member attribute; hazelcastApplicationMembers() reads it back */
            val memberAttributeConfig = MemberAttributeConfig()
            memberAttributeConfig.setAttribute(
                BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
                configuration.nodeId
            )

            val hazelcastServerConfiguration = FileSystemYamlConfig(clusterConfigFile)
            hazelcastServerConfiguration.clusterName = configuration.id
            hazelcastServerConfiguration.instanceName = configuration.nodeId
            hazelcastServerConfiguration.properties = configuration.properties
            hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
            joinedLite = hazelcastServerConfiguration.isLiteMember
            val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
            /** Promote as CP Member */
            promoteAsCPMember(hazelcastInstance)
            hazelcastInstance
        }
    }

    /** Returns true when this node joined the cluster as a Hazelcast client. */
    override fun isClient(): Boolean {
        return joinedClient
    }

    /** Returns true when this node joined the cluster as a lite member. */
    override fun isLiteMember(): Boolean {
        return joinedLite
    }

    /**
     * Returns true when the Hazelcast instance exists and its lifecycle service is running.
     * Returns false (instead of failing) when [startCluster] has not been called yet.
     */
    override fun clusterJoined(): Boolean {
        return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning
    }

    /**
     * Returns the first member of the cluster member set as the master.
     * The [partitionGroup] argument is currently not used.
     */
    override suspend fun masterMember(partitionGroup: String): ClusterMember {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return hazelcast.cluster.members.first().toClusterMember()
    }

    /** Returns all the current members of the cluster. */
    override suspend fun allMembers(): Set<ClusterMember> {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return hazelcast.cluster.members.map { it.toClusterMember() }.toSet()
    }

    /** Returns the members whose node id attribute starts with [appName] (case insensitive). */
    override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet()
    }

    /** Returns the Hazelcast map registered under [name]. */
    override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return hazelcast.getMap<String, T>(name)
    }

    /**
     * The DistributedLock is a distributed implementation of Java’s Lock.
     * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
     * determine ordering of multiple concurrent lock holders.
     * DistributedLocks are designed to account for failures within the cluster.
     * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
     * the lock will be released and granted to the next waiting process.
     */
    override suspend fun clusterLock(name: String): ClusterLock {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return ClusterLockImpl(hazelcast, name)
    }

    /** Return interface may change and it will be included in BluePrintClusterService */
    @UseExperimental
    suspend fun clusterScheduler(name: String): IScheduledExecutorService {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return hazelcast.getScheduledExecutorService(name)
    }

    /** Waits for [duration] and then terminates the Hazelcast instance; no-op when the cluster is not joined. */
    override suspend fun shutDown(duration: Duration) {
        if (clusterJoined()) {
            delay(duration.toMillis())
            HazlecastClusterUtils.terminate(hazelcast)
        }
    }

    /** Utils */

    /** Promotes [hazelcastInstance] as CP member, unless this node joined as client or lite member. */
    suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
        if (!joinedClient && !joinedLite) {
            HazlecastClusterUtils.promoteAsCPMember(hazelcastInstance)
        }
    }

    /**
     * Returns the members belonging to the current application, keyed by node id.
     *
     * @throws IllegalStateException if the cluster is not started or this node joined as a client.
     */
    suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        check(!isClient()) { "not supported for cluster client members." }
        return hazelcastApplicationMembers(ClusterUtils.applicationName())
    }

    /**
     * Returns the members whose node id attribute starts with [appName] (case insensitive), keyed by node id.
     * Members that do not define the node id attribute are skipped.
     */
    suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        val applicationMembers: MutableMap<String, Member> = hashMapOf()
        hazelcast.cluster.members.forEach { member ->
            /** The attribute is null for members that never set it, so it must be read as nullable */
            val memberName: String? = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
            if (memberName != null && memberName.startsWith(appName, true)) {
                applicationMembers[memberName] = member
            }
        }
        return applicationMembers
    }
}
211
/**
 * Membership listener that writes every member added / member removed event to the log.
 */
open class BlueprintsClusterMembershipListener : MembershipListener {

    private val log = logger(BlueprintsClusterMembershipListener::class)

    /** Logs the event raised when a member joins the cluster. */
    override fun memberAdded(membershipEvent: MembershipEvent) = report(membershipEvent)

    /** Logs the event raised when a member leaves the cluster. */
    override fun memberRemoved(membershipEvent: MembershipEvent) = report(membershipEvent)

    /** Single place where membership events are written to the log. */
    private fun report(membershipEvent: MembershipEvent) {
        log.info("MembershipEvent: $membershipEvent")
    }
}
224
/**
 * [ClusterLock] backed by a Hazelcast CP subsystem [FencedLock].
 *
 * The lock handle is looked up from the CP subsystem each time one of the locking methods is called
 * and kept in [distributedLock]. The query and release methods reuse that handle and look it up on
 * demand when no locking method has been called yet, so they never fail on an uninitialized property.
 */
open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
    private val log = logger(ClusterLockImpl::class)

    lateinit var distributedLock: FencedLock

    /** Looks up the lock from the CP subsystem and remembers it in [distributedLock]. */
    private fun fetchLock(): FencedLock {
        val fencedLock = hazelcast.cpSubsystem.getLock(name)
        distributedLock = fencedLock
        return fencedLock
    }

    /** Returns the remembered lock handle, looking it up first when none is available yet. */
    private fun currentLock(): FencedLock {
        return if (::distributedLock.isInitialized) distributedLock else fetchLock()
    }

    /**
     * Returns the name reported by the lock handle, or the name this instance was created with when
     * the handle has not been looked up yet (avoids a CP subsystem call just to answer the name).
     */
    override fun name(): String {
        return if (::distributedLock.isInitialized) distributedLock.name else name
    }

    /** Acquires the lock, blocking until it is available. */
    override suspend fun lock() {
        fetchLock().lock()
        log.trace("Cluster lock($name) created..")
    }

    /**
     * Tries to acquire the lock, waiting at most [timeout] milliseconds.
     *
     * @return true when the lock was acquired.
     */
    override suspend fun tryLock(timeout: Long): Boolean {
        return fetchLock().tryLock(timeout, TimeUnit.MILLISECONDS)
    }

    /** Releases the lock. */
    override suspend fun unLock() {
        currentLock().unlock()
        log.trace("Cluster unlock(${name()}) successfully..")
    }

    /** Returns whether the lock is currently held, as reported by the lock handle. */
    override fun isLocked(): Boolean {
        return currentLock().isLocked
    }

    /**
     * Acquires the lock and returns the fencing token assigned to this acquisition.
     *
     * @return the fence converted to its string form.
     */
    override suspend fun fenceLock(): String {
        val fence = fetchLock().lockAndGetFence()
        log.trace("Cluster lock($name) fence($fence) created..")
        return fence.toString()
    }

    /**
     * Tries to acquire the lock within [timeout] milliseconds and returns the resulting fence.
     *
     * @return the string form of the value returned by the lock handle.
     */
    override suspend fun tryFenceLock(timeout: Long): String {
        return fetchLock().tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString()
    }

    /** Nothing to release here; the lock handle is owned by the Hazelcast instance. */
    override fun close() {
    }
}