Implementation to avoid concurrent procession of message group while prioritization.
Sample message prioritization Kafka listener properties.
Issue-ID: CCSDK-2011
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: Ifbf39985b03c662b6ccf7740be711cfeb7bfbebb
blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
+
+# Message prioritization kakfa properties, Enable if Prioritization service is needed
+# Deploy message-prioritization function along with blueprintsprocessor application.
+#blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth
+#blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092
+#blueprintsprocessor.messageconsumer.prioritize-input.applicationId=cds-controller
+#blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic
\ No newline at end of file
----------------
kafka-topics --list --bootstrap-server localhost:9092
+To publish message
+--------------------
+kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic
To Listen for Output
----------------------
<description>Blueprints Processor Function - Message Prioritization</description>
<dependencies>
+ <dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>atomix-lib</artifactId>
+ </dependency>
<dependency>
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>message-lib</artifactId>
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
lateinit var prioritizationConfiguration: PrioritizationConfiguration
lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
+ var clusterService: BluePrintClusterService? = null
override fun init(context: ProcessorContext) {
this.processorContext = context
this.messagePrioritizationStateService = BluePrintDependencyService
.messagePrioritizationStateService()
}
+
+ /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */
+ open fun initializeClusterService() {
+ /** Get the Cluster service to update in store */
+ if (BluePrintConstants.CLUSTER_ENABLED) {
+ this.clusterService = BluePrintDependencyService.clusterService()
+ }
+ }
}
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
open class MessagePrioritizationConsumer(
private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
as KafkaStreamsBasicAuthConsumerProperties
- val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
+ val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
log.info("Consuming prioritization topics($topics)")
topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
?: throw BluePrintProcessorException("failed to convert")
try {
+ /** Get the cluster lock for message group */
+ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
// Save the Message
messagePrioritizationStateService.saveMessage(messagePrioritize)
handleCorrelationAndNextStep(messagePrioritize)
+ /** Cluster unLock for message group */
+ MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
initializeExpiryPunctuator()
/** Set up cleaning records cron */
initializeCleanPunctuator()
+ /** Set up Cluster Service */
+ initializeClusterService()
}
override fun close() {
log.info(
"closing prioritization processor applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})"
+ "taskId(${processorContext.taskId()})"
)
expiryCancellable.cancel()
cleanCancellable.cancel()
)
log.info(
"Clean punctuator setup complete with expiry " +
- "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
+ "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
)
}
val types = getGroupCorrelationTypes(messagePrioritization)
log.info(
"checking correlation for message($id), group($group), types($types), " +
- "correlation id($correlationId)"
+ "correlation id($correlationId)"
)
/** Get all previously received messages from database for group and optional types and correlation Id */
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
object MessageProcessorUtils {
+ /** Utility to create the cluster lock for message [messagePrioritization] */
+ suspend fun prioritizationGrouplock(
+ clusterService: BluePrintClusterService?,
+ messagePrioritization: MessagePrioritization
+ ): ClusterLock? {
+ return if (clusterService != null && clusterService.clusterJoined()) {
+ val lockName = "prioritization-${messagePrioritization.group}"
+ val clusterLock = clusterService.clusterLock(lockName)
+ clusterLock.lock()
+ if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
+ clusterLock
+ } else null
+ }
+
+ /** Utility used to cluster unlock for message [messagePrioritization] */
+ suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) {
+ if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) {
+ clusterLock.unLock()
+ clusterLock.close()
+ }
+ }
+
fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
- ProcessorSupplier<K, V> {
+ ProcessorSupplier<K, V> {
return ProcessorSupplier<K, V> {
// Dynamically resolve the Prioritization Processor
val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+ @Autowired
+ lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
+
@Before
fun setup() {
BluePrintDependencyService.inject(applicationContext)
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
// Test Topology
- val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
+ val kafkaStreamConsumerFunction =
+ spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
val messageConsumerProperties = bluePrintMessageLibPropertyService
.messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
// @Test
fun testMessagePrioritizationConsumer() {
runBlocking {
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
/** Send sample message with every 1 sec */
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
}
}
-@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
+/* Sample Prioritization Listener, used during Application startup
+@Component
+open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) {
+
+ private val log = logger(SamplePrioritizationListeners::class)
+
+ @EventListener(ApplicationReadyEvent::class)
+ open fun init() = runBlocking {
+ log.info("Starting PrioritizationListeners...")
+ defaultMessagePrioritizationConsumer
+ .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+ }
+
+ @PreDestroy
+ open fun destroy() = runBlocking {
+ log.info("Shutting down PrioritizationListeners...")
+ defaultMessagePrioritizationConsumer.shutDown()
+ }
+}
+ */
+@Service
+open class SampleMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+
+@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
+open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
return when (messagePrioritization.group) {
"group-typed" -> arrayListOf("type-0", "type-1", "type-2")
}
@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor()
+open class SampleMessageAggregateProcessor() : MessageAggregateProcessor()
@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT)
-open class DefaultMessageOutputProcessor : MessageOutputProcessor()
+open class SampleMessageOutputProcessor : MessageOutputProcessor()
}
open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock {
+ val log = logger(ClusterLockImpl::class)
lateinit var distributedLock: DistributedLock
+ override fun name(): String {
+ return distributedLock.name()
+ }
+
override suspend fun lock() {
distributedLock = AtomixLibUtils.distributedLock(atomix, name)
distributedLock.lock()
+ log.debug("Cluster lock($name) created..")
}
override suspend fun tryLock(timeout: Long): Boolean {
override suspend fun unLock() {
distributedLock.unlock()
+ log.debug("Cluster unlock(${name()}) successfully..")
}
override fun isLocked(): Boolean {
data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null)
interface ClusterLock {
+ fun name(): String
suspend fun lock()
suspend fun tryLock(timeout: Long): Boolean
suspend fun unLock()