Message Prioritization message group lock. 82/99982/1
authorBrinda Santh <bs2796@att.com>
Thu, 2 Jan 2020 16:59:29 +0000 (11:59 -0500)
committerBrinda Santh <bs2796@att.com>
Thu, 2 Jan 2020 16:59:29 +0000 (11:59 -0500)
Implementation to avoid concurrent procession of message group while prioritization.

Sample message prioritization Kafka listener properties.

Issue-ID: CCSDK-2011
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: Ifbf39985b03c662b6ccf7740be711cfeb7bfbebb

ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
ms/blueprintsprocessor/functions/message-prioritizaion/README.md [moved from ms/blueprintsprocessor/functions/message-prioritizaion/README.txt with 90% similarity]
ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt

index 89b4f65..26c7204 100755 (executable)
@@ -129,3 +129,10 @@ blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
 blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
 blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
 blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
+
+# Message prioritization kakfa properties, Enable if Prioritization service is needed
+# Deploy message-prioritization function along with blueprintsprocessor application.
+#blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth
+#blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092
+#blueprintsprocessor.messageconsumer.prioritize-input.applicationId=cds-controller
+#blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic
\ No newline at end of file
@@ -17,6 +17,9 @@ To List topics
 ----------------
 kafka-topics --list --bootstrap-server localhost:9092
 
+To publish message
+--------------------
+kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic
 
 To Listen for Output
 ----------------------
index ac46b36..c33adcb 100644 (file)
     <description>Blueprints Processor Function - Message Prioritization</description>
 
     <dependencies>
+        <dependency>
+            <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+            <artifactId>atomix-lib</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
             <artifactId>message-lib</artifactId>
index c2965c4..35566ab 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
 
 import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 
@@ -29,6 +32,7 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa
 
     lateinit var prioritizationConfiguration: PrioritizationConfiguration
     lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
+    var clusterService: BluePrintClusterService? = null
 
     override fun init(context: ProcessorContext) {
         this.processorContext = context
@@ -36,4 +40,12 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa
         this.messagePrioritizationStateService = BluePrintDependencyService
             .messagePrioritizationStateService()
     }
+
+    /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */
+    open fun initializeClusterService() {
+        /** Get the Cluster service to update in store */
+        if (BluePrintConstants.CLUSTER_ENABLED) {
+            this.clusterService = BluePrintDependencyService.clusterService()
+        }
+    }
 }
index ed124d1..b611060 100644 (file)
@@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLi
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
 
 open class MessagePrioritizationConsumer(
     private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@@ -53,7 +54,7 @@ open class MessagePrioritizationConsumer(
                 val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
                         as KafkaStreamsBasicAuthConsumerProperties
 
-                val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
+                val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
                 log.info("Consuming prioritization topics($topics)")
 
                 topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
index 431e02f..4e4e2da 100644 (file)
@@ -25,6 +25,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
@@ -43,9 +44,13 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
         val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
             ?: throw BluePrintProcessorException("failed to convert")
         try {
+            /** Get the cluster lock for message group */
+            val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
             // Save the Message
             messagePrioritizationStateService.saveMessage(messagePrioritize)
             handleCorrelationAndNextStep(messagePrioritize)
+            /** Cluster unLock for message group */
+            MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
         } catch (e: Exception) {
             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
             log.error(messagePrioritize.error)
@@ -68,12 +73,14 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
         initializeExpiryPunctuator()
         /** Set up cleaning records cron */
         initializeCleanPunctuator()
+        /** Set up Cluster Service */
+        initializeClusterService()
     }
 
     override fun close() {
         log.info(
             "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
-                    "taskId(${processorContext.taskId()})"
+                "taskId(${processorContext.taskId()})"
         )
         expiryCancellable.cancel()
         cleanCancellable.cancel()
@@ -102,7 +109,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
         )
         log.info(
             "Clean punctuator setup complete with expiry " +
-                    "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
+                "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
         )
     }
 
@@ -115,7 +122,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
             val types = getGroupCorrelationTypes(messagePrioritization)
             log.info(
                 "checking correlation for message($id), group($group), types($types), " +
-                        "correlation id($correlationId)"
+                    "correlation id($correlationId)"
             )
 
             /** Get all previously received messages from database for group and optional types and correlation Id */
index 7e5862c..d1f38f4 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
 
 import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 
 object MessageProcessorUtils {
 
+    /** Utility to create the cluster lock for message [messagePrioritization] */
+    suspend fun prioritizationGrouplock(
+        clusterService: BluePrintClusterService?,
+        messagePrioritization: MessagePrioritization
+    ): ClusterLock? {
+        return if (clusterService != null && clusterService.clusterJoined()) {
+            val lockName = "prioritization-${messagePrioritization.group}"
+            val clusterLock = clusterService.clusterLock(lockName)
+            clusterLock.lock()
+            if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
+            clusterLock
+        } else null
+    }
+
+    /** Utility used to cluster unlock for message [messagePrioritization] */
+    suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) {
+        if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) {
+            clusterLock.unLock()
+            clusterLock.close()
+        }
+    }
+
     fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
