Prioritization expiry and clean scheduler service
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / service / AbstractMessagePrioritizationService.kt
index 13c0dd7..9314032 100644 (file)
@@ -18,11 +18,14 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
 
 import org.apache.kafka.streams.processor.ProcessorContext
 import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -49,7 +52,7 @@ abstract class AbstractMessagePrioritizationService(
             messagePrioritizationStateService.saveMessage(messagePrioritize)
             handleCorrelationAndNextStep(messagePrioritize)
             /** Cluster unLock for message group */
-            MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
+            MessageProcessorUtils.prioritizationUnLock(clusterLock)
         } catch (e: Exception) {
             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
             log.error(messagePrioritize.error)
@@ -61,12 +64,50 @@ abstract class AbstractMessagePrioritizationService(
         }
     }
 
-    override suspend fun output(id: String) {
-        log.info("$$$$$ received in output processor id($id)")
-        val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
-        /** Check for Kafka Processing, If yes, then send to the output topic */
-        if (this.processorContext != null) {
-            processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+    override suspend fun output(messages: List<MessagePrioritization>) {
+        log.info("$$$$$ received in output processor id(${messages.ids()})")
+        messages.forEach { message ->
+            val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+            /** Check for Kafka Processing, If yes, then send to the output topic */
+            if (this.processorContext != null) {
+                processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+            }
+        }
+    }
+
+    override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
+        val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+        try {
+            val fetchMessages = messagePrioritizationStateService
+                .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+            val expiredIds = fetchMessages?.ids()
+            if (expiredIds != null && expiredIds.isNotEmpty()) {
+                messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+                if (processorContext != null) {
+                    fetchMessages.forEach { expired ->
+                        expired.state = MessageState.EXPIRED.name
+                        processorContext!!.forward(
+                            expired.id, expired,
+                            To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+                        )
+                    }
+                }
+            }
+        } catch (e: Exception) {
+            log.error("failed in updating expired messages", e)
+        } finally {
+            MessageProcessorUtils.prioritizationUnLock(clusterLock)
+        }
+    }
+
+    override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
+        val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
+        try {
+            messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
+        } catch (e: Exception) {
+            log.error("failed in clean expired messages", e)
+        } finally {
+            MessageProcessorUtils.prioritizationUnLock(clusterLock)
         }
     }
 
@@ -78,7 +119,8 @@ abstract class AbstractMessagePrioritizationService(
             val correlationId = messagePrioritization.correlationId!!
             val types = getGroupCorrelationTypes(messagePrioritization)
             log.info(
-                "checking correlation for message($id), group($group), types($types), " +
+                "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " +
+                    "correlation types($types), priority(${messagePrioritization.priority}), " +
                     "correlation id($correlationId)"
             )
 
@@ -96,57 +138,50 @@ abstract class AbstractMessagePrioritizationService(
                     .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
 
                 if (correlationResults.correlated) {
-                    /** Correlation  satisfied */
-                    val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
-                    /**  Send only correlated ids to aggregate processor */
-                    aggregate(correlatedIds)
+                    /** Update all messages to Aggregated state */
+                    messagePrioritizationStateService.setMessagesState(
+                        waitingCorrelatedStoreMessages.ids(),
+                        MessageState.PRIORITIZED.name
+                    )
+                    /** Correlation  satisfied, Send only correlated messages to aggregate processor */
+                    aggregate(waitingCorrelatedStoreMessages)
                 } else {
                     /** Correlation not satisfied */
                     log.trace("correlation not matched : ${correlationResults.message}")
-                    val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id }
                     // Update the Message state to Wait
-                    messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name)
+                    messagePrioritizationStateService.setMessagesState(
+                        waitingCorrelatedStoreMessages.ids(),
+                        MessageState.WAIT.name
+                    )
                 }
             } else {
                 /** received first message of group and correlation Id, update the message with wait state */
                 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
             }
         } else {
-            // No Correlation check needed, simply forward to next processor.
+            /** No Correlation check needed, simply forward to next processor. */
             messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
-            aggregate(messagePrioritization.id)
+            aggregate(arrayListOf(messagePrioritization))
         }
     }
 
-    open suspend fun aggregate(strIds: String) {
-        log.info("@@@@@ received in aggregation processor ids($strIds)")
-        val ids = strIds.split(",").map { it.trim() }
-        if (!ids.isNullOrEmpty()) {
+    open suspend fun aggregate(messages: List<MessagePrioritization>) {
+        log.info("@@@@@ received in aggregation processor ids(${messages.ids()}")
+        if (!messages.isNullOrEmpty()) {
             try {
-                if (ids.size == 1) {
-                    /** No aggregation or sequencing needed, simpley forward to next processor */
-                    output(ids.first())
-                } else {
-                    /** Implement Aggregation logic in overridden class, If necessary,
-                    Populate New Message and Update status with Prioritized, Forward the message to next processor */
-                    handleAggregation(ids)
-                    /** Update all messages to Aggregated state */
-                    messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
-                }
+                /** Implement Aggregation logic in overridden class, If necessary,
+                Populate New Message and Update status with Prioritized, Forward the message to next processor */
+                handleAggregation(messages)
             } catch (e: Exception) {
-                val error = "failed in Aggregate message($ids) : ${e.message}"
-                log.error(error, e)
-                val storeMessages = messagePrioritizationStateService.getMessages(ids)
-                if (!storeMessages.isNullOrEmpty()) {
-                    storeMessages.forEach { messagePrioritization ->
+                val error = "failed in aggregate message(${messages.ids()}) : ${e.message}"
+                if (!messages.isNullOrEmpty()) {
+                    messages.forEach { messagePrioritization ->
                         try {
                             /** Update the data store */
                             messagePrioritizationStateService.setMessageStateANdError(
                                 messagePrioritization.id,
                                 MessageState.ERROR.name, error
                             )
-                            /** Publish to output topic */
-                            output(messagePrioritization.id)
                         } catch (sendException: Exception) {
                             log.error(
                                 "failed to update/publish error message(${messagePrioritization.id}) : " +
@@ -154,6 +189,8 @@ abstract class AbstractMessagePrioritizationService(
                             )
                         }
                     }
+                    /** Publish to output topic */
+                    output(messages)
                 }
             }
         }
@@ -162,7 +199,7 @@ abstract class AbstractMessagePrioritizationService(
     /** Child will override this implementation , if necessary
      *  Here the place child has to implement custom Sequencing and Aggregation logic.
      * */
-    abstract suspend fun handleAggregation(messageIds: List<String>)
+    abstract suspend fun handleAggregation(messages: List<MessagePrioritization>)
 
     /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
      * otherwise correlation happens with group and correlationId */