Fixed NoClassDefFoundError when USE_SCRIPT_COMPILE_CACHE is set to false 09/116409/3
authorJulien Fontaine <julien.fontaine@bell.ca>
Wed, 2 Dec 2020 21:25:08 +0000 (16:25 -0500)
committerKAPIL SINGAL <ks220y@att.com>
Wed, 16 Dec 2020 14:59:41 +0000 (14:59 +0000)
USE_SCRIPT_COMPILE_CACHE set to false cleans the Class Loader cache after each kotlin script execution.
When several kotlin script are executed in parallel (ie no dependency between them) and USE_SCRIPT_COMPILE_CACHE=false then
the class loader from the cache may be deleted before one of those executed kotlin script get the time to finish which to the NoClassDefFoundError.

Removed cleanupInstance method for kotlin script executors that where causing the class loader to be removed prematurely.
Now the behaviour is to remove the class loader from the cache only when we publish a new CBA which was already the case when CDS run with a single instance.
In cluster mode, a topic has been added to hazelcast to allow the instance publishing the updated CBA to communicate to the other instances by sending a message to clean the class loader
for this CBA from their cache.

Added mutex on kotlin script compilation to fix race condition. For concurrent kotlin script execution each process wanted to compile an executable but it was causing a race condition if a process tries to execute while another still compile. Mutex on the execution path prevent this behaviour

Issue-ID: CCSDK-3052
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Signed-off-by: Jozsef Csongvai <jozsef.csongvai@bell.ca>
Change-Id: I6ab002352b3272898ad0b183341ee664652c8ae3

17 files changed:
ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/ComponentNetconfExecutor.kt
ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessor.kt
ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/IpAssignResolutionCapabilityTest.kt
ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/NamingResolutionCapabilityTest.kt
ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessorTest.kt
ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt
ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt
ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt
ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt
ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt
ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt

index b647900..49b9207 100644 (file)
@@ -66,11 +66,6 @@ open class ComponentNetconfExecutor(private var componentFunctionScriptingServic
 
         // Handles both script processing and error handling
         scriptComponent.executeScript(executionServiceInput)
-
-        componentFunctionScriptingService.cleanupInstance(
-            bluePrintRuntimeService.bluePrintContext(),
-            scriptType
-        )
     }
 
     override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) {
index 8a954c1..4ce5df1 100644 (file)
@@ -86,10 +86,6 @@ class IpAssignResolutionCapabilityTest {
                     .scriptInstance<ResourceAssignmentProcessor>(any(), any(), any())
             } returns IpAssignResolutionCapability()
 
-            coEvery {
-                componentFunctionScriptingService.cleanupInstance(any(), any())
-            } returns mockk()
-
             val raRuntimeService = mockk<ResourceAssignmentRuntimeService>()
             every { raRuntimeService.bluePrintContext() } returns mockk()
             every { raRuntimeService.getInputValue("fixed_ipv4_Address_01") } returns NullNode.getInstance()
index 449845f..8c0aca4 100644 (file)
@@ -32,6 +32,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.util
 import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BluePrintRestLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService
 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.ComponentFunctionScriptingService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintError
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
@@ -87,11 +88,8 @@ class NamingResolutionCapabilityTest {
                     .scriptInstance<ResourceAssignmentProcessor>(any(), any(), any())
             } returns NamingResolutionCapability()
 
-            coEvery {
-                componentFunctionScriptingService.cleanupInstance(any(), any())
-            } returns mockk()
-
             val raRuntimeService = mockk<ResourceAssignmentRuntimeService>()
+            every { raRuntimeService.getBluePrintError() } returns BluePrintError()
             every { raRuntimeService.bluePrintContext() } returns mockk<BluePrintContext>()
             every { raRuntimeService.getInputValue("vf-module-name") } returns NullNode.getInstance()
             every { raRuntimeService.getInputValue("vnfc-name") } returns NullNode.getInstance()
index f618b41..1b0058b 100644 (file)
@@ -52,10 +52,6 @@ class CapabilityResourceResolutionProcessorTest {
                     .scriptInstance<ResourceAssignmentProcessor>(any(), any(), any())
             } returns MockCapabilityScriptRA()
 
-            coEvery {
-                componentFunctionScriptingService.cleanupInstance(any(), any())
-            } returns mockk()
-
             val raRuntimeService = mockk<ResourceAssignmentRuntimeService>()
             every { raRuntimeService.bluePrintContext() } returns mockk<BluePrintContext>()
             every { raRuntimeService.getInputValue("test-property") } returns NullNode.getInstance()
@@ -100,10 +96,6 @@ class CapabilityResourceResolutionProcessorTest {
                     .scriptInstance<ResourceAssignmentProcessor>(any(), BluePrintConstants.SCRIPT_JYTHON, any())
             } returns MockCapabilityScriptRA()
 
-            coEvery {
-                componentFunctionScriptingService.cleanupInstance(any(), any())
-            } returns mockk()
-
             val resourceAssignmentRuntimeService = ResourceAssignmentRuntimeService("1234", bluePrintContext)
 
             val capabilityResourceResolutionProcessor =
index d1b42ff..230097f 100644 (file)
 
 package org.onap.ccsdk.cds.controllerblueprints.core.scripts
 
+import com.google.common.cache.CacheBuilder
+import com.google.common.cache.CacheLoader
+import com.google.common.cache.LoadingCache
 import kotlinx.coroutines.async
 import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
 import org.jetbrains.kotlin.cli.common.ExitCode
 import org.jetbrains.kotlin.cli.common.arguments.parseCommandLineArguments
 import org.jetbrains.kotlin.cli.common.messages.CompilerMessageLocation
@@ -44,6 +49,8 @@ open class BluePrintCompileService {
         val classPaths = classpathFromClasspathProperty()?.joinToString(File.pathSeparator) {
             it.absolutePath
         }
+        val mutexCache: LoadingCache<String, Mutex> = CacheBuilder.newBuilder()
+            .build(CacheLoader.from { s -> Mutex() })
     }
 
     /** Compile the [bluePrintSourceCode] and get the [kClassName] instance for the constructor [args] */
@@ -54,8 +61,11 @@ open class BluePrintCompileService {
     ): T {
         /** Compile the source code if needed */
         log.debug("Jar Exists : ${bluePrintSourceCode.targetJarFile.exists()}, Regenerate : ${bluePrintSourceCode.regenerate}")
-        if (!bluePrintSourceCode.targetJarFile.exists() || bluePrintSourceCode.regenerate) {
-            compile(bluePrintSourceCode)
+
+        mutexCache.get(bluePrintSourceCode.targetJarFile.absolutePath).withLock {
+            if (!bluePrintSourceCode.targetJarFile.exists() || bluePrintSourceCode.regenerate) {
+                compile(bluePrintSourceCode)
+            }
         }
 
         val classLoaderWithDependencies = BluePrintCompileCache.classLoader(bluePrintSourceCode.cacheKey)
index fa8ca27..f3eb1a2 100644 (file)
@@ -79,11 +79,4 @@ open class BluePrintScriptsServiceImpl : BluePrintScriptsService {
         return Thread.currentThread().contextClassLoader.loadClass(scriptClassName).constructors
             .single().newInstance(*args.toArray()) as T
     }
-
-    override suspend fun cleanupInstance(blueprintBasePath: String) {
-        if (!BluePrintConstants.USE_SCRIPT_COMPILE_CACHE) {
-            log.info("Invalidating compile cache for blueprint ($blueprintBasePath)")
-            BluePrintCompileCache.cleanClassLoader(blueprintBasePath)
-        }
-    }
 }
index 1b58bc0..6637c62 100755 (executable)
@@ -18,6 +18,8 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.db.primary.service
 
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService
 import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModel
 import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModelContent
 import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.repository.BlueprintModelRepository
@@ -34,6 +36,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedPathName
 import org.onap.ccsdk.cds.controllerblueprints.core.reCreateNBDirs
 import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintFileUtils
 import org.slf4j.LoggerFactory
 import org.springframework.dao.DataIntegrityViolationException
@@ -60,7 +63,7 @@ class BlueprintProcessorCatalogServiceImpl(
         // Clean blueprint script cache
         val cacheKey = BluePrintFileUtils
             .compileCacheKey(normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, name, version))
-        BluePrintCompileCache.cleanClassLoader(cacheKey)
+        cleanClassLoader(cacheKey)
         log.info("removed cba file name($name), version($version) from cache")
         // Cleaning Deployed Blueprint
         deleteNBDir(bluePrintLoadConfiguration.blueprintDeployPath, name, version)
@@ -132,7 +135,7 @@ class BlueprintProcessorCatalogServiceImpl(
                 normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, artifactName, artifactVersion)
 
             val cacheKey = BluePrintFileUtils.compileCacheKey(deployFile)
-            BluePrintCompileCache.cleanClassLoader(cacheKey)
+            cleanClassLoader(cacheKey)
 
             deleteNBDir(deployFile).let {
                 if (it) log.info("Deleted deployed blueprint model :$artifactName::$artifactVersion")
@@ -174,4 +177,14 @@ class BlueprintProcessorCatalogServiceImpl(
             )
         }
     }
+
+    private suspend fun cleanClassLoader(cacheKey: String) {
+        val clusterService = BluePrintDependencyService.optionalClusterService()
+        if (null == clusterService)
+            BluePrintCompileCache.cleanClassLoader(cacheKey)
+        else {
+            log.info("Sending ClusterMessage: Clean Classloader Cache")
+            clusterService.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, cacheKey)
+        }
+    }
 }
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt
new file mode 100644 (file)
index 0000000..090130f
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright © 2020 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
+
+enum class BlueprintClusterTopic {
+    BLUEPRINT_CLEAN_COMPILER_CACHE
+}
index 5493bac..fb90567 100644 (file)
@@ -30,9 +30,14 @@ import com.hazelcast.core.HazelcastInstance
 import com.hazelcast.cp.CPSubsystemManagementService
 import com.hazelcast.cp.lock.FencedLock
 import com.hazelcast.scheduledexecutor.IScheduledExecutorService
