931403200195d25809be988de8e06c1ed3e29743
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
18
19 import org.apache.kafka.streams.processor.ProcessorContext
20 import org.apache.kafka.streams.processor.To
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
28 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
31 import org.onap.ccsdk.cds.controllerblueprints.core.logger
32
/**
 * Base [MessagePrioritizationService] that persists incoming messages, checks correlation through the
 * [MessagePrioritizationStateService] and hands prioritized messages over to [aggregate].
 *
 * Child should implement with sequencing & aggregation handling ([handleAggregation]) along with
 * group type correlation mappings ([getGroupCorrelationTypes]).
 */
abstract class AbstractMessagePrioritizationService(
    private val messagePrioritizationStateService: MessagePrioritizationStateService
) : MessagePrioritizationService {

    private val log = logger(AbstractMessagePrioritizationService::class)

    /** Kafka processor context; when it is non-null, completed and expired messages are forwarded to the sink. */
    var processorContext: ProcessorContext? = null

    /** Stores the given Kafka [processorContext]; passing null disables forwarding to the output sink. */
    override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
        this.processorContext = processorContext
    }

    /**
     * Saves [messagePrioritize] and runs [handleCorrelationAndNextStep] while holding the group cluster lock.
     *
     * Any exception (including a failure to acquire or release the lock) is recorded on the message and
     * stored through the state service with [MessageState.ERROR]; it is not rethrown.
     */
    override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
        try {
            log.info("***** received in prioritize processor key(${messagePrioritize.id})")
            /** Get the cluster lock for message group */
            val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
            try {
                // Save the Message
                messagePrioritizationStateService.saveMessage(messagePrioritize)
                handleCorrelationAndNextStep(messagePrioritize)
            } finally {
                /** Cluster unLock for message group, also when saving or correlation handling failed */
                MessageProcessorUtils.prioritizationUnLock(clusterLock)
            }
        } catch (e: Exception) {
            val error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
            messagePrioritize.error = error
            // Log with the cause so the stack trace is not lost.
            log.error(error, e)
            /** Update the data store */
            messagePrioritizationStateService.setMessageStateANdError(
                messagePrioritize.id, MessageState.ERROR.name,
                error
            )
        }
    }

    /**
     * Updates every message in [messages] to [MessageState.COMPLETED] and, when a Kafka processor context
     * is set, forwards the updated message to the output sink.
     */
    override suspend fun output(messages: List<MessagePrioritization>) {
        log.info("$$$$$ received in output processor id(${messages.ids()})")
        messages.forEach { message ->
            val completedMessage = messagePrioritizationStateService
                .updateMessageState(message.id, MessageState.COMPLETED.name)
            /** Check for Kafka Processing, If yes, then send to the output topic */
            processorContext?.forward(
                completedMessage.id, completedMessage,
                To.child(MessagePrioritizationConstants.SINK_OUTPUT)
            )
        }
    }

    /**
     * Fetches up to `expiryConfiguration.maxPollRecord` expiry eligible messages, updates them to
     * [MessageState.EXPIRED] and, when a Kafka processor context is set, forwards them to the output sink.
     *
     * Runs under the expiry cluster lock; failures are logged and not rethrown.
     */
    override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
        val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
        try {
            val fetchMessages = messagePrioritizationStateService
                .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
            val expiredIds = fetchMessages?.ids()
            if (expiredIds != null && expiredIds.isNotEmpty()) {
                messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
                // Read the mutable property once so the null check and the usage see the same value.
                val context = processorContext
                if (context != null) {
                    fetchMessages.forEach { expired ->
                        expired.state = MessageState.EXPIRED.name
                        context.forward(
                            expired.id, expired,
                            To.child(MessagePrioritizationConstants.SINK_OUTPUT)
                        )
                    }
                }
            }
        } catch (e: Exception) {
            log.error("failed in updating expired messages", e)
        } finally {
            MessageProcessorUtils.prioritizationUnLock(clusterLock)
        }
    }

    /**
     * Deletes expired messages through the state service, passing `cleanConfiguration.expiredRecordsHoldDays`.
     *
     * Runs under the clean cluster lock; failures are logged and not rethrown.
     */
    override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
        val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
        try {
            messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
        } catch (e: Exception) {
            log.error("failed in clean expired messages", e)
        } finally {
            MessageProcessorUtils.prioritizationUnLock(clusterLock)
        }
    }

    /**
     * Decides the next step for a saved message.
     *
     * - Blank or missing correlation id: the message is set to [MessageState.PRIORITIZED] and aggregated alone.
     * - Otherwise the stored NEW/WAIT messages of the same group, types and correlation id are fetched;
     *   when more than one is found and the correlation is satisfied they are all set to
     *   [MessageState.PRIORITIZED] and aggregated together, when it is not satisfied they are all set to
     *   [MessageState.WAIT].
     * - When at most one stored message is found, the current message is set to [MessageState.WAIT].
     */
    open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
        val correlationId = messagePrioritization.correlationId
        /** Check correlation enabled and correlation field has populated */
        if (correlationId.isNullOrBlank()) {
            /** No Correlation check needed, simply forward to next processor. */
            messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
            aggregate(listOf(messagePrioritization))
            return
        }

        val id = messagePrioritization.id
        val group = messagePrioritization.group
        val types = getGroupCorrelationTypes(messagePrioritization)
        log.info(
            "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " +
                "correlation types($types), priority(${messagePrioritization.priority}), " +
                "correlation id($correlationId)"
        )

        /** Get all previously received messages from database for group and optional types and correlation Id */
        val waitingCorrelatedStoreMessages = messagePrioritizationStateService
            .getCorrelatedMessages(
                group,
                arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
            )

        /** If multiple records found, then check correlation */
        if (waitingCorrelatedStoreMessages != null && waitingCorrelatedStoreMessages.size > 1) {
            /** Check all correlation satisfies */
            val correlationResults = MessageCorrelationUtils
                .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)

            if (correlationResults.correlated) {
                /** Update all messages to Prioritized state */
                messagePrioritizationStateService.setMessagesState(
                    waitingCorrelatedStoreMessages.ids(),
                    MessageState.PRIORITIZED.name
                )
                /** Correlation  satisfied, Send only correlated messages to aggregate processor */
                aggregate(waitingCorrelatedStoreMessages)
            } else {
                /** Correlation not satisfied */
                log.trace("correlation not matched : ${correlationResults.message}")
                // Update the Message state to Wait
                messagePrioritizationStateService.setMessagesState(
                    waitingCorrelatedStoreMessages.ids(),
                    MessageState.WAIT.name
                )
            }
        } else {
            /** received first message of group and correlation Id, update the message with wait state */
            messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
        }
    }

    /**
     * Passes non-empty [messages] to [handleAggregation].
     *
     * When aggregation throws, the failure is logged, every message is stored with [MessageState.ERROR]
     * and the error text (a failure to store one message is logged and does not stop the others), and
     * the messages are then handed to [output].
     */
    open suspend fun aggregate(messages: List<MessagePrioritization>) {
        log.info("@@@@@ received in aggregation processor ids(${messages.ids()}")
        if (messages.isEmpty()) return

        try {
            /** Implement Aggregation logic in overridden class, If necessary,
             Populate New Message and Update status with Prioritized, Forward the message to next processor */
            handleAggregation(messages)
        } catch (e: Exception) {
            val error = "failed in aggregate message(${messages.ids()}) : ${e.message}"
            // Keep the aggregation failure and its stack trace in the log, not only in the data store.
            log.error(error, e)
            messages.forEach { messagePrioritization ->
                try {
                    /** Update the data store */
                    messagePrioritizationStateService.setMessageStateANdError(
                        messagePrioritization.id,
                        MessageState.ERROR.name, error
                    )
                } catch (sendException: Exception) {
                    // Attach the exception that actually broke the update, not the aggregation failure.
                    log.error(
                        "failed to update/publish error message(${messagePrioritization.id}) : " +
                            "${sendException.message}", sendException
                    )
                }
            }
            /** Publish to output topic */
            // NOTE(review): output() updates each message to COMPLETED right after it was stored as ERROR
            // above - confirm this overwrite is intended.
            output(messages)
        }
    }

    /** Child will override this implementation , if necessary
     *  Here the place child has to implement custom Sequencing and Aggregation logic.
     * */
    abstract suspend fun handleAggregation(messages: List<MessagePrioritization>)

    /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
     * otherwise correlation happens with group and correlationId */
    abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?
}