import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService)
- : AbstractBluePrintMessagePunctuator() {
+class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractBluePrintMessagePunctuator() {
private val log = logger(MessagePriorityExpiryPunctuator::class)
lateinit var configuration: PrioritizationConfiguration
override suspend fun punctuateNB(timestamp: Long) {
- log.info("**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})")
+ log.info(
+ "**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})"
+ )
val expiryConfiguration = configuration.expiryConfiguration
val fetchMessages = messagePrioritizationStateService
- .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
val expiredIds = fetchMessages?.map { it.id }
if (expiredIds != null && expiredIds.isNotEmpty()) {
messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
fetchMessages.forEach { expired ->
- processorContext.forward(expired.id, expired,
- To.child(MessagePrioritizationConstants.SINK_EXPIRED))
+ processorContext.forward(
+ expired.id, expired,
+ To.child(MessagePrioritizationConstants.SINK_EXPIRED)
+ )
}
}
}
}
-class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService)
- : AbstractBluePrintMessagePunctuator() {
+class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractBluePrintMessagePunctuator() {
private val log = logger(MessagePriorityCleanPunctuator::class)
lateinit var configuration: PrioritizationConfiguration
override suspend fun punctuateNB(timestamp: Long) {
- log.info("**** executing clean punctuator applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})")
- //TODO
+ log.info(
+ "**** executing clean punctuator applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})"
+ )
+ // TODO
}
-}
\ No newline at end of file
+}