+import com.hazelcast.topic.Message
+import com.hazelcast.topic.MessageListener
 import kotlinx.coroutines.delay
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
@@ -40,12 +45,15 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+import org.springframework.context.ApplicationEventPublisher
 import org.springframework.stereotype.Service
 import java.time.Duration
+import java.util.UUID
+
 import java.util.concurrent.TimeUnit
 
 @Service
-open class HazelcastClusterService : BluePrintClusterService {
+open class HazelcastClusterService(private val applicationEventPublisher: ApplicationEventPublisher) : BluePrintClusterService {
 
     private val log = logger(HazelcastClusterService::class)
     lateinit var hazelcast: HazelcastInstance
@@ -123,6 +131,7 @@ open class HazelcastClusterService : BluePrintClusterService {
         log.info(
             "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
         )
+        applicationEventPublisher.publishEvent(ClusterJoinedEvent(this))
     }
 
     override fun isClient(): Boolean {
@@ -184,6 +193,21 @@ open class HazelcastClusterService : BluePrintClusterService {
         }
     }
 
+    override suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T) {
+        hazelcast.getReliableTopic<T>(topic.name).publish(message)
+    }
+
+    override fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID {
+        log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) listening to topic($topic)...")
+        return hazelcast.getReliableTopic<T>(topic.name)
+            .addMessageListener(HazelcastMessageListenerAdapter(listener))
+    }
+
+    override fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean {
+        log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) has stopped listening to topic($topic)...")
+        return hazelcast.getReliableTopic<Any>(topic.name).removeMessageListener(uuid)
+    }
+
     /** Utils */
     suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
         if (!joinedClient && !joinedLite) {
@@ -273,3 +297,14 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val
     override fun close() {
     }
 }
+
+class HazelcastMessageListenerAdapter<E>(val listener: BlueprintClusterMessageListener<E>) : MessageListener<E> {
+    override fun onMessage(message: Message<E>?) = message?.let {
+        BluePrintClusterMessage<E>(
+            BlueprintClusterTopic.valueOf(it.source as String),
+            it.messageObject,
+            it.publishTime,
+            it.publishingMember.toClusterMember()
+        )
+    }.let { listener.onMessage(it) }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt
new file mode 100644 (file)
index 0000000..3833379
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2020 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.listeners
+
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Component
+
+@Component
+@ConditionalOnProperty("CLUSTER_ENABLED", havingValue = "true")
+open class BlueprintCompilerCacheMessageListener(private val clusterService: BluePrintClusterService) : BlueprintClusterMessageListener<String> {
+    private val log = logger(BlueprintCompilerCacheMessageListener::class)
+
+    @EventListener(ClusterJoinedEvent::class)
+    fun register() {
+        log.info("Registering BlueprintCompilerCacheMessageListener")
+        clusterService.addBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, this)
+    }
+
+    override fun onMessage(message: BluePrintClusterMessage<String>?) {
+        message?.let {
+            log.info("Received ClusterMessage - Cleaning compile cache for blueprint (${it.payload})")
+            BluePrintCompileCache.cleanClassLoader(it.payload)
+        }
+    }
+}
index 8cb9f4f..f7ba6f2 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.core.service
 
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic
+import org.springframework.context.ApplicationEvent
 import java.time.Duration
 import java.util.Properties
