Enabling Code Formatter
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / processor-core / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / core / cluster / HazelcastClusterService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
18
19 import com.hazelcast.client.HazelcastClient
20 import com.hazelcast.client.config.ClientConfig
21 import com.hazelcast.client.config.YamlClientConfigBuilder
22 import com.hazelcast.cluster.Member
23 import com.hazelcast.cluster.MembershipEvent
24 import com.hazelcast.cluster.MembershipListener
25 import com.hazelcast.config.Config
26 import com.hazelcast.config.FileSystemYamlConfig
27 import com.hazelcast.config.MemberAttributeConfig
28 import com.hazelcast.core.Hazelcast
29 import com.hazelcast.core.HazelcastInstance
30 import com.hazelcast.cp.CPSubsystemManagementService
31 import com.hazelcast.cp.lock.FencedLock
32 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
33 import kotlinx.coroutines.delay
34 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
37 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
38 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
39 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
40 import org.onap.ccsdk.cds.controllerblueprints.core.logger
41 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
42 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
43 import org.springframework.stereotype.Service
44 import java.time.Duration
45 import java.util.concurrent.TimeUnit
46
47 @Service
48 open class HazelcastClusterService : BluePrintClusterService {
49
50     private val log = logger(HazelcastClusterService::class)
51     lateinit var hazelcast: HazelcastInstance
52     lateinit var cpSubsystemManagementService: CPSubsystemManagementService
53     var joinedClient = false
54     var joinedLite = false
55
56     override suspend fun <T> startCluster(configuration: T) {
57         /** Get the Hazelcast Client or Server instance */
58         hazelcast =
59             when (configuration) {
60                 is Config -> {
61                     joinedLite = configuration.isLiteMember
62                     val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration)
63                     /** Promote as CP Member */
64                     promoteAsCPMember(hazelcastInstance)
65                     hazelcastInstance
66                 }
67                 is ClientConfig -> {
68                     joinedClient = true
69                     HazelcastClient.newHazelcastClient(configuration)
70                 }
71                 is ClusterInfo -> {
72
73                     System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id)
74                     System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)
75
76                     val memberAttributeConfig = MemberAttributeConfig()
77                     memberAttributeConfig.setAttribute(
78                         BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
79                         configuration.nodeId
80                     )
81
82                     val configFile = configuration.configFile
83
84                     /** Check file exists */
85                     val clusterConfigFile = normalizedFile(configuration.configFile)
86                     check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
87                         "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
88                     }
89                     check(clusterConfigFile.exists()) {
90                         "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})"
91                     }
92                     log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")
93
94                     /** Hazelcast Client from config file */
95                     if (configuration.joinAsClient) {
96                         /** Set the configuration file to system properties, so that Hazelcast will read automatically */
97                         System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
98                         joinedClient = true
99                         val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
100                         hazelcastClientConfiguration.properties = configuration.properties
101                         HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
102                     } else {
103                         /** Hazelcast Server from config file */
104                         val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile))
105                         hazelcastServerConfiguration.clusterName = configuration.id
106                         hazelcastServerConfiguration.instanceName = configuration.nodeId
107                         hazelcastServerConfiguration.properties = configuration.properties
108                         hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
109                         joinedLite = hazelcastServerConfiguration.isLiteMember
110                         val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
111                         /** Promote as CP Member */
112                         promoteAsCPMember(hazelcastInstance)
113                         hazelcastInstance
114                     }
115                 }
116                 else -> {
117                     throw BluePrintProcessorException("couldn't understand the cluster configuration")
118                 }
119             }
120
121         /** Add the Membership Listeners */
122         hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener())
123         log.info(
124             "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
125         )
126     }
127
128     override fun isClient(): Boolean {
129         return joinedClient
130     }
131
132     override fun isLiteMember(): Boolean {
133         return joinedLite
134     }
135
136     override fun clusterJoined(): Boolean {
137         return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning
138     }
139
140     override suspend fun masterMember(partitionGroup: String): ClusterMember {
141         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
142         return hazelcast.cluster.members.first().toClusterMember()
143     }
144
145     override suspend fun allMembers(): Set<ClusterMember> {
146         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
147         return hazelcast.cluster.members.map { it.toClusterMember() }.toSet()
148     }
149
150     override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
151         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
152         return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet()
153     }
154
155     override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
156         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
157         return hazelcast.getMap<String, T>(name)
158     }
159
160     /**
161      * The DistributedLock is a distributed implementation of Java’s Lock.
162      * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
163      * determine ordering of multiple concurrent lock holders.
164      * DistributedLocks are designed to account for failures within the cluster.
165      * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
166      * the lock will be released and granted to the next waiting process.
167      */
168     override suspend fun clusterLock(name: String): ClusterLock {
169         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
170         return ClusterLockImpl(hazelcast, name)
171     }
172
173     /** Return interface may change and it will be included in BluePrintClusterService */
174     @UseExperimental
175     suspend fun clusterScheduler(name: String): IScheduledExecutorService {
176         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
177         return hazelcast.getScheduledExecutorService(name)
178     }
179
180     override suspend fun shutDown(duration: Duration) {
181         if (::hazelcast.isInitialized && clusterJoined()) {
182             delay(duration.toMillis())
183             HazelcastClusterUtils.terminate(hazelcast)
184         }
185     }
186
187     /** Utils */
188     suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
189         if (!joinedClient && !joinedLite) {
190             HazelcastClusterUtils.promoteAsCPMember(hazelcastInstance)
191         }
192     }
193
194     suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
195         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
196         check(!isClient()) { "not supported for cluster client members." }
197         return hazelcastApplicationMembers(ClusterUtils.applicationName())
198     }
199
200     suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
201         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
202         val applicationMembers: MutableMap<String, Member> = hashMapOf()
203         hazelcast.cluster.members.map { member ->
204             val memberName: String = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
205             if (memberName.startsWith(appName, true)) {
206                 applicationMembers[memberName] = member
207             }
208         }
209         return applicationMembers
210     }
211 }
212
213 open class BlueprintsClusterMembershipListener() :
214     MembershipListener {
215
216     private val log = logger(BlueprintsClusterMembershipListener::class)
217
218     override fun memberRemoved(membershipEvent: MembershipEvent) {
219         log.info("MembershipEvent: $membershipEvent")
220     }
221
222     override fun memberAdded(membershipEvent: MembershipEvent) {
223         log.info("MembershipEvent: $membershipEvent")
224     }
225 }
226
227 open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
228
229     private val log = logger(ClusterLockImpl::class)
230
231     private val distributedLock: FencedLock = hazelcast.cpSubsystem.getLock(name)
232
233     override fun name(): String {
234         return distributedLock.name
235     }
236
237     override suspend fun lock() {
238         distributedLock.lock()
239         log.trace("Cluster lock($name) created..")
240     }
241
242     override suspend fun tryLock(timeout: Long): Boolean {
243         return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
244             .also {
245                 if (it) log.trace("Cluster lock acquired: $name")
246                 else log.trace("Failed to acquire Cluster lock $name within timeout $timeout")
247             }
248     }
249
250     override suspend fun unLock() {
251         distributedLock.unlock()
252         log.trace("Cluster unlock(${name()}) successfully..")
253     }
254
255     override fun isLocked(): Boolean {
256         return distributedLock.isLocked
257     }
258
259     override fun isLockedByCurrentThread(): Boolean {
260         return distributedLock.isLockedByCurrentThread
261     }
262
263     override suspend fun fenceLock(): String {
264         val fence = distributedLock.lockAndGetFence()
265         log.trace("Cluster lock($name) fence($fence) created..")
266         return fence.toString()
267     }
268
269     override suspend fun tryFenceLock(timeout: Long): String {
270         return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString()
271     }
272
273     override fun close() {
274     }
275 }