From: Brinda Santh Date: Thu, 2 Jan 2020 16:59:29 +0000 (-0500) Subject: Message Prioritization message group lock. X-Git-Tag: 0.7.0~92 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=4f4e2de08d3c6259da2497950a96d549d3e82f8a;p=ccsdk%2Fcds.git Message Prioritization message group lock. Implementation to avoid concurrent procession of message group while prioritization. Sample message prioritization Kafka listener properties. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh Change-Id: Ifbf39985b03c662b6ccf7740be711cfeb7bfbebb --- diff --git a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties index 89b4f65b4..26c7204d9 100755 --- a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties +++ b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties @@ -129,3 +129,10 @@ blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092 blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id blueprintsprocessor.messageproducer.self-service-api.topic=producer.t + +# Message prioritization kakfa properties, Enable if Prioritization service is needed +# Deploy message-prioritization function along with blueprintsprocessor application. +#blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth +#blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092 +#blueprintsprocessor.messageconsumer.prioritize-input.applicationId=cds-controller +#blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md similarity index 90% rename from ms/blueprintsprocessor/functions/message-prioritizaion/README.txt rename to ms/blueprintsprocessor/functions/message-prioritizaion/README.md index baf168767..482bbc2cc 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md @@ -17,6 +17,9 @@ To List topics ---------------- kafka-topics --list --bootstrap-server localhost:9092 +To publish message +-------------------- +kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic To Listen for Output ---------------------- diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml index ac46b3635..c33adcb70 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml @@ -31,6 +31,10 @@ Blueprints Processor Function - Message Prioritization + + org.onap.ccsdk.cds.blueprintsprocessor + atomix-lib + org.onap.ccsdk.cds.blueprintsprocessor message-lib diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt index c2965c4e8..35566abb4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt @@ -17,8 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService @@ -29,6 +32,7 @@ abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessa lateinit var prioritizationConfiguration: PrioritizationConfiguration lateinit var messagePrioritizationStateService: MessagePrioritizationStateService + var clusterService: BluePrintClusterService? = null override fun init(context: ProcessorContext) { this.processorContext = context @@ -36,4 +40,12 @@ abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessa this.messagePrioritizationStateService = BluePrintDependencyService .messagePrioritizationStateService() } + + /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */ + open fun initializeClusterService() { + /** Get the Cluster service to update in store */ + if (BluePrintConstants.CLUSTER_ENABLED) { + this.clusterService = BluePrintDependencyService.clusterService() + } + } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt index ed124d1b2..b611060f7 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt @@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLi import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList open class MessagePrioritizationConsumer( private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @@ -53,7 +54,7 @@ open class MessagePrioritizationConsumer( val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() log.info("Consuming prioritization topics($topics)") topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt index 431e02f30..4e4e2da7a 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt @@ -25,6 +25,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils @@ -43,9 +44,13 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): - ProcessorSupplier { + ProcessorSupplier { return ProcessorSupplier { // Dynamically resolve the Prioritization Processor val processorInstance = BluePrintDependencyService.instance>(name) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index 0ed9598f0..f9e23e826 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -81,6 +81,9 @@ open class MessagePrioritizationConsumerTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + @Autowired + lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer + @Before fun setup() { BluePrintDependencyService.inject(applicationContext) @@ -119,7 +122,8 @@ open class MessagePrioritizationConsumerTest { val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) // Test Topology - val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) + val kafkaStreamConsumerFunction = + spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) val messageConsumerProperties = bluePrintMessageLibPropertyService .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) @@ -135,7 +139,6 @@ open class MessagePrioritizationConsumerTest { // @Test fun testMessagePrioritizationConsumer() { runBlocking { - val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) /** Send sample message with every 1 sec */ diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt index 37d853cfe..3d3d0c6f5 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -21,6 +21,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan @@ -42,9 +43,34 @@ open class TestDatabaseConfiguration { } } -@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) -open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { +/* Sample Prioritization Listener, used during Application startup +@Component +open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) { + + private val log = logger(SamplePrioritizationListeners::class) + + @EventListener(ApplicationReadyEvent::class) + open fun init() = runBlocking { + log.info("Starting PrioritizationListeners...") + defaultMessagePrioritizationConsumer + .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) + } + + @PreDestroy + open fun destroy() = runBlocking { + log.info("Shutting down PrioritizationListeners...") + defaultMessagePrioritizationConsumer.shutDown() + } +} + */ +@Service +open class SampleMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService +) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) + +@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) +open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() { override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { return when (messagePrioritization.group) { "group-typed" -> arrayListOf("type-0", "type-1", "type-2") @@ -54,7 +80,7 @@ open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { } @Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) -open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor() +open class SampleMessageAggregateProcessor() : MessageAggregateProcessor() @Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT) -open class DefaultMessageOutputProcessor : MessageOutputProcessor() +open class SampleMessageOutputProcessor : MessageOutputProcessor() diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt index 0690eb89d..214a14310 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt @@ -148,12 +148,18 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { } open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock { + val log = logger(ClusterLockImpl::class) lateinit var distributedLock: DistributedLock + override fun name(): String { + return distributedLock.name() + } + override suspend fun lock() { distributedLock = AtomixLibUtils.distributedLock(atomix, name) distributedLock.lock() + log.debug("Cluster lock($name) created..") } override suspend fun tryLock(timeout: Long): Boolean { @@ -163,6 +169,7 @@ open class ClusterLockImpl(private val atomix: Atomix, private val name: String) override suspend fun unLock() { distributedLock.unlock() + log.debug("Cluster unlock(${name()}) successfully..") } override fun isLocked(): Boolean { diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 21fcfc509..f994628a2 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -57,6 +57,7 @@ data class ClusterInfo( data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null) interface ClusterLock { + fun name(): String suspend fun lock() suspend fun tryLock(timeout: Long): Boolean suspend fun unLock()