* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-import org.apache.kafka.streams.processor.Cancellable
import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import java.time.Duration
-import java.util.UUID
-open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
+abstract class AbstractMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : MessagePrioritizationService {
- private val log = logger(MessagePrioritizeProcessor::class)
+ private val log = logger(AbstractMessagePrioritizationService::class)
- lateinit var expiryCancellable: Cancellable
- lateinit var cleanCancellable: Cancellable
+ var processorContext: ProcessorContext? = null
- override suspend fun processNB(key: ByteArray, value: ByteArray) {
- log.info("***** received in prioritize processor key(${String(key)})")
- val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
- ?: throw BluePrintProcessorException("failed to convert")
+ override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
+ this.processorContext = processorContext
+ }
+
+ override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
try {
+ log.info("***** received in prioritize processor key(${messagePrioritize.id})")
/** Get the cluster lock for message group */
- val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
+ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
// Save the Message
messagePrioritizationStateService.saveMessage(messagePrioritize)
handleCorrelationAndNextStep(messagePrioritize)
/** Cluster unLock for message group */
- MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
+ MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
messagePrioritize.id, MessageState.ERROR.name,
messagePrioritize.error!!
)
- /** Publish to Output topic */
- this.processorContext.forward(
- messagePrioritize.id, messagePrioritize,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
}
}
- override fun init(context: ProcessorContext) {
- super.init(context)
- /** set up expiry marking cron */
- initializeExpiryPunctuator()
- /** Set up cleaning records cron */
- initializeCleanPunctuator()
- /** Set up Cluster Service */
- initializeClusterService()
- }
-
- override fun close() {
- log.info(
- "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})"
- )
- expiryCancellable.cancel()
- cleanCancellable.cancel()
- }
-
- open fun initializeExpiryPunctuator() {
- val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService)
- expiryPunctuator.processorContext = processorContext
- expiryPunctuator.configuration = prioritizationConfiguration
- val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- expiryCancellable = processorContext.schedule(
- Duration.ofMillis(expiryConfiguration.frequencyMilli),
- PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
- )
- log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
- }
-
- open fun initializeCleanPunctuator() {
- val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService)
- cleanPunctuator.processorContext = processorContext
- cleanPunctuator.configuration = prioritizationConfiguration
- val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
- cleanCancellable = processorContext.schedule(
- Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
- PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
- )
- log.info(
- "Clean punctuator setup complete with expiry " +
- "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
- )
+ override suspend fun output(id: String) {
+ log.info("$$$$$ received in output processor id($id)")
+ val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
+ /** Check for Kafka Processing, If yes, then send to the output topic */
+ if (this.processorContext != null) {
+ processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ }
}
open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
)
/** Get all previously received messages from database for group and optional types and correlation Id */
- val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
- group,
- arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
- )
+ val waitingCorrelatedStoreMessages = messagePrioritizationStateService
+ .getCorrelatedMessages(
+ group,
+ arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
+ )
/** If multiple records found, then check correlation */
if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
if (correlationResults.correlated) {
/** Correlation satisfied */
- val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
- /** Send only correlated ids to next processor */
- this.processorContext.forward(
- UUID.randomUUID().toString(), correlatedIds,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
- )
+ val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
+ /** Send only correlated ids to aggregate processor */
+ aggregate(correlatedIds)
} else {
/** Correlation not satisfied */
log.trace("correlation not matched : ${correlationResults.message}")
} else {
// No Correlation check needed, simply forward to next processor.
messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
- this.processorContext.forward(
- messagePrioritization.id, messagePrioritization.id,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
- )
+ aggregate(messagePrioritization.id)
+ }
+ }
+
+ open suspend fun aggregate(strIds: String) {
+ log.info("@@@@@ received in aggregation processor ids($strIds)")
+ val ids = strIds.split(",").map { it.trim() }
+ if (!ids.isNullOrEmpty()) {
+ try {
+ if (ids.size == 1) {
+ /** No aggregation or sequencing needed, simpley forward to next processor */
+ output(ids.first())
+ } else {
+ /** Implement Aggregation logic in overridden class, If necessary,
+ Populate New Message and Update status with Prioritized, Forward the message to next processor */
+ handleAggregation(ids)
+ /** Update all messages to Aggregated state */
+ messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+ }
+ } catch (e: Exception) {
+ val error = "failed in Aggregate message($ids) : ${e.message}"
+ log.error(error, e)
+ val storeMessages = messagePrioritizationStateService.getMessages(ids)
+ if (!storeMessages.isNullOrEmpty()) {
+ storeMessages.forEach { messagePrioritization ->
+ try {
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritization.id,
+ MessageState.ERROR.name, error
+ )
+ /** Publish to output topic */
+ output(messagePrioritization.id)
+ } catch (sendException: Exception) {
+ log.error(
+ "failed to update/publish error message(${messagePrioritization.id}) : " +
+ "${sendException.message}", e
+ )
+ }
+ }
+ }
+ }
}
}
+ /** Child will override this implementation , if necessary
+ * Here the place child has to implement custom Sequencing and Aggregation logic.
+ * */
+ abstract suspend fun handleAggregation(messageIds: List<String>)
+
/** If consumer wants specific correlation with respect to group and types, then populate the specific types,
* otherwise correlation happens with group and correlationId */
- open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
- return null
- }
+ abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?
}