2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
19 import org.apache.kafka.streams.processor.ProcessorContext
20 import org.apache.kafka.streams.processor.To
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
28 import org.onap.ccsdk.cds.controllerblueprints.core.logger
30 /** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
31 abstract class AbstractMessagePrioritizationService(
32 private val messagePrioritizationStateService: MessagePrioritizationStateService
33 ) : MessagePrioritizationService {
35 private val log = logger(AbstractMessagePrioritizationService::class)
37 var processorContext: ProcessorContext? = null
39 override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
40 this.processorContext = processorContext
43 override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
45 log.info("***** received in prioritize processor key(${messagePrioritize.id})")
46 /** Get the cluster lock for message group */
47 val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
49 messagePrioritizationStateService.saveMessage(messagePrioritize)
50 handleCorrelationAndNextStep(messagePrioritize)
51 /** Cluster unLock for message group */
52 MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
53 } catch (e: Exception) {
54 messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
55 log.error(messagePrioritize.error)
56 /** Update the data store */
57 messagePrioritizationStateService.setMessageStateANdError(
58 messagePrioritize.id, MessageState.ERROR.name,
59 messagePrioritize.error!!
64 override suspend fun output(id: String) {
65 log.info("$$$$$ received in output processor id($id)")
66 val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
67 /** Check for Kafka Processing, If yes, then send to the output topic */
68 if (this.processorContext != null) {
69 processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
73 open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
74 /** Check correlation enabled and correlation field has populated */
75 if (!messagePrioritization.correlationId.isNullOrBlank()) {
76 val id = messagePrioritization.id
77 val group = messagePrioritization.group
78 val correlationId = messagePrioritization.correlationId!!
79 val types = getGroupCorrelationTypes(messagePrioritization)
81 "checking correlation for message($id), group($group), types($types), " +
82 "correlation id($correlationId)"
85 /** Get all previously received messages from database for group and optional types and correlation Id */
86 val waitingCorrelatedStoreMessages = messagePrioritizationStateService
87 .getCorrelatedMessages(
89 arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
92 /** If multiple records found, then check correlation */
93 if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
94 /** Check all correlation satisfies */
95 val correlationResults = MessageCorrelationUtils
96 .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
98 if (correlationResults.correlated) {
99 /** Correlation satisfied */
100 val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
101 /** Send only correlated ids to aggregate processor */
102 aggregate(correlatedIds)
104 /** Correlation not satisfied */
105 log.trace("correlation not matched : ${correlationResults.message}")
106 val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id }
107 // Update the Message state to Wait
108 messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name)
111 /** received first message of group and correlation Id, update the message with wait state */
112 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
115 // No Correlation check needed, simply forward to next processor.
116 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
117 aggregate(messagePrioritization.id)
121 open suspend fun aggregate(strIds: String) {
122 log.info("@@@@@ received in aggregation processor ids($strIds)")
123 val ids = strIds.split(",").map { it.trim() }
124 if (!ids.isNullOrEmpty()) {
127 /** No aggregation or sequencing needed, simpley forward to next processor */
130 /** Implement Aggregation logic in overridden class, If necessary,
131 Populate New Message and Update status with Prioritized, Forward the message to next processor */
132 handleAggregation(ids)
133 /** Update all messages to Aggregated state */
134 messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
136 } catch (e: Exception) {
137 val error = "failed in Aggregate message($ids) : ${e.message}"
139 val storeMessages = messagePrioritizationStateService.getMessages(ids)
140 if (!storeMessages.isNullOrEmpty()) {
141 storeMessages.forEach { messagePrioritization ->
143 /** Update the data store */
144 messagePrioritizationStateService.setMessageStateANdError(
145 messagePrioritization.id,
146 MessageState.ERROR.name, error
148 /** Publish to output topic */
149 output(messagePrioritization.id)
150 } catch (sendException: Exception) {
152 "failed to update/publish error message(${messagePrioritization.id}) : " +
153 "${sendException.message}", e
162 /** Child will override this implementation , if necessary
163 * Here the place child has to implement custom Sequencing and Aggregation logic.
165 abstract suspend fun handleAggregation(messageIds: List<String>)
167 /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
168 * otherwise correlation happens with group and correlationId */
169 abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?