package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
private val log = logger(AbstractMessagePrioritizationService::class)
- var processorContext: ProcessorContext? = null
+ lateinit var prioritizationConfiguration: PrioritizationConfiguration
- override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
- this.processorContext = processorContext
+ override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) {
+ this.prioritizationConfiguration = prioritizationConfiguration
+ }
+
+ override fun getConfiguration(): PrioritizationConfiguration {
+ return this.prioritizationConfiguration
}
override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
try {
log.info("***** received in prioritize processor key(${messagePrioritize.id})")
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
/** Get the cluster lock for message group */
val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
// Save the Message
override suspend fun output(messages: List<MessagePrioritization>) {
log.info("$$$$$ received in output processor id(${messages.ids()})")
messages.forEach { message ->
- val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
- /** Check for Kafka Processing, If yes, then send to the output topic */
- if (this.processorContext != null) {
- processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
- }
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
}
}
- override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
+ override suspend fun updateExpiredMessages() {
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
try {
val fetchMessages = messagePrioritizationStateService
.getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
val expiredIds = fetchMessages?.ids()
- if (expiredIds != null && expiredIds.isNotEmpty()) {
+ if (!expiredIds.isNullOrEmpty()) {
messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
- if (processorContext != null) {
- fetchMessages.forEach { expired ->
- expired.state = MessageState.EXPIRED.name
- processorContext!!.forward(
- expired.id, expired,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
- }
- }
}
} catch (e: Exception) {
log.error("failed in updating expired messages", e)
}
}
- override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
+ override suspend fun cleanExpiredMessage() {
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+ val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
try {
messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)