Rest endpoint for message Prioritization
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / service / AbstractMessagePrioritizationService.kt
  * limitations under the License.
  */
 
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
 
-import org.apache.kafka.streams.processor.Cancellable
 import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.PunctuationType
 import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import java.time.Duration
-import java.util.UUID
 
-open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
+abstract class AbstractMessagePrioritizationService(
+    private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : MessagePrioritizationService {
 
-    private val log = logger(MessagePrioritizeProcessor::class)
+    private val log = logger(AbstractMessagePrioritizationService::class)
 
-    lateinit var expiryCancellable: Cancellable
-    lateinit var cleanCancellable: Cancellable
+    var processorContext: ProcessorContext? = null
 
-    override suspend fun processNB(key: ByteArray, value: ByteArray) {
-        log.info("***** received in prioritize processor key(${String(key)})")
-        val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
-            ?: throw BluePrintProcessorException("failed to convert")
+    override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
+        this.processorContext = processorContext
+    }
+
+    override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
         try {
+            log.info("***** received in prioritize processor key(${messagePrioritize.id})")
             /** Get the cluster lock for message group */
-            val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
+            val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
             // Save the Message
             messagePrioritizationStateService.saveMessage(messagePrioritize)
             handleCorrelationAndNextStep(messagePrioritize)
             /** Cluster unLock for message group */
-            MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
+            MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
         } catch (e: Exception) {
             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
             log.error(messagePrioritize.error)
@@ -59,58 +58,16 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
                 messagePrioritize.id, MessageState.ERROR.name,
                 messagePrioritize.error!!
             )
-            /** Publish to Output topic */
-            this.processorContext.forward(
-                messagePrioritize.id, messagePrioritize,
-                To.child(MessagePrioritizationConstants.SINK_OUTPUT)
-            )
         }
     }
 
-    override fun init(context: ProcessorContext) {
-        super.init(context)
-        /** set up expiry marking cron */
-        initializeExpiryPunctuator()
-        /** Set up cleaning records cron */
-        initializeCleanPunctuator()
-        /** Set up Cluster Service */
-        initializeClusterService()
-    }
-
-    override fun close() {
-        log.info(
-            "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
-                "taskId(${processorContext.taskId()})"
-        )
-        expiryCancellable.cancel()
-        cleanCancellable.cancel()
-    }
-
-    open fun initializeExpiryPunctuator() {
-        val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService)
-        expiryPunctuator.processorContext = processorContext
-        expiryPunctuator.configuration = prioritizationConfiguration
-        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
-        expiryCancellable = processorContext.schedule(
-            Duration.ofMillis(expiryConfiguration.frequencyMilli),
-            PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
-        )
-        log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
-    }
-
-    open fun initializeCleanPunctuator() {
-        val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService)
-        cleanPunctuator.processorContext = processorContext
-        cleanPunctuator.configuration = prioritizationConfiguration
-        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
-        cleanCancellable = processorContext.schedule(
-            Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
-            PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
-        )
-        log.info(
-            "Clean punctuator setup complete with expiry " +
-                "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
-        )
+    override suspend fun output(id: String) {
+        log.info("$$$$$ received in output processor id($id)")
+        val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
+        /** Check for Kafka Processing, If yes, then send to the output topic */
+        if (this.processorContext != null) {
+            processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+        }
     }
 
     open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
@@ -126,10 +83,11 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
             )
 
             /** Get all previously received messages from database for group and optional types and correlation Id */
-            val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
-                group,
-                arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
-            )
+            val waitingCorrelatedStoreMessages = messagePrioritizationStateService
+                .getCorrelatedMessages(
+                    group,
+                    arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
+                )
 
             /** If multiple records found, then check correlation */
             if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
@@ -139,12 +97,9 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
 
                 if (correlationResults.correlated) {
                     /** Correlation  satisfied */
-                    val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
-                    /**  Send only correlated ids to next processor */
-                    this.processorContext.forward(
-                        UUID.randomUUID().toString(), correlatedIds,
-                        To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-                    )
+                    val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
+                    /**  Send only correlated ids to aggregate processor */
+                    aggregate(correlatedIds)
                 } else {
                     /** Correlation not satisfied */
                     log.trace("correlation not matched : ${correlationResults.message}")
@@ -159,16 +114,57 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
         } else {
             // No Correlation check needed, simply forward to next processor.
             messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
-            this.processorContext.forward(
-                messagePrioritization.id, messagePrioritization.id,
-                To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-            )
+            aggregate(messagePrioritization.id)
+        }
+    }
+
+    open suspend fun aggregate(strIds: String) {
+        log.info("@@@@@ received in aggregation processor ids($strIds)")
+        val ids = strIds.split(",").map { it.trim() }
+        if (!ids.isNullOrEmpty()) {
+            try {
+                if (ids.size == 1) {
+                    /** No aggregation or sequencing needed, simpley forward to next processor */
+                    output(ids.first())
+                } else {
+                    /** Implement Aggregation logic in overridden class, If necessary,
+                    Populate New Message and Update status with Prioritized, Forward the message to next processor */
+                    handleAggregation(ids)
+                    /** Update all messages to Aggregated state */
+                    messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+                }
+            } catch (e: Exception) {
+                val error = "failed in Aggregate message($ids) : ${e.message}"
+                log.error(error, e)
+                val storeMessages = messagePrioritizationStateService.getMessages(ids)
+                if (!storeMessages.isNullOrEmpty()) {
+                    storeMessages.forEach { messagePrioritization ->
+                        try {
+                            /** Update the data store */
+                            messagePrioritizationStateService.setMessageStateANdError(
+                                messagePrioritization.id,
+                                MessageState.ERROR.name, error
+                            )
+                            /** Publish to output topic */
+                            output(messagePrioritization.id)
+                        } catch (sendException: Exception) {
+                            log.error(
+                                "failed to update/publish error message(${messagePrioritization.id}) : " +
+                                    "${sendException.message}", e
+                            )
+                        }
+                    }
+                }
+            }
         }
     }
 
+    /** Child will override this implementation , if necessary
+     *  Here the place child has to implement custom Sequencing and Aggregation logic.
+     * */
+    abstract suspend fun handleAggregation(messageIds: List<String>)
+
     /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
      * otherwise correlation happens with group and correlationId */
-    open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
-        return null
-    }
+    abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?
 }