Prioritization Optional NATS consumer support
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / service / AbstractMessagePrioritizationService.kt
index 9314032..a6963d8 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
 
-import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
@@ -37,15 +33,21 @@ abstract class AbstractMessagePrioritizationService(
 
     private val log = logger(AbstractMessagePrioritizationService::class)
 
-    var processorContext: ProcessorContext? = null
+    lateinit var prioritizationConfiguration: PrioritizationConfiguration
 
-    override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
-        this.processorContext = processorContext
+    override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) {
+        this.prioritizationConfiguration = prioritizationConfiguration
+    }
+
+    override fun getConfiguration(): PrioritizationConfiguration {
+        return this.prioritizationConfiguration
     }
 
     override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
         try {
             log.info("***** received in prioritize processor key(${messagePrioritize.id})")
+            check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
             /** Get the cluster lock for message group */
             val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
             // Save the Message
@@ -67,31 +69,21 @@ abstract class AbstractMessagePrioritizationService(
     override suspend fun output(messages: List<MessagePrioritization>) {
         log.info("$$$$$ received in output processor id(${messages.ids()})")
         messages.forEach { message ->
-            val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
-            /** Check for Kafka Processing, If yes, then send to the output topic */
-            if (this.processorContext != null) {
-                processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
-            }
+            messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
         }
     }
 
-    override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
+    override suspend fun updateExpiredMessages() {
+        check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
         val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
         try {
             val fetchMessages = messagePrioritizationStateService
                 .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
             val expiredIds = fetchMessages?.ids()
-            if (expiredIds != null && expiredIds.isNotEmpty()) {
+            if (!expiredIds.isNullOrEmpty()) {
                 messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
-                if (processorContext != null) {
-                    fetchMessages.forEach { expired ->
-                        expired.state = MessageState.EXPIRED.name
-                        processorContext!!.forward(
-                            expired.id, expired,
-                            To.child(MessagePrioritizationConstants.SINK_OUTPUT)
-                        )
-                    }
-                }
             }
         } catch (e: Exception) {
             log.error("failed in updating expired messages", e)
@@ -100,7 +92,10 @@ abstract class AbstractMessagePrioritizationService(
         }
     }
 
-    override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
+    override suspend fun cleanExpiredMessage() {
+        check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
         val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
         try {
             messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)