Prioritization expiry and clean scheduler service
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / kafka / DefaultMessagePrioritizeProcessor.kt
index c14a404..624a69f 100644 (file)
@@ -16,9 +16,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
 
-import org.apache.kafka.streams.processor.Cancellable
 import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.PunctuationType
 import org.apache.kafka.streams.processor.To
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
@@ -28,7 +26,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import java.time.Duration
 
 open class DefaultMessagePrioritizeProcessor(
     private val messagePrioritizationStateService: MessagePrioritizationStateService,
@@ -37,15 +34,11 @@ open class DefaultMessagePrioritizeProcessor(
 
     private val log = logger(DefaultMessagePrioritizeProcessor::class)
 
-    lateinit var expiryCancellable: Cancellable
-    lateinit var cleanCancellable: Cancellable
-
     override suspend fun processNB(key: ByteArray, value: ByteArray) {
 
         val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
             ?: throw BluePrintProcessorException("failed to convert")
         try {
-            messagePrioritizationService.setKafkaProcessorContext(processorContext)
             messagePrioritizationService.prioritize(messagePrioritize)
         } catch (e: Exception) {
             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
@@ -65,10 +58,8 @@ open class DefaultMessagePrioritizeProcessor(
 
     override fun init(context: ProcessorContext) {
         super.init(context)
-        /** set up expiry marking cron */
-        initializeExpiryPunctuator()
-        /** Set up cleaning records cron */
-        initializeCleanPunctuator()
+        /** Set Configuration and Processor Context to messagePrioritizationService */
+        messagePrioritizationService.setKafkaProcessorContext(processorContext)
     }
 
     override fun close() {
@@ -76,40 +67,5 @@ open class DefaultMessagePrioritizeProcessor(
             "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
                 "taskId(${processorContext.taskId()})"
         )
-        expiryCancellable.cancel()
-        cleanCancellable.cancel()
-    }
-
-    open fun initializeExpiryPunctuator() {
-        val expiryPunctuator =
-            MessagePriorityExpiryPunctuator(
-                messagePrioritizationStateService
-            )
-        expiryPunctuator.processorContext = processorContext
-        expiryPunctuator.configuration = prioritizationConfiguration
-        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
-        expiryCancellable = processorContext.schedule(
-            Duration.ofMillis(expiryConfiguration.frequencyMilli),
-            PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
-        )
-        log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
-    }
-
-    open fun initializeCleanPunctuator() {
-        val cleanPunctuator =
-            MessagePriorityCleanPunctuator(
-                messagePrioritizationStateService
-            )
-        cleanPunctuator.processorContext = processorContext
-        cleanPunctuator.configuration = prioritizationConfiguration
-        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
-        cleanCancellable = processorContext.schedule(
-            Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
-            PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
-        )
-        log.info(
-            "Clean punctuator setup complete with expiry " +
-                "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
-        )
     }
 }