package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
-import org.apache.kafka.streams.processor.Cancellable
import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.To
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import java.time.Duration
open class DefaultMessagePrioritizeProcessor(
private val messagePrioritizationStateService: MessagePrioritizationStateService,
private val log = logger(DefaultMessagePrioritizeProcessor::class)
- lateinit var expiryCancellable: Cancellable
- lateinit var cleanCancellable: Cancellable
-
override suspend fun processNB(key: ByteArray, value: ByteArray) {
val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
?: throw BluePrintProcessorException("failed to convert")
try {
- messagePrioritizationService.setKafkaProcessorContext(processorContext)
messagePrioritizationService.prioritize(messagePrioritize)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
override fun init(context: ProcessorContext) {
super.init(context)
- /** set up expiry marking cron */
- initializeExpiryPunctuator()
- /** Set up cleaning records cron */
- initializeCleanPunctuator()
+ /** Set Configuration and Processor Context to messagePrioritizationService */
+ messagePrioritizationService.setKafkaProcessorContext(processorContext)
}
override fun close() {
"closing prioritization processor applicationId(${processorContext.applicationId()}), " +
"taskId(${processorContext.taskId()})"
)
- expiryCancellable.cancel()
- cleanCancellable.cancel()
- }
-
- open fun initializeExpiryPunctuator() {
- val expiryPunctuator =
- MessagePriorityExpiryPunctuator(
- messagePrioritizationStateService
- )
- expiryPunctuator.processorContext = processorContext
- expiryPunctuator.configuration = prioritizationConfiguration
- val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- expiryCancellable = processorContext.schedule(
- Duration.ofMillis(expiryConfiguration.frequencyMilli),
- PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
- )
- log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
- }
-
- open fun initializeCleanPunctuator() {
- val cleanPunctuator =
- MessagePriorityCleanPunctuator(
- messagePrioritizationStateService
- )
- cleanPunctuator.processorContext = processorContext
- cleanPunctuator.configuration = prioritizationConfiguration
- val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
- cleanCancellable = processorContext.schedule(
- Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
- PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
- )
- log.info(
- "Clean punctuator setup complete with expiry " +
- "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
- )
}
}