Revert "Renaming Files having BluePrint to have Blueprint"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / processor-core / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / core / cluster / HazelcastClusterService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
18
19 import com.hazelcast.client.HazelcastClient
20 import com.hazelcast.client.config.ClientConfig
21 import com.hazelcast.client.config.YamlClientConfigBuilder
22 import com.hazelcast.cluster.Member
23 import com.hazelcast.cluster.MembershipEvent
24 import com.hazelcast.cluster.MembershipListener
25 import com.hazelcast.config.Config
26 import com.hazelcast.config.FileSystemYamlConfig
27 import com.hazelcast.config.MemberAttributeConfig
28 import com.hazelcast.core.Hazelcast
29 import com.hazelcast.core.HazelcastInstance
30 import com.hazelcast.cp.CPSubsystemManagementService
31 import com.hazelcast.cp.lock.FencedLock
32 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
33 import com.hazelcast.topic.Message
34 import com.hazelcast.topic.MessageListener
35 import kotlinx.coroutines.delay
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
37 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
38 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
39 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
40 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent
41 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
42 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
43 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
44 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
45 import org.onap.ccsdk.cds.controllerblueprints.core.logger
46 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
47 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
48 import org.springframework.context.ApplicationEventPublisher
49 import org.springframework.stereotype.Service
50 import java.time.Duration
51 import java.util.UUID
52
53 import java.util.concurrent.TimeUnit
54
55 @Service
56 open class HazelcastClusterService(private val applicationEventPublisher: ApplicationEventPublisher) : BluePrintClusterService {
57
58     private val log = logger(HazelcastClusterService::class)
59     lateinit var hazelcast: HazelcastInstance
60     lateinit var cpSubsystemManagementService: CPSubsystemManagementService
61     var joinedClient = false
62     var joinedLite = false
63
64     override suspend fun <T> startCluster(configuration: T) {
65         /** Get the Hazelcast Client or Server instance */
66         hazelcast =
67             when (configuration) {
68                 is Config -> {
69                     joinedLite = configuration.isLiteMember
70                     val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration)
71                     /** Promote as CP Member */
72                     promoteAsCPMember(hazelcastInstance)
73                     hazelcastInstance
74                 }
75                 is ClientConfig -> {
76                     joinedClient = true
77                     HazelcastClient.newHazelcastClient(configuration)
78                 }
79                 is ClusterInfo -> {
80
81                     System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id)
82                     System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)
83
84                     val memberAttributeConfig = MemberAttributeConfig()
85                     memberAttributeConfig.setAttribute(
86                         BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
87                         configuration.nodeId
88                     )
89
90                     val configFile = configuration.configFile
91
92                     /** Check file exists */
93                     val clusterConfigFile = normalizedFile(configuration.configFile)
94                     check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
95                         "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
96                     }
97                     check(clusterConfigFile.exists()) {
98                         "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})"
99                     }
100                     log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")
101
102                     /** Hazelcast Client from config file */
103                     if (configuration.joinAsClient) {
104                         /** Set the configuration file to system properties, so that Hazelcast will read automatically */
105                         System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
106                         joinedClient = true
107                         val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
108                         hazelcastClientConfiguration.properties = configuration.properties
109                         HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
110                     } else {
111                         /** Hazelcast Server from config file */
112                         val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile))
113                         hazelcastServerConfiguration.clusterName = configuration.id
114                         hazelcastServerConfiguration.instanceName = configuration.nodeId
115                         hazelcastServerConfiguration.properties = configuration.properties
116                         hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
117                         joinedLite = hazelcastServerConfiguration.isLiteMember
118                         val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
119                         /** Promote as CP Member */
120                         promoteAsCPMember(hazelcastInstance)
121                         hazelcastInstance
122                     }
123                 }
124                 else -> {
125                     throw BluePrintProcessorException("couldn't understand the cluster configuration")
126                 }
127             }
128
129         /** Add the Membership Listeners */
130         hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener())
131         log.info(
132             "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
133         )
134         applicationEventPublisher.publishEvent(ClusterJoinedEvent(this))
135     }
136
137     override fun isClient(): Boolean {
138         return joinedClient
139     }
140
141     override fun isLiteMember(): Boolean {
142         return joinedLite
143     }
144
145     override fun clusterJoined(): Boolean {
146         return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning
147     }
148
149     override suspend fun masterMember(partitionGroup: String): ClusterMember {
150         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
151         return hazelcast.cluster.members.first().toClusterMember()
152     }
153
154     override suspend fun allMembers(): Set<ClusterMember> {
155         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
156         return hazelcast.cluster.members.map { it.toClusterMember() }.toSet()
157     }
158
159     override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
160         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
161         return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet()
162     }
163
164     override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
165         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
166         return hazelcast.getMap<String, T>(name)
167     }
168
169     /**
170      * The DistributedLock is a distributed implementation of Java’s Lock.
171      * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
172      * determine ordering of multiple concurrent lock holders.
173      * DistributedLocks are designed to account for failures within the cluster.
174      * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
175      * the lock will be released and granted to the next waiting process.
176      */
177     override suspend fun clusterLock(name: String): ClusterLock {
178         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
179         return ClusterLockImpl(hazelcast, name)
180     }
181
182     /** Return interface may change and it will be included in BluePrintClusterService */
183     @UseExperimental
184     suspend fun clusterScheduler(name: String): IScheduledExecutorService {
185         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
186         return hazelcast.getScheduledExecutorService(name)
187     }
188
189     override suspend fun shutDown(duration: Duration) {
190         if (::hazelcast.isInitialized && clusterJoined()) {
191             delay(duration.toMillis())
192             HazelcastClusterUtils.terminate(hazelcast)
193         }
194     }
195
196     override suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T) {
197         hazelcast.getReliableTopic<T>(topic.name).publish(message)
198     }
199
200     override fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID {
201         log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) listening to topic($topic)...")
202         return hazelcast.getReliableTopic<T>(topic.name)
203             .addMessageListener(HazelcastMessageListenerAdapter(listener))
204     }
205
206     override fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean {
207         log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) has stopped listening to topic($topic)...")
208         return hazelcast.getReliableTopic<Any>(topic.name).removeMessageListener(uuid)
209     }
210
211     /** Utils */
212     suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
213         if (!joinedClient && !joinedLite) {
214             HazelcastClusterUtils.promoteAsCPMember(hazelcastInstance)
215         }
216     }
217
218     suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
219         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
220         check(!isClient()) { "not supported for cluster client members." }
221         return hazelcastApplicationMembers(ClusterUtils.applicationName())
222     }
223
224     suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
225         check(::hazelcast.isInitialized) { "failed to start and join cluster" }
226         val applicationMembers: MutableMap<String, Member> = hashMapOf()
227         hazelcast.cluster.members.map { member ->
228             val memberName: String = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
229             if (memberName.startsWith(appName, true)) {
230                 applicationMembers[memberName] = member
231             }
232         }
233         return applicationMembers
234     }
235 }
236
237 open class BlueprintsClusterMembershipListener() :
238     MembershipListener {
239
240     private val log = logger(BlueprintsClusterMembershipListener::class)
241
242     override fun memberRemoved(membershipEvent: MembershipEvent) {
243         log.info("MembershipEvent: $membershipEvent")
244     }
245
246     override fun memberAdded(membershipEvent: MembershipEvent) {
247         log.info("MembershipEvent: $membershipEvent")
248     }
249 }
250
251 open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
252
253     private val log = logger(ClusterLockImpl::class)
254
255     private val distributedLock: FencedLock = hazelcast.cpSubsystem.getLock(name)
256
257     override fun name(): String {
258         return distributedLock.name
259     }
260
261     override suspend fun lock() {
262         distributedLock.lock()
263         log.trace("Cluster lock($name) created..")
264     }
265
266     override suspend fun tryLock(timeout: Long): Boolean {
267         return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
268             .also {
269                 if (it) log.trace("Cluster lock acquired: $name")
270                 else log.trace("Failed to acquire Cluster lock $name within timeout $timeout")
271             }
272     }
273
274     override suspend fun unLock() {
275         distributedLock.unlock()
276         log.trace("Cluster unlock(${name()}) successfully..")
277     }
278
279     override fun isLocked(): Boolean {
280         return distributedLock.isLocked
281     }
282
283     override fun isLockedByCurrentThread(): Boolean {
284         return distributedLock.isLockedByCurrentThread
285     }
286
287     override suspend fun fenceLock(): String {
288         val fence = distributedLock.lockAndGetFence()
289         log.trace("Cluster lock($name) fence($fence) created..")
290         return fence.toString()
291     }
292
293     override suspend fun tryFenceLock(timeout: Long): String {
294         return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString()
295     }
296
297     override fun close() {
298     }
299 }
300
301 class HazelcastMessageListenerAdapter<E>(val listener: BlueprintClusterMessageListener<E>) : MessageListener<E> {
302     override fun onMessage(message: Message<E>?) = message?.let {
303         BluePrintClusterMessage<E>(
304             BlueprintClusterTopic.valueOf(it.source as String),
305             it.messageObject,
306             it.publishTime,
307             it.publishingMember.toClusterMember()
308         )
309     }.let { listener.onMessage(it) }
310 }