-            ProcessorSupplier<K, V> {
+        ProcessorSupplier<K, V> {
         return ProcessorSupplier<K, V> {
             // Dynamically resolve the Prioritization Processor
             val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
index 0ed9598..f9e23e8 100644 (file)
@@ -81,6 +81,9 @@ open class MessagePrioritizationConsumerTest {
     @Autowired
     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
 
+    @Autowired
+    lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
+
     @Before
     fun setup() {
         BluePrintDependencyService.inject(applicationContext)
@@ -119,7 +122,8 @@ open class MessagePrioritizationConsumerTest {
             val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
 
             // Test Topology
-            val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
+            val kafkaStreamConsumerFunction =
+                spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
             val messageConsumerProperties = bluePrintMessageLibPropertyService
                 .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
             val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
@@ -135,7 +139,6 @@ open class MessagePrioritizationConsumerTest {
     // @Test
     fun testMessagePrioritizationConsumer() {
         runBlocking {
-            val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
             messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
 
             /** Send sample message with every 1 sec */
index 37d853c..3d3d0c6 100644 (file)
@@ -21,6 +21,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration
 import org.springframework.context.annotation.Bean
 import org.springframework.context.annotation.ComponentScan
@@ -42,9 +43,34 @@ open class TestDatabaseConfiguration {
     }
 }
 
-@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
+/* Sample Prioritization Listener, used during Application startup
+@Component
+open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) {
+
+    private val log = logger(SamplePrioritizationListeners::class)
+
+    @EventListener(ApplicationReadyEvent::class)
+    open fun init() = runBlocking {
+        log.info("Starting PrioritizationListeners...")
+        defaultMessagePrioritizationConsumer
+            .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+    }
+
+    @PreDestroy
+    open fun destroy() = runBlocking {
+        log.info("Shutting down PrioritizationListeners...")
+        defaultMessagePrioritizationConsumer.shutDown()
+    }
+}
+ */
 
+@Service
+open class SampleMessagePrioritizationConsumer(
+    bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+
+@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
+open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
     override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
         return when (messagePrioritization.group) {
             "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
@@ -54,7 +80,7 @@ open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
 }
 
 @Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor()
+open class SampleMessageAggregateProcessor() : MessageAggregateProcessor()
 
 @Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT)
-open class DefaultMessageOutputProcessor : MessageOutputProcessor()
+open class SampleMessageOutputProcessor : MessageOutputProcessor()
index 0690eb8..214a143 100644 (file)
@@ -148,12 +148,18 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
 }
 
 open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock {
+    val log = logger(ClusterLockImpl::class)
 
     lateinit var distributedLock: DistributedLock
 
+    override fun name(): String {
+        return distributedLock.name()
+    }
+
     override suspend fun lock() {
         distributedLock = AtomixLibUtils.distributedLock(atomix, name)
         distributedLock.lock()
+        log.debug("Cluster lock($name) created..")
     }
 
     override suspend fun tryLock(timeout: Long): Boolean {
@@ -163,6 +169,7 @@ open class ClusterLockImpl(private val atomix: Atomix, private val name: String)
 
     override suspend fun unLock() {
         distributedLock.unlock()
+        log.debug("Cluster unlock(${name()}) successfully..")
     }
 
     override fun isLocked(): Boolean {