09641458c720d209006d34d388596f316ea4ea98
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
18
19 import com.hazelcast.client.HazelcastClient
20 import com.hazelcast.client.config.ClientConfig
21 import com.hazelcast.client.config.YamlClientConfigBuilder
22 import com.hazelcast.cluster.Member
23 import com.hazelcast.cluster.MembershipEvent
24 import com.hazelcast.cluster.MembershipListener
25 import com.hazelcast.config.Config
26 import com.hazelcast.config.FileSystemYamlConfig
27 import com.hazelcast.config.MemberAttributeConfig
28 import com.hazelcast.core.Hazelcast
29 import com.hazelcast.core.HazelcastInstance
30 import com.hazelcast.cp.CPSubsystemManagementService
31 import com.hazelcast.cp.lock.FencedLock
32 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
33 import kotlinx.coroutines.delay
34 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
37 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
38 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
39 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
40 import org.onap.ccsdk.cds.controllerblueprints.core.logger
41 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
42 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
43 import org.springframework.stereotype.Service
44 import java.time.Duration
45 import java.util.concurrent.TimeUnit
46
47 @Service
48 open class HazlecastClusterService : BluePrintClusterService {
49
50     private val log = logger(HazlecastClusterService::class)
51     lateinit var hazelcast: HazelcastInstance
52     lateinit var cpSubsystemManagementService: CPSubsystemManagementService
53     var joinedClient = false
54     var joinedLite = false
55
56     override suspend fun <T> startCluster(configuration: T) {
57         /** Get the Hazelcast Cliet or Server instance */
58         hazelcast =
59             when (configuration) {
60                 is Config -> {
61                     joinedLite = configuration.isLiteMember
62                     val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration)
63                     /** Promote as CP Member */
64                     promoteAsCPMember(hazelcastInstance)
65                     hazelcastInstance
66                 }
67                 is ClientConfig -> {
68                     joinedClient = true
69                     HazelcastClient.newHazelcastClient(configuration)
70                 }
71                 is ClusterInfo -> {
72
73                     System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id)
74                     System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)
75
76                     val memberAttributeConfig = MemberAttributeConfig()
77                     memberAttributeConfig.setAttribute(
78                         BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
79                         configuration.nodeId
80                     )
81
82                     val configFile = configuration.configFile
83                     /** Check file exists */
84                     val clusterConfigFile = normalizedFile(configuration.configFile)
85                     check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
86                         "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
87                     }
88                     check(clusterConfigFile.exists()) {
89                         "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})"
90                     }
91                     log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")
92
93                     /** Hazelcast Client from config file */
94                     if (configuration.joinAsClient) {
95                         /** Set the configuration file to system properties, so that Hazelcast will read automatically */
96                         System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
97                         joinedClient = true
98                         val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
99                         hazelcastClientConfiguration.properties = configuration.properties
100                         HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
101                     } else {
102                         /** Hazelcast Server from config file */
103                         val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile))
104                         hazelcastServerConfiguration.properties = configuration.properties
105                         hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
106                         joinedLite = hazelcastServerConfiguration.isLiteMember
107                         val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
108                         /** Promote as CP Member */
109                         promoteAsCPMember(hazelcastInstance)
110                         hazelcastInstance
111                     }
112                 }
113                 else -> {
114                     throw BluePrintProcessorException("couldn't understand the cluster configuration")
115                 }
116             }
117
118         /** Add the Membership Listeners */
119         hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener(this))
120         log.info(
121             "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
122         )
123     }
124
125     override fun isClient(): Boolean {
126         return joinedClient
127     }
128
129     override fun isLiteMember(): Boolean {
130         return joinedLite
131     }
132
133     override fun clusterJoined(): Boolean {
134         return hazelcast.lifecycleService.isRunning
135     }
136
137     override suspend fun masterMember(partitionGroup: String): ClusterMember {
138         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
139         return hazelcast.cluster.members.first().toClusterMember()
140     }
141
142     override suspend fun allMembers(): Set<ClusterMember> {
143         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
144         return hazelcast.cluster.members.map { it.toClusterMember() }.toSet()
145     }
146
147     override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
148         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
149         return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet()
150     }
151
152     override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
153         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
154         return hazelcast.getMap<String, T>(name)
155     }
156
157     /**
158      * The DistributedLock is a distributed implementation of Java’s Lock.
159      * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
160      * determine ordering of multiple concurrent lock holders.
161      * DistributedLocks are designed to account for failures within the cluster.
162      * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
163      * the lock will be released and granted to the next waiting process.
164      */
165     override suspend fun clusterLock(name: String): ClusterLock {
166         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
167         return ClusterLockImpl(hazelcast, name)
168     }
169
170     /** Return interface may change and it will be included in BluePrintClusterService */
171     @UseExperimental
172     suspend fun clusterScheduler(name: String): IScheduledExecutorService {
173         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
174         return hazelcast.getScheduledExecutorService(name)
175     }
176
177     override suspend fun shutDown(duration: Duration) {
178         if (::hazelcast.isInitialized && clusterJoined()) {
179             delay(duration.toMillis())
180             HazlecastClusterUtils.terminate(hazelcast)
181         }
182     }
183
184     /** Utils */
185     suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
186         if (!joinedClient && !joinedLite) {
187             HazlecastClusterUtils.promoteAsCPMember(hazelcastInstance)
188         }
189     }
190
191     suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
192         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
193         check(!isClient()) { "not supported for cluster client members." }
194         return hazelcastApplicationMembers(ClusterUtils.applicationName())
195     }
196
197     suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
198         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
199         val applicationMembers: MutableMap<String, Member> = hashMapOf()
200         hazelcast.cluster.members.map { member ->
201             val memberName: String = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
202             if (memberName.startsWith(appName, true)) {
203                 applicationMembers[memberName] = member
204             }
205         }
206         return applicationMembers
207     }
208 }
209
210 open class BlueprintsClusterMembershipListener(val hazlecastClusterService: HazlecastClusterService) :
211     MembershipListener {
212     private val log = logger(BlueprintsClusterMembershipListener::class)
213
214     override fun memberRemoved(membershipEvent: MembershipEvent) {
215         log.info("MembershipEvent: $membershipEvent")
216     }
217
218     override fun memberAdded(membershipEvent: MembershipEvent) {
219         log.info("MembershipEvent: $membershipEvent")
220     }
221 }
222
223 open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
224     private val log = logger(ClusterLockImpl::class)
225
226     lateinit var distributedLock: FencedLock
227
228     override fun name(): String {
229         return distributedLock.name
230     }
231
232     override suspend fun lock() {
233         distributedLock = hazelcast.cpSubsystem.getLock(name)
234         distributedLock.lock()
235         log.trace("Cluster lock($name) created..")
236     }
237
238     override suspend fun tryLock(timeout: Long): Boolean {
239         distributedLock = hazelcast.cpSubsystem.getLock(name)
240         return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
241     }
242
243     override suspend fun unLock() {
244         distributedLock.unlock()
245         log.trace("Cluster unlock(${name()}) successfully..")
246     }
247
248     override fun isLocked(): Boolean {
249         return distributedLock.isLocked
250     }
251
252     override suspend fun fenceLock(): String {
253         distributedLock = hazelcast.cpSubsystem.getLock(name)
254         val fence = distributedLock.lockAndGetFence()
255         log.trace("Cluster lock($name) fence($fence) created..")
256         return fence.toString()
257     }
258
259     override suspend fun tryFenceLock(timeout: Long): String {
260         distributedLock = hazelcast.cpSubsystem.getLock(name)
261         return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString()
262     }
263
264     override fun close() {
265     }
266 }