83a04d6535c89dfbeabe1a08ae3f2e720057d145
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
18
19 import com.hazelcast.client.HazelcastClient
20 import com.hazelcast.client.config.ClientConfig
21 import com.hazelcast.client.config.YamlClientConfigBuilder
22 import com.hazelcast.cluster.Member
23 import com.hazelcast.cluster.MembershipEvent
24 import com.hazelcast.cluster.MembershipListener
25 import com.hazelcast.config.Config
26 import com.hazelcast.config.FileSystemYamlConfig
27 import com.hazelcast.config.MemberAttributeConfig
28 import com.hazelcast.core.Hazelcast
29 import com.hazelcast.core.HazelcastInstance
30 import com.hazelcast.cp.lock.FencedLock
31 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
32 import kotlinx.coroutines.delay
33 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
34 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
37 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
38 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
39 import org.onap.ccsdk.cds.controllerblueprints.core.logger
40 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
41 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
42 import org.springframework.stereotype.Service
43 import java.time.Duration
44 import java.util.concurrent.TimeUnit
45
46 @Service
47 open class HazlecastClusterService : BluePrintClusterService {
48
49     private val log = logger(HazlecastClusterService::class)
50     lateinit var hazelcast: HazelcastInstance
51     var joinedClient = false
52     var joinedLite = false
53
54     override suspend fun <T> startCluster(configuration: T) {
55         /** Get the Hazelcast Cliet or Server instance */
56         hazelcast =
57             when (configuration) {
58                 is Config -> {
59                     joinedLite = configuration.isLiteMember
60                     Hazelcast.newHazelcastInstance(configuration)
61                 }
62                 is ClientConfig -> {
63                     joinedClient = true
64                     HazelcastClient.newHazelcastClient(configuration)
65                 }
66                 is ClusterInfo -> {
67
68                     System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id)
69                     System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)
70
71                     val memberAttributeConfig = MemberAttributeConfig()
72                     memberAttributeConfig.setAttribute(
73                         BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
74                         configuration.nodeId
75                     )
76
77                     val configFile = configuration.configFile
78                     /** Check file exists */
79                     val clusterConfigFile = normalizedFile(configuration.configFile)
80                     check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
81                         "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
82                     }
83                     check(clusterConfigFile.exists()) {
84                         "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})"
85                     }
86                     log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")
87
88                     /** Hazelcast Client from config file */
89                     if (configuration.joinAsClient) {
90                         /** Set the configuration file to system properties, so that Hazelcast will read automatically */
91                         System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
92                         joinedClient = true
93                         val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
94                         hazelcastClientConfiguration.properties = configuration.properties
95                         HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
96                     } else {
97                         /** Hazelcast Server from config file */
98                         val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile))
99                         hazelcastServerConfiguration.properties = configuration.properties
100                         hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
101                         joinedLite = hazelcastServerConfiguration.isLiteMember
102                         Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
103                     }
104                 }
105                 else -> {
106                     throw BluePrintProcessorException("couldn't understand the cluster configuration")
107                 }
108             }
109
110         /** Add the Membership Listeners */
111         hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener(this))
112         log.info(
113             "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
114         )
115     }
116
117     override fun isClient(): Boolean {
118         return joinedClient
119     }
120
121     override fun isLiteMember(): Boolean {
122         return joinedLite
123     }
124
125     override fun clusterJoined(): Boolean {
126         return hazelcast.lifecycleService.isRunning
127     }
128
129     override suspend fun masterMember(partitionGroup: String): ClusterMember {
130         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
131         return hazelcast.cluster.members.first().toClusterMember()
132     }
133
134     override suspend fun allMembers(): Set<ClusterMember> {
135         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
136         return hazelcast.cluster.members.map { it.toClusterMember() }.toSet()
137     }
138
139     override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
140         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
141         return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet()
142     }
143
144     override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
145         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
146         return hazelcast.getMap<String, T>(name)
147     }
148
149     /**
150      * The DistributedLock is a distributed implementation of Java’s Lock.
151      * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
152      * determine ordering of multiple concurrent lock holders.
153      * DistributedLocks are designed to account for failures within the cluster.
154      * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
155      * the lock will be released and granted to the next waiting process.
156      */
157     override suspend fun clusterLock(name: String): ClusterLock {
158         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
159         return ClusterLockImpl(hazelcast, name)
160     }
161
162     /** Return interface may change and it will be included in BluePrintClusterService */
163     @UseExperimental
164     suspend fun clusterScheduler(name: String): IScheduledExecutorService {
165         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
166         return hazelcast.getScheduledExecutorService(name)
167     }
168
169     override suspend fun shutDown(duration: Duration) {
170         if (::hazelcast.isInitialized && clusterJoined()) {
171             delay(duration.toMillis())
172             hazelcast.lifecycleService.terminate()
173         }
174     }
175
176     /** Utils */
177     suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
178         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
179         check(!isClient()) { "not supported for cluster client members." }
180         return hazelcastApplicationMembers(ClusterUtils.applicationName())
181     }
182
183     suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
184         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
185         val applicationMembers: MutableMap<String, Member> = hashMapOf()
186         hazelcast.cluster.members.map { member ->
187             val memberName: String = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
188             if (memberName.startsWith(appName, true)) {
189                 applicationMembers[memberName] = member
190             }
191         }
192         return applicationMembers
193     }
194 }
195
196 open class BlueprintsClusterMembershipListener(val hazlecastClusterService: HazlecastClusterService) :
197     MembershipListener {
198     private val log = logger(BlueprintsClusterMembershipListener::class)
199
200     override fun memberRemoved(membershipEvent: MembershipEvent) {
201         log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Removed: $membershipEvent")
202     }
203
204     override fun memberAdded(membershipEvent: MembershipEvent) {
205         log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Added : $membershipEvent")
206     }
207 }
208
209 open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
210     private val log = logger(ClusterLockImpl::class)
211
212     lateinit var distributedLock: FencedLock
213
214     override fun name(): String {
215         return distributedLock.name
216     }
217
218     override suspend fun lock() {
219         distributedLock = hazelcast.cpSubsystem.getLock(name)
220         distributedLock.lock()
221         log.trace("Cluster lock($name) created..")
222     }
223
224     override suspend fun tryLock(timeout: Long): Boolean {
225         distributedLock = hazelcast.cpSubsystem.getLock(name)
226         return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
227     }
228
229     override suspend fun unLock() {
230         distributedLock.unlock()
231         log.trace("Cluster unlock(${name()}) successfully..")
232     }
233
234     override fun isLocked(): Boolean {
235         return distributedLock.isLocked
236     }
237
238     override suspend fun fenceLock(): String {
239         distributedLock = hazelcast.cpSubsystem.getLock(name)
240         val fence = distributedLock.lockAndGetFence()
241         log.trace("Cluster lock($name) fence($fence) created..")
242         return fence.toString()
243     }
244
245     override suspend fun tryFenceLock(timeout: Long): String {
246         distributedLock = hazelcast.cpSubsystem.getLock(name)
247         return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString()
248     }
249
250     override fun close() {
251     }
252 }