USE_SCRIPT_COMPILE_CACHE set to false cleans the Class Loader cache after each kotlin script execution.
When several kotlin script are executed in parallel (ie no dependency between them) and USE_SCRIPT_COMPILE_CACHE=false then
the class loader from the cache may be deleted before one of those executed kotlin script get the time to finish which to the NoClassDefFoundError.
Removed cleanupInstance method for kotlin script executors that where causing the class loader to be removed prematurely.
Now the behaviour is to remove the class loader from the cache only when we publish a new CBA which was already the case when CDS run with a single instance.
In cluster mode, a topic has been added to hazelcast to allow the instance publishing the updated CBA to communicate to the other instances by sending a message to clean the class loader
for this CBA from their cache.
Added mutex on kotlin script compilation to fix race condition. For concurrent kotlin script execution each process wanted to compile an executable but it was causing a race condition if a process tries to execute while another still compile. Mutex on the execution path prevent this behaviour
Issue-ID: CCSDK-3052
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Signed-off-by: Jozsef Csongvai <jozsef.csongvai@bell.ca>
Change-Id: I6ab002352b3272898ad0b183341ee664652c8ae3
// Handles both script processing and error handling
scriptComponent.executeScript(executionServiceInput)
-
- componentFunctionScriptingService.cleanupInstance(
- bluePrintRuntimeService.bluePrintContext(),
- scriptType
- )
}
override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) {
// Invoke componentResourceAssignmentProcessor
componentResourceAssignmentProcessor!!.executeScript(resourceAssignment)
-
- componentFunctionScriptingService.cleanupInstance(raRuntimeService.bluePrintContext(), scriptType)
}
}
.scriptInstance<ResourceAssignmentProcessor>(any(), any(), any())
} returns IpAssignResolutionCapability()
- coEvery {
- componentFunctionScriptingService.cleanupInstance(any(), any())
- } returns mockk()
-
val raRuntimeService = mockk<ResourceAssignmentRuntimeService>()
every { raRuntimeService.bluePrintContext() } returns mockk()
every { raRuntimeService.getInputValue("fixed_ipv4_Address_01") } returns NullNode.getInstance()
import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BluePrintRestLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.ComponentFunctionScriptingService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintError
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
.scriptInstance<ResourceAssignmentProcessor>(any(), any(), any())
} returns NamingResolutionCapability()
- coEvery {
- componentFunctionScriptingService.cleanupInstance(any(), any())
- } returns mockk()
-
val raRuntimeService = mockk<ResourceAssignmentRuntimeService>()
+ every { raRuntimeService.getBluePrintError() } returns BluePrintError()
every { raRuntimeService.bluePrintContext() } returns mockk<BluePrintContext>()
every { raRuntimeService.getInputValue("vf-module-name") } returns NullNode.getInstance()
every { raRuntimeService.getInputValue("vnfc-name") } returns NullNode.getInstance()
.scriptInstance<ResourceAssignmentProcessor>(any(), any(), any())
} returns MockCapabilityScriptRA()
- coEvery {
- componentFunctionScriptingService.cleanupInstance(any(), any())
- } returns mockk()
-
val raRuntimeService = mockk<ResourceAssignmentRuntimeService>()
every { raRuntimeService.bluePrintContext() } returns mockk<BluePrintContext>()
every { raRuntimeService.getInputValue("test-property") } returns NullNode.getInstance()
.scriptInstance<ResourceAssignmentProcessor>(any(), BluePrintConstants.SCRIPT_JYTHON, any())
} returns MockCapabilityScriptRA()
- coEvery {
- componentFunctionScriptingService.cleanupInstance(any(), any())
- } returns mockk()
-
val resourceAssignmentRuntimeService = ResourceAssignmentRuntimeService("1234", bluePrintContext)
val capabilityResourceResolutionProcessor =
const val TOSCA_SPEC = "TOSCA"
- val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean()
-
const val LOG_PROTECT: String = "log-protect"
/** Cluster Properties */
suspend fun <T> scriptInstance(cacheKey: String, scriptClassName: String): T
suspend fun <T> scriptInstance(scriptClassName: String): T
-
- suspend fun cleanupInstance(blueprintBasePath: String)
}
package org.onap.ccsdk.cds.controllerblueprints.core.scripts
+import com.google.common.cache.CacheBuilder
+import com.google.common.cache.CacheLoader
+import com.google.common.cache.LoadingCache
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
import org.jetbrains.kotlin.cli.common.ExitCode
import org.jetbrains.kotlin.cli.common.arguments.parseCommandLineArguments
import org.jetbrains.kotlin.cli.common.messages.CompilerMessageLocation
val classPaths = classpathFromClasspathProperty()?.joinToString(File.pathSeparator) {
it.absolutePath
}
+ val mutexCache: LoadingCache<String, Mutex> = CacheBuilder.newBuilder()
+ .build(CacheLoader.from { s -> Mutex() })
}
/** Compile the [bluePrintSourceCode] and get the [kClassName] instance for the constructor [args] */
): T {
/** Compile the source code if needed */
log.debug("Jar Exists : ${bluePrintSourceCode.targetJarFile.exists()}, Regenerate : ${bluePrintSourceCode.regenerate}")
- if (!bluePrintSourceCode.targetJarFile.exists() || bluePrintSourceCode.regenerate) {
- compile(bluePrintSourceCode)
+
+ mutexCache.get(bluePrintSourceCode.targetJarFile.absolutePath).withLock {
+ if (!bluePrintSourceCode.targetJarFile.exists() || bluePrintSourceCode.regenerate) {
+ compile(bluePrintSourceCode)
+ }
}
val classLoaderWithDependencies = BluePrintCompileCache.classLoader(bluePrintSourceCode.cacheKey)
return Thread.currentThread().contextClassLoader.loadClass(scriptClassName).constructors
.single().newInstance(*args.toArray()) as T
}
-
- override suspend fun cleanupInstance(blueprintBasePath: String) {
- if (!BluePrintConstants.USE_SCRIPT_COMPILE_CACHE) {
- log.info("Invalidating compile cache for blueprint ($blueprintBasePath)")
- BluePrintCompileCache.cleanClassLoader(blueprintBasePath)
- }
- }
}
package org.onap.ccsdk.cds.blueprintsprocessor.db.primary.service
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModel
import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModelContent
import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.repository.BlueprintModelRepository
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedPathName
import org.onap.ccsdk.cds.controllerblueprints.core.reCreateNBDirs
import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintFileUtils
import org.slf4j.LoggerFactory
import org.springframework.dao.DataIntegrityViolationException
// Clean blueprint script cache
val cacheKey = BluePrintFileUtils
.compileCacheKey(normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, name, version))
- BluePrintCompileCache.cleanClassLoader(cacheKey)
+ cleanClassLoader(cacheKey)
log.info("removed cba file name($name), version($version) from cache")
// Cleaning Deployed Blueprint
deleteNBDir(bluePrintLoadConfiguration.blueprintDeployPath, name, version)
normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, artifactName, artifactVersion)
val cacheKey = BluePrintFileUtils.compileCacheKey(deployFile)
- BluePrintCompileCache.cleanClassLoader(cacheKey)
+ cleanClassLoader(cacheKey)
deleteNBDir(deployFile).let {
if (it) log.info("Deleted deployed blueprint model :$artifactName::$artifactVersion")
)
}
}
+
+ private suspend fun cleanClassLoader(cacheKey: String) {
+ val clusterService = BluePrintDependencyService.optionalClusterService()
+ if (null == clusterService)
+ BluePrintCompileCache.cleanClassLoader(cacheKey)
+ else {
+ log.info("Sending ClusterMessage: Clean Classloader Cache")
+ clusterService.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, cacheKey)
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright © 2020 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
+
+enum class BlueprintClusterTopic {
+ BLUEPRINT_CLEAN_COMPILER_CACHE
+}
import com.hazelcast.cp.CPSubsystemManagementService
import com.hazelcast.cp.lock.FencedLock
import com.hazelcast.scheduledexecutor.IScheduledExecutorService
+import com.hazelcast.topic.Message
+import com.hazelcast.topic.MessageListener
import kotlinx.coroutines.delay
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Service
import java.time.Duration
+import java.util.UUID
+
import java.util.concurrent.TimeUnit
@Service
-open class HazelcastClusterService : BluePrintClusterService {
+open class HazelcastClusterService(private val applicationEventPublisher: ApplicationEventPublisher) : BluePrintClusterService {
private val log = logger(HazelcastClusterService::class)
lateinit var hazelcast: HazelcastInstance
log.info(
"Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
)
+ applicationEventPublisher.publishEvent(ClusterJoinedEvent(this))
}
override fun isClient(): Boolean {
}
}
+ override suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T) {
+ hazelcast.getReliableTopic<T>(topic.name).publish(message)
+ }
+
+ override fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID {
+ log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) listening to topic($topic)...")
+ return hazelcast.getReliableTopic<T>(topic.name)
+ .addMessageListener(HazelcastMessageListenerAdapter(listener))
+ }
+
+ override fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean {
+ log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) has stopped listening to topic($topic)...")
+ return hazelcast.getReliableTopic<Any>(topic.name).removeMessageListener(uuid)
+ }
+
/** Utils */
suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
if (!joinedClient && !joinedLite) {
override fun close() {
}
}
+
+class HazelcastMessageListenerAdapter<E>(val listener: BlueprintClusterMessageListener<E>) : MessageListener<E> {
+ override fun onMessage(message: Message<E>?) = message?.let {
+ BluePrintClusterMessage<E>(
+ BlueprintClusterTopic.valueOf(it.source as String),
+ it.messageObject,
+ it.publishTime,
+ it.publishingMember.toClusterMember()
+ )
+ }.let { listener.onMessage(it) }
+}
--- /dev/null
+/*
+ * Copyright © 2020 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.listeners
+
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Component
+
+@Component
+@ConditionalOnProperty("CLUSTER_ENABLED", havingValue = "true")
+open class BlueprintCompilerCacheMessageListener(private val clusterService: BluePrintClusterService) : BlueprintClusterMessageListener<String> {
+ private val log = logger(BlueprintCompilerCacheMessageListener::class)
+
+ @EventListener(ClusterJoinedEvent::class)
+ fun register() {
+ log.info("Registering BlueprintCompilerCacheMessageListener")
+ clusterService.addBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, this)
+ }
+
+ override fun onMessage(message: BluePrintClusterMessage<String>?) {
+ message?.let {
+ log.info("Received ClusterMessage - Cleaning compile cache for blueprint (${it.payload})")
+ BluePrintCompileCache.cleanClassLoader(it.payload)
+ }
+ }
+}
package org.onap.ccsdk.cds.blueprintsprocessor.core.service
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic
+import org.springframework.context.ApplicationEvent
import java.time.Duration
import java.util.Properties
+import java.util.UUID
interface BluePrintClusterService {
/** Shut down the cluster with [duration] */
suspend fun shutDown(duration: Duration)
+
+ /** Send [message] to the listener(s) of a [topic] */
+ suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T)
+
+ /** Register a [listener] to a [topic] and returns his UUID */
+ fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID
+
+ /** Unregister a listener from a [topic] using his [uuid] and returns true if it succeeded */
+ fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean
}
data class ClusterInfo(
fun close()
}
+class BluePrintClusterMessage<E>(val topic: BlueprintClusterTopic, val payload: E, publishTime: Long, clusterMember: ClusterMember)
+
+interface BlueprintClusterMessageListener<E> {
+ fun onMessage(message: BluePrintClusterMessage<E>?)
+}
+
+class ClusterJoinedEvent(source: Any) : ApplicationEvent(source)
+
const val CDS_LOCK_GROUP = "cds-lock"
import com.hazelcast.config.FileSystemYamlConfig
import com.hazelcast.instance.impl.HazelcastInstanceFactory
import com.hazelcast.map.IMap
+import io.mockk.mockk
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import org.junit.After
import org.junit.Before
import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import java.io.Serializable
import java.util.Properties
import kotlin.test.assertEquals
+import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
}
}
+ @Test
+ fun testClusterMessaging() {
+ runBlocking {
+ val bluePrintClusterServiceOne =
+ createCluster(arrayListOf(1, 2, 3)).toMutableList()
+ printReachableMembers(bluePrintClusterServiceOne)
+ testMessageReceived(bluePrintClusterServiceOne)
+ }
+ }
+
+ private suspend fun testMessageReceived(bluePrintClusterServices: List<BluePrintClusterService>) {
+ val sender = bluePrintClusterServices[0] as HazelcastClusterService
+ val receiver = bluePrintClusterServices[1] as HazelcastClusterService
+ val messageSent = "hello world"
+ var isMessageReceived = false
+ val uuid = receiver.addBlueprintClusterMessageListener(
+ BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE,
+ object : BlueprintClusterMessageListener<String> {
+ override fun onMessage(message: BluePrintClusterMessage<String>?) {
+ log.info("Message received - ${message?.payload}")
+ isMessageReceived = messageSent == message?.payload
+ }
+ }
+ )
+
+ assertNotNull(uuid)
+ sender.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, messageSent)
+ delay(1000)
+ assertTrue(isMessageReceived)
+
+ assertTrue(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
+ assertFalse(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
+ }
+
private suspend fun createCluster(
ids: List<Int>,
joinAsClient: Boolean? = false
properties = properties
)
}
- val hazelcastClusterService = HazelcastClusterService()
+ val hazelcastClusterService = HazelcastClusterService(mockk(relaxed = true))
hazelcastClusterService.startCluster(clusterInfo)
hazelcastClusterService
}
}
return scriptComponent
}
-
- suspend fun cleanupInstance(bluePrintContext: BluePrintContext, scriptType: String) {
- if (scriptType == BluePrintConstants.SCRIPT_KOTLIN) {
- BluePrintScriptsServiceImpl().cleanupInstance(bluePrintContext.rootPath)
- }
- }
}
// Handles both script processing and error handling
scriptComponentFunction.executeScript(executionServiceInput)
-
- componentFunctionScriptingService.cleanupInstance(bluePrintRuntimeService.bluePrintContext(), scriptType)
}
override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) {