2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
19 import org.apache.kafka.streams.processor.ProcessorContext
20 import org.apache.kafka.streams.processor.To
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
28 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
31 import org.onap.ccsdk.cds.controllerblueprints.core.logger
33 /** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
34 abstract class AbstractMessagePrioritizationService(
35 private val messagePrioritizationStateService: MessagePrioritizationStateService
36 ) : MessagePrioritizationService {
38 private val log = logger(AbstractMessagePrioritizationService::class)
40 var processorContext: ProcessorContext? = null
42 override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
43 this.processorContext = processorContext
46 override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
48 log.info("***** received in prioritize processor key(${messagePrioritize.id})")
49 /** Get the cluster lock for message group */
50 val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
52 messagePrioritizationStateService.saveMessage(messagePrioritize)
53 handleCorrelationAndNextStep(messagePrioritize)
54 /** Cluster unLock for message group */
55 MessageProcessorUtils.prioritizationUnLock(clusterLock)
56 } catch (e: Exception) {
57 messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
58 log.error(messagePrioritize.error)
59 /** Update the data store */
60 messagePrioritizationStateService.setMessageStateANdError(
61 messagePrioritize.id, MessageState.ERROR.name,
62 messagePrioritize.error!!
67 override suspend fun output(messages: List<MessagePrioritization>) {
68 log.info("$$$$$ received in output processor id(${messages.ids()})")
69 messages.forEach { message ->
70 val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
71 /** Check for Kafka Processing, If yes, then send to the output topic */
72 if (this.processorContext != null) {
73 processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
78 override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
79 val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
81 val fetchMessages = messagePrioritizationStateService
82 .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
83 val expiredIds = fetchMessages?.ids()
84 if (expiredIds != null && expiredIds.isNotEmpty()) {
85 messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
86 if (processorContext != null) {
87 fetchMessages.forEach { expired ->
88 expired.state = MessageState.EXPIRED.name
89 processorContext!!.forward(
91 To.child(MessagePrioritizationConstants.SINK_OUTPUT)
96 } catch (e: Exception) {
97 log.error("failed in updating expired messages", e)
99 MessageProcessorUtils.prioritizationUnLock(clusterLock)
103 override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
104 val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
106 messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
107 } catch (e: Exception) {
108 log.error("failed in clean expired messages", e)
110 MessageProcessorUtils.prioritizationUnLock(clusterLock)
114 open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
115 /** Check correlation enabled and correlation field has populated */
116 if (!messagePrioritization.correlationId.isNullOrBlank()) {
117 val id = messagePrioritization.id
118 val group = messagePrioritization.group
119 val correlationId = messagePrioritization.correlationId!!
120 val types = getGroupCorrelationTypes(messagePrioritization)
122 "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " +
123 "correlation types($types), priority(${messagePrioritization.priority}), " +
124 "correlation id($correlationId)"
127 /** Get all previously received messages from database for group and optional types and correlation Id */
128 val waitingCorrelatedStoreMessages = messagePrioritizationStateService
129 .getCorrelatedMessages(
131 arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
134 /** If multiple records found, then check correlation */
135 if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
136 /** Check all correlation satisfies */
137 val correlationResults = MessageCorrelationUtils
138 .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
140 if (correlationResults.correlated) {
141 /** Update all messages to Aggregated state */
142 messagePrioritizationStateService.setMessagesState(
143 waitingCorrelatedStoreMessages.ids(),
144 MessageState.PRIORITIZED.name
146 /** Correlation satisfied, Send only correlated messages to aggregate processor */
147 aggregate(waitingCorrelatedStoreMessages)
149 /** Correlation not satisfied */
150 log.trace("correlation not matched : ${correlationResults.message}")
151 // Update the Message state to Wait
152 messagePrioritizationStateService.setMessagesState(
153 waitingCorrelatedStoreMessages.ids(),
154 MessageState.WAIT.name
158 /** received first message of group and correlation Id, update the message with wait state */
159 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
162 /** No Correlation check needed, simply forward to next processor. */
163 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
164 aggregate(arrayListOf(messagePrioritization))
168 open suspend fun aggregate(messages: List<MessagePrioritization>) {
169 log.info("@@@@@ received in aggregation processor ids(${messages.ids()}")
170 if (!messages.isNullOrEmpty()) {
172 /** Implement Aggregation logic in overridden class, If necessary,
173 Populate New Message and Update status with Prioritized, Forward the message to next processor */
174 handleAggregation(messages)
175 } catch (e: Exception) {
176 val error = "failed in aggregate message(${messages.ids()}) : ${e.message}"
177 if (!messages.isNullOrEmpty()) {
178 messages.forEach { messagePrioritization ->
180 /** Update the data store */
181 messagePrioritizationStateService.setMessageStateANdError(
182 messagePrioritization.id,
183 MessageState.ERROR.name, error
185 } catch (sendException: Exception) {
187 "failed to update/publish error message(${messagePrioritization.id}) : " +
188 "${sendException.message}", e
192 /** Publish to output topic */
199 /** Child will override this implementation , if necessary
200 * Here the place child has to implement custom Sequencing and Aggregation logic.
202 abstract suspend fun handleAggregation(messages: List<MessagePrioritization>)
204 /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
205 * otherwise correlation happens with group and correlationId */
206 abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?