import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
import org.onap.ccsdk.cds.controllerblueprints.core.logger
messagePrioritizationStateService.saveMessage(messagePrioritize)
handleCorrelationAndNextStep(messagePrioritize)
/** Cluster unLock for message group */
- MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
}
}
- override suspend fun output(id: String) {
- log.info("$$$$$ received in output processor id($id)")
- val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
- /** Check for Kafka Processing, If yes, then send to the output topic */
- if (this.processorContext != null) {
- processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ override suspend fun output(messages: List<MessagePrioritization>) {
+ log.info("$$$$$ received in output processor id(${messages.ids()})")
+ messages.forEach { message ->
+ val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+ /** Check for Kafka Processing, If yes, then send to the output topic */
+ if (this.processorContext != null) {
+ processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ }
+ }
+ }
+
+ override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
+ val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+ try {
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ val expiredIds = fetchMessages?.ids()
+ if (expiredIds != null && expiredIds.isNotEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ if (processorContext != null) {
+ fetchMessages.forEach { expired ->
+ expired.state = MessageState.EXPIRED.name
+ processorContext!!.forward(
+ expired.id, expired,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed in updating expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+
+ override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
+ val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
+ try {
+ messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
+ } catch (e: Exception) {
+ log.error("failed in clean expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
}
}
val correlationId = messagePrioritization.correlationId!!
val types = getGroupCorrelationTypes(messagePrioritization)
log.info(
- "checking correlation for message($id), group($group), types($types), " +
+ "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " +
+ "correlation types($types), priority(${messagePrioritization.priority}), " +
"correlation id($correlationId)"
)
.correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
if (correlationResults.correlated) {
- /** Correlation satisfied */
- val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
- /** Send only correlated ids to aggregate processor */
- aggregate(correlatedIds)
+ /** Update all messages to Aggregated state */
+ messagePrioritizationStateService.setMessagesState(
+ waitingCorrelatedStoreMessages.ids(),
+ MessageState.PRIORITIZED.name
+ )
+ /** Correlation satisfied, Send only correlated messages to aggregate processor */
+ aggregate(waitingCorrelatedStoreMessages)
} else {
/** Correlation not satisfied */
log.trace("correlation not matched : ${correlationResults.message}")
- val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id }
// Update the Message state to Wait
- messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name)
+ messagePrioritizationStateService.setMessagesState(
+ waitingCorrelatedStoreMessages.ids(),
+ MessageState.WAIT.name
+ )
}
} else {
/** received first message of group and correlation Id, update the message with wait state */
messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
}
} else {
- // No Correlation check needed, simply forward to next processor.
+ /** No Correlation check needed, simply forward to next processor. */
messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
- aggregate(messagePrioritization.id)
+ aggregate(arrayListOf(messagePrioritization))
}
}
- open suspend fun aggregate(strIds: String) {
- log.info("@@@@@ received in aggregation processor ids($strIds)")
- val ids = strIds.split(",").map { it.trim() }
- if (!ids.isNullOrEmpty()) {
+ open suspend fun aggregate(messages: List<MessagePrioritization>) {
+ log.info("@@@@@ received in aggregation processor ids(${messages.ids()}")
+ if (!messages.isNullOrEmpty()) {
try {
- if (ids.size == 1) {
- /** No aggregation or sequencing needed, simpley forward to next processor */
- output(ids.first())
- } else {
- /** Implement Aggregation logic in overridden class, If necessary,
- Populate New Message and Update status with Prioritized, Forward the message to next processor */
- handleAggregation(ids)
- /** Update all messages to Aggregated state */
- messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
- }
+ /** Implement Aggregation logic in overridden class, If necessary,
+ Populate New Message and Update status with Prioritized, Forward the message to next processor */
+ handleAggregation(messages)
} catch (e: Exception) {
- val error = "failed in Aggregate message($ids) : ${e.message}"
- log.error(error, e)
- val storeMessages = messagePrioritizationStateService.getMessages(ids)
- if (!storeMessages.isNullOrEmpty()) {
- storeMessages.forEach { messagePrioritization ->
+ val error = "failed in aggregate message(${messages.ids()}) : ${e.message}"
+ if (!messages.isNullOrEmpty()) {
+ messages.forEach { messagePrioritization ->
try {
/** Update the data store */
messagePrioritizationStateService.setMessageStateANdError(
messagePrioritization.id,
MessageState.ERROR.name, error
)
- /** Publish to output topic */
- output(messagePrioritization.id)
} catch (sendException: Exception) {
log.error(
"failed to update/publish error message(${messagePrioritization.id}) : " +
)
}
}
+ /** Publish to output topic */
+ output(messages)
}
}
}
/** Child will override this implementation , if necessary
* Here the place child has to implement custom Sequencing and Aggregation logic.
* */
- abstract suspend fun handleAggregation(messageIds: List<String>)
+ abstract suspend fun handleAggregation(messages: List<MessagePrioritization>)
/** If consumer wants specific correlation with respect to group and types, then populate the specific types,
* otherwise correlation happens with group and correlationId */