import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import java.time.Duration
-import java.util.*
-
+import java.util.UUID
open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
override suspend fun processNB(key: ByteArray, value: ByteArray) {
log.info("***** received in prioritize processor key(${String(key)})")
val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
- ?: throw BluePrintProcessorException("failed to convert")
+ ?: throw BluePrintProcessorException("failed to convert")
try {
// Save the Message
messagePrioritizationStateService.saveMessage(messagePrioritize)
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
/** Update the data store */
- messagePrioritizationStateService.setMessageStateANdError(messagePrioritize.id, MessageState.ERROR.name,
- messagePrioritize.error!!)
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritize.id, MessageState.ERROR.name,
+ messagePrioritize.error!!
+ )
/** Publish to Output topic */
- this.processorContext.forward(messagePrioritize.id, messagePrioritize,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ this.processorContext.forward(
+ messagePrioritize.id, messagePrioritize,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
}
}
}
override fun close() {
- log.info("closing prioritization processor applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})")
+ log.info(
+ "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})"
+ )
expiryCancellable.cancel()
cleanCancellable.cancel()
}
expiryPunctuator.processorContext = processorContext
expiryPunctuator.configuration = prioritizationConfiguration
val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- expiryCancellable = processorContext.schedule(Duration.ofMillis(expiryConfiguration.frequencyMilli),
- PunctuationType.WALL_CLOCK_TIME, expiryPunctuator)
+ expiryCancellable = processorContext.schedule(
+ Duration.ofMillis(expiryConfiguration.frequencyMilli),
+ PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
+ )
log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
}
cleanPunctuator.processorContext = processorContext
cleanPunctuator.configuration = prioritizationConfiguration
val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
- cleanCancellable = processorContext.schedule(Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
- PunctuationType.WALL_CLOCK_TIME, cleanPunctuator)
- log.info("Clean punctuator setup complete with expiry " +
- "hold(${cleanConfiguration.expiredRecordsHoldDays})days")
+ cleanCancellable = processorContext.schedule(
+ Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
+ PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
+ )
+ log.info(
+ "Clean punctuator setup complete with expiry " +
+ "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
+ )
}
open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
val group = messagePrioritization.group
val correlationId = messagePrioritization.correlationId!!
val types = getGroupCorrelationTypes(messagePrioritization)
- log.info("checking correlation for message($id), group($group), types($types), " +
- "correlation id($correlationId)")
+ log.info(
+ "checking correlation for message($id), group($group), types($types), " +
+ "correlation id($correlationId)"
+ )
/** Get all previously received messages from database for group and optional types and correlation Id */
- val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(group,
- arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId)
+ val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
+ group,
+ arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
+ )
/** If multiple records found, then check correlation */
if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
/** Check all correlation satisfies */
val correlationResults = MessageCorrelationUtils
- .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
+ .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
if (correlationResults.correlated) {
/** Correlation satisfied */
val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
/** Send only correlated ids to next processor */
- this.processorContext.forward(UUID.randomUUID().toString(), correlatedIds,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE))
+ this.processorContext.forward(
+ UUID.randomUUID().toString(), correlatedIds,
+ To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
+ )
} else {
/** Correlation not satisfied */
log.trace("correlation not matched : ${correlationResults.message}")
} else {
// No Correlation check needed, simply forward to next processor.
messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
- this.processorContext.forward(messagePrioritization.id, messagePrioritization.id,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE))
+ this.processorContext.forward(
+ messagePrioritization.id, messagePrioritization.id,
+ To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
+ )
}
}
open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
return null
}
-}
\ No newline at end of file
+}