2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
27 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29 /** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
30 abstract class AbstractMessagePrioritizationService(
31 private val messagePrioritizationStateService: MessagePrioritizationStateService
32 ) : MessagePrioritizationService {
34 private val log = logger(AbstractMessagePrioritizationService::class)
36 lateinit var prioritizationConfiguration: PrioritizationConfiguration
38 override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) {
39 this.prioritizationConfiguration = prioritizationConfiguration
42 override fun getConfiguration(): PrioritizationConfiguration {
43 return this.prioritizationConfiguration
46 override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
48 log.info("***** received in prioritize processor key(${messagePrioritize.id})")
49 check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
51 /** Get the cluster lock for message group */
52 val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
54 messagePrioritizationStateService.saveMessage(messagePrioritize)
55 handleCorrelationAndNextStep(messagePrioritize)
56 /** Cluster unLock for message group */
57 MessageProcessorUtils.prioritizationUnLock(clusterLock)
58 } catch (e: Exception) {
59 messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
60 log.error(messagePrioritize.error)
61 /** Update the data store */
62 messagePrioritizationStateService.setMessageStateANdError(
63 messagePrioritize.id, MessageState.ERROR.name,
64 messagePrioritize.error!!
69 override suspend fun output(messages: List<MessagePrioritization>) {
70 log.info("$$$$$ received in output processor id(${messages.ids()})")
71 messages.forEach { message ->
72 messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
76 override suspend fun updateExpiredMessages() {
77 check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
79 val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
80 val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
82 val fetchMessages = messagePrioritizationStateService
83 .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
84 val expiredIds = fetchMessages?.ids()
85 if (!expiredIds.isNullOrEmpty()) {
86 messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
88 } catch (e: Exception) {
89 log.error("failed in updating expired messages", e)
91 MessageProcessorUtils.prioritizationUnLock(clusterLock)
95 override suspend fun cleanExpiredMessage() {
96 check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
98 val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
99 val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
101 messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
102 } catch (e: Exception) {
103 log.error("failed in clean expired messages", e)
105 MessageProcessorUtils.prioritizationUnLock(clusterLock)
109 open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
110 /** Check correlation enabled and correlation field has populated */
111 if (!messagePrioritization.correlationId.isNullOrBlank()) {
112 val id = messagePrioritization.id
113 val group = messagePrioritization.group
114 val correlationId = messagePrioritization.correlationId!!
115 val types = getGroupCorrelationTypes(messagePrioritization)
117 "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " +
118 "correlation types($types), priority(${messagePrioritization.priority}), " +
119 "correlation id($correlationId)"
122 /** Get all previously received messages from database for group and optional types and correlation Id */
123 val waitingCorrelatedStoreMessages = messagePrioritizationStateService
124 .getCorrelatedMessages(
126 arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
129 /** If multiple records found, then check correlation */
130 if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
131 /** Check all correlation satisfies */
132 val correlationResults = MessageCorrelationUtils
133 .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
135 if (correlationResults.correlated) {
136 /** Update all messages to Aggregated state */
137 messagePrioritizationStateService.setMessagesState(
138 waitingCorrelatedStoreMessages.ids(),
139 MessageState.PRIORITIZED.name
141 /** Correlation satisfied, Send only correlated messages to aggregate processor */
142 aggregate(waitingCorrelatedStoreMessages)
144 /** Correlation not satisfied */
145 log.trace("correlation not matched : ${correlationResults.message}")
146 // Update the Message state to Wait
147 messagePrioritizationStateService.setMessagesState(
148 waitingCorrelatedStoreMessages.ids(),
149 MessageState.WAIT.name
153 /** received first message of group and correlation Id, update the message with wait state */
154 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
157 /** No Correlation check needed, simply forward to next processor. */
158 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
159 aggregate(arrayListOf(messagePrioritization))
163 open suspend fun aggregate(messages: List<MessagePrioritization>) {
164 log.info("@@@@@ received in aggregation processor ids(${messages.ids()}")
165 if (!messages.isNullOrEmpty()) {
167 /** Implement Aggregation logic in overridden class, If necessary,
168 Populate New Message and Update status with Prioritized, Forward the message to next processor */
169 handleAggregation(messages)
170 } catch (e: Exception) {
171 val error = "failed in aggregate message(${messages.ids()}) : ${e.message}"
172 if (!messages.isNullOrEmpty()) {
173 messages.forEach { messagePrioritization ->
175 /** Update the data store */
176 messagePrioritizationStateService.setMessageStateANdError(
177 messagePrioritization.id,
178 MessageState.ERROR.name, error
180 } catch (sendException: Exception) {
182 "failed to update/publish error message(${messagePrioritization.id}) : " +
183 "${sendException.message}", e
187 /** Publish to output topic */
194 /** Child will override this implementation , if necessary
195 * Here the place child has to implement custom Sequencing and Aggregation logic.
197 abstract suspend fun handleAggregation(messages: List<MessagePrioritization>)
199 /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
200 * otherwise correlation happens with group and correlationId */
201 abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?