+import java.util.UUID
 
 interface BluePrintClusterService {
 
@@ -53,6 +56,15 @@ interface BluePrintClusterService {
 
     /** Shut down the cluster with [duration] */
     suspend fun shutDown(duration: Duration)
+
+    /** Send [message] to the listener(s) of a [topic] */
+    suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T)
+
+    /** Register a [listener] to a [topic] and returns his UUID */
+    fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID
+
+    /** Unregister a listener from a [topic] using his [uuid] and returns true if it succeeded */
+    fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean
 }
 
 data class ClusterInfo(
@@ -83,4 +95,12 @@ interface ClusterLock {
     fun close()
 }
 
+class BluePrintClusterMessage<E>(val topic: BlueprintClusterTopic, val payload: E, publishTime: Long, clusterMember: ClusterMember)
+
+interface BlueprintClusterMessageListener<E> {
+    fun onMessage(message: BluePrintClusterMessage<E>?)
+}
+
+class ClusterJoinedEvent(source: Any) : ApplicationEvent(source)
+
 const val CDS_LOCK_GROUP = "cds-lock"
index da55642..ded0179 100644 (file)
@@ -22,6 +22,7 @@ import com.hazelcast.cluster.Member
 import com.hazelcast.config.FileSystemYamlConfig
 import com.hazelcast.instance.impl.HazelcastInstanceFactory
 import com.hazelcast.map.IMap
+import io.mockk.mockk
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.async
 import kotlinx.coroutines.awaitAll
@@ -32,7 +33,9 @@ import kotlinx.coroutines.withContext
 import org.junit.After
 import org.junit.Before
 import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
@@ -41,6 +44,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
 import java.io.Serializable
 import java.util.Properties
 import kotlin.test.assertEquals
+import kotlin.test.assertFalse
 import kotlin.test.assertNotNull
 import kotlin.test.assertTrue
 
@@ -91,6 +95,40 @@ class HazelcastClusterServiceTest {
         }
     }
 
+    @Test
+    fun testClusterMessaging() {
+        runBlocking {
+            val bluePrintClusterServiceOne =
+                createCluster(arrayListOf(1, 2, 3)).toMutableList()
+            printReachableMembers(bluePrintClusterServiceOne)
+            testMessageReceived(bluePrintClusterServiceOne)
+        }
+    }
+
+    private suspend fun testMessageReceived(bluePrintClusterServices: List<BluePrintClusterService>) {
+        val sender = bluePrintClusterServices[0] as HazelcastClusterService
+        val receiver = bluePrintClusterServices[1] as HazelcastClusterService
+        val messageSent = "hello world"
+        var isMessageReceived = false
+        val uuid = receiver.addBlueprintClusterMessageListener(
+            BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE,
+            object : BlueprintClusterMessageListener<String> {
+                override fun onMessage(message: BluePrintClusterMessage<String>?) {
+                    log.info("Message received - ${message?.payload}")
+                    isMessageReceived = messageSent == message?.payload
+                }
+            }
+        )
+
+        assertNotNull(uuid)
+        sender.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, messageSent)
+        delay(1000)
+        assertTrue(isMessageReceived)
+
+        assertTrue(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
+        assertFalse(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
+    }
+
     private suspend fun createCluster(
         ids: List<Int>,
         joinAsClient: Boolean? = false
@@ -117,7 +155,7 @@ class HazelcastClusterServiceTest {
                                 properties = properties
                             )
                         }
-                    val hazelcastClusterService = HazelcastClusterService()
+                    val hazelcastClusterService = HazelcastClusterService(mockk(relaxed = true))
                     hazelcastClusterService.startCluster(clusterInfo)
                     hazelcastClusterService
                 }
index d107f01..3483ce1 100644 (file)
@@ -101,10 +101,4 @@ class ComponentFunctionScriptingService(
         }
         return scriptComponent
     }
-
-    suspend fun cleanupInstance(bluePrintContext: BluePrintContext, scriptType: String) {
-        if (scriptType == BluePrintConstants.SCRIPT_KOTLIN) {
-            BluePrintScriptsServiceImpl().cleanupInstance(bluePrintContext.rootPath)
-        }
-    }
 }
index 34eaf62..a7ef0a8 100644 (file)
@@ -61,8 +61,6 @@ open class ComponentScriptExecutor(private var componentFunctionScriptingService
 
         // Handles both script processing and error handling
         scriptComponentFunction.executeScript(executionServiceInput)
-
-        componentFunctionScriptingService.cleanupInstance(bluePrintRuntimeService.bluePrintContext(), scriptType)
     }
 
     override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) {