Rest endpoint for message Prioritization
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / service / AbstractMessagePrioritizationService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
18
19 import org.apache.kafka.streams.processor.ProcessorContext
20 import org.apache.kafka.streams.processor.To
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
28 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29
30 /** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
31 abstract class AbstractMessagePrioritizationService(
32     private val messagePrioritizationStateService: MessagePrioritizationStateService
33 ) : MessagePrioritizationService {
34
35     private val log = logger(AbstractMessagePrioritizationService::class)
36
37     var processorContext: ProcessorContext? = null
38
39     override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
40         this.processorContext = processorContext
41     }
42
43     override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
44         try {
45             log.info("***** received in prioritize processor key(${messagePrioritize.id})")
46             /** Get the cluster lock for message group */
47             val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
48             // Save the Message
49             messagePrioritizationStateService.saveMessage(messagePrioritize)
50             handleCorrelationAndNextStep(messagePrioritize)
51             /** Cluster unLock for message group */
52             MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
53         } catch (e: Exception) {
54             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
55             log.error(messagePrioritize.error)
56             /** Update the data store */
57             messagePrioritizationStateService.setMessageStateANdError(
58                 messagePrioritize.id, MessageState.ERROR.name,
59                 messagePrioritize.error!!
60             )
61         }
62     }
63
64     override suspend fun output(id: String) {
65         log.info("$$$$$ received in output processor id($id)")
66         val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
67         /** Check for Kafka Processing, If yes, then send to the output topic */
68         if (this.processorContext != null) {
69             processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
70         }
71     }
72
73     open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
74         /** Check correlation enabled and correlation field has populated */
75         if (!messagePrioritization.correlationId.isNullOrBlank()) {
76             val id = messagePrioritization.id
77             val group = messagePrioritization.group
78             val correlationId = messagePrioritization.correlationId!!
79             val types = getGroupCorrelationTypes(messagePrioritization)
80             log.info(
81                 "checking correlation for message($id), group($group), types($types), " +
82                     "correlation id($correlationId)"
83             )
84
85             /** Get all previously received messages from database for group and optional types and correlation Id */
86             val waitingCorrelatedStoreMessages = messagePrioritizationStateService
87                 .getCorrelatedMessages(
88                     group,
89                     arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
90                 )
91
92             /** If multiple records found, then check correlation */
93             if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
94                 /** Check all correlation satisfies */
95                 val correlationResults = MessageCorrelationUtils
96                     .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
97
98                 if (correlationResults.correlated) {
99                     /** Correlation  satisfied */
100                     val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
101                     /**  Send only correlated ids to aggregate processor */
102                     aggregate(correlatedIds)
103                 } else {
104                     /** Correlation not satisfied */
105                     log.trace("correlation not matched : ${correlationResults.message}")
106                     val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id }
107                     // Update the Message state to Wait
108                     messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name)
109                 }
110             } else {
111                 /** received first message of group and correlation Id, update the message with wait state */
112                 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
113             }
114         } else {
115             // No Correlation check needed, simply forward to next processor.
116             messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
117             aggregate(messagePrioritization.id)
118         }
119     }
120
121     open suspend fun aggregate(strIds: String) {
122         log.info("@@@@@ received in aggregation processor ids($strIds)")
123         val ids = strIds.split(",").map { it.trim() }
124         if (!ids.isNullOrEmpty()) {
125             try {
126                 if (ids.size == 1) {
127                     /** No aggregation or sequencing needed, simpley forward to next processor */
128                     output(ids.first())
129                 } else {
130                     /** Implement Aggregation logic in overridden class, If necessary,
131                     Populate New Message and Update status with Prioritized, Forward the message to next processor */
132                     handleAggregation(ids)
133                     /** Update all messages to Aggregated state */
134                     messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
135                 }
136             } catch (e: Exception) {
137                 val error = "failed in Aggregate message($ids) : ${e.message}"
138                 log.error(error, e)
139                 val storeMessages = messagePrioritizationStateService.getMessages(ids)
140                 if (!storeMessages.isNullOrEmpty()) {
141                     storeMessages.forEach { messagePrioritization ->
142                         try {
143                             /** Update the data store */
144                             messagePrioritizationStateService.setMessageStateANdError(
145                                 messagePrioritization.id,
146                                 MessageState.ERROR.name, error
147                             )
148                             /** Publish to output topic */
149                             output(messagePrioritization.id)
150                         } catch (sendException: Exception) {
151                             log.error(
152                                 "failed to update/publish error message(${messagePrioritization.id}) : " +
153                                     "${sendException.message}", e
154                             )
155                         }
156                     }
157                 }
158             }
159         }
160     }
161
162     /** Child will override this implementation , if necessary
163      *  Here the place child has to implement custom Sequencing and Aggregation logic.
164      * */
165     abstract suspend fun handleAggregation(messageIds: List<String>)
166
167     /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
168      * otherwise correlation happens with group and correlationId */
169     abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?
170 }