Prioritization Optional NATS consumer support
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / service / AbstractMessagePrioritizationService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
18
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
27 import org.onap.ccsdk.cds.controllerblueprints.core.logger
28
/**
 * Base [MessagePrioritizationService] that persists incoming messages, performs the correlation check
 * and drives the prioritize -> aggregate -> output flow through [MessagePrioritizationStateService].
 *
 * Children implement sequencing and aggregation handling in [handleAggregation] and supply the
 * group type correlation mappings through [getGroupCorrelationTypes].
 */
abstract class AbstractMessagePrioritizationService(
    private val messagePrioritizationStateService: MessagePrioritizationStateService
) : MessagePrioritizationService {

    private val log = logger(AbstractMessagePrioritizationService::class)

    /** Must be assigned (see [setConfiguration]) before the expiry and clean operations are invoked. */
    lateinit var prioritizationConfiguration: PrioritizationConfiguration

    /** Stores the [prioritizationConfiguration] used by this service. */
    override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) {
        this.prioritizationConfiguration = prioritizationConfiguration
    }

    /** Returns the configuration previously supplied through [setConfiguration]. */
    override fun getConfiguration(): PrioritizationConfiguration = prioritizationConfiguration

    /**
     * Saves [messagePrioritize] and runs the correlation step while holding the cluster lock of the
     * message group.
     *
     * The lock is released in a `finally` block, so a failure while saving or correlating can no longer
     * leave the group locked. Any exception is not propagated: it is recorded on the message
     * (`error` field) and the stored message is moved to [MessageState.ERROR].
     */
    override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
        try {
            log.info("***** received in prioritize processor key(${messagePrioritize.id})")
            check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }

            /** Get the cluster lock for message group */
            val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
            try {
                // Save the Message
                messagePrioritizationStateService.saveMessage(messagePrioritize)
                handleCorrelationAndNextStep(messagePrioritize)
            } finally {
                /** Cluster unLock for message group, also on failure */
                MessageProcessorUtils.prioritizationUnLock(clusterLock)
            }
        } catch (e: Exception) {
            val failure = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
            messagePrioritize.error = failure
            log.error(failure, e)
            /** Update the data store */
            messagePrioritizationStateService.setMessageStateANdError(
                messagePrioritize.id, MessageState.ERROR.name,
                failure
            )
        }
    }

    /** Marks every message in [messages] as [MessageState.COMPLETED] in the state service. */
    override suspend fun output(messages: List<MessagePrioritization>) {
        log.info("$$$$$ received in output processor id(${messages.ids()})")
        messages.forEach { message ->
            messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
        }
    }

    /**
     * Fetches up to `expiryConfiguration.maxPollRecord` expiry eligible messages and moves them to
     * [MessageState.EXPIRED], holding the expiry cluster lock for the duration.
     *
     * Failures of the fetch/update are logged and swallowed; the lock is always released.
     *
     * @throws IllegalStateException when [prioritizationConfiguration] has not been set.
     */
    override suspend fun updateExpiredMessages() {
        check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }

        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
        val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
        try {
            val expiredIds = messagePrioritizationStateService
                .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
                ?.ids()
            if (!expiredIds.isNullOrEmpty()) {
                messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
            }
        } catch (e: Exception) {
            log.error("failed in updating expired messages", e)
        } finally {
            MessageProcessorUtils.prioritizationUnLock(clusterLock)
        }
    }

    /**
     * Asks the state service to delete expired messages, passing
     * `cleanConfiguration.expiredRecordsHoldDays`, while holding the clean cluster lock.
     *
     * Failures of the delete are logged and swallowed; the lock is always released.
     *
     * @throws IllegalStateException when [prioritizationConfiguration] has not been set.
     */
    override suspend fun cleanExpiredMessage() {
        check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }

        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
        val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
        try {
            messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
        } catch (e: Exception) {
            log.error("failed in clean expired messages", e)
        } finally {
            MessageProcessorUtils.prioritizationUnLock(clusterLock)
        }
    }

    /**
     * Decides the next step for [messagePrioritization] based on its correlation id.
     *
     * - Blank/missing correlation id: the message is set to [MessageState.PRIORITIZED] and handed to
     *   [aggregate] on its own.
     * - Otherwise the stored NEW/WAIT messages of the same group, correlation id and (optional) types are
     *   loaded. With more than one stored message the correlation is evaluated: when satisfied, all of
     *   them become [MessageState.PRIORITIZED] and are aggregated together; when not, all of them are
     *   set to [MessageState.WAIT]. With at most one stored message, only the current message is set to
     *   [MessageState.WAIT].
     */
    open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
        val correlationId = messagePrioritization.correlationId

        /** Check correlation enabled and correlation field has populated */
        if (correlationId.isNullOrBlank()) {
            /** No Correlation check needed, simply forward to next processor. */
            messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
            aggregate(arrayListOf(messagePrioritization))
            return
        }

        val id = messagePrioritization.id
        val group = messagePrioritization.group
        val types = getGroupCorrelationTypes(messagePrioritization)
        log.info(
            "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " +
                "correlation types($types), priority(${messagePrioritization.priority}), " +
                "correlation id($correlationId)"
        )

        /** Get all previously received messages from database for group and optional types and correlation Id */
        val waitingCorrelatedStoreMessages = messagePrioritizationStateService
            .getCorrelatedMessages(
                group,
                arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
            )

        /** Only when multiple records are found there is something to correlate */
        if (waitingCorrelatedStoreMessages != null && waitingCorrelatedStoreMessages.size > 1) {
            /** Check all correlation satisfies */
            val correlationResults = MessageCorrelationUtils
                .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)

            if (correlationResults.correlated) {
                /** Update all messages to Prioritized state */
                messagePrioritizationStateService.setMessagesState(
                    waitingCorrelatedStoreMessages.ids(),
                    MessageState.PRIORITIZED.name
                )
                /** Correlation satisfied, send only correlated messages to aggregate processor */
                aggregate(waitingCorrelatedStoreMessages)
            } else {
                /** Correlation not satisfied */
                log.trace("correlation not matched : ${correlationResults.message}")
                // Update the Message state to Wait
                messagePrioritizationStateService.setMessagesState(
                    waitingCorrelatedStoreMessages.ids(),
                    MessageState.WAIT.name
                )
            }
        } else {
            /** received first message of group and correlation Id, update the message with wait state */
            messagePrioritizationStateService.setMessageState(id, MessageState.WAIT.name)
        }
    }

    /**
     * Delegates the non-empty [messages] to [handleAggregation].
     *
     * When aggregation fails, every message is moved to [MessageState.ERROR] with the failure text
     * (best effort: a failing state update is logged and the remaining messages are still processed),
     * and the messages are then passed to [output]. The exception is not propagated.
     */
    open suspend fun aggregate(messages: List<MessagePrioritization>) {
        log.info("@@@@@ received in aggregation processor ids(${messages.ids()})")
        if (messages.isEmpty()) return

        try {
            /** Implement Aggregation logic in overridden class, If necessary,
             Populate New Message and Update status with Prioritized, Forward the message to next processor */
            handleAggregation(messages)
        } catch (e: Exception) {
            val error = "failed in aggregate message(${messages.ids()}) : ${e.message}"
            log.error(error, e)
            messages.forEach { messagePrioritization ->
                try {
                    /** Update the data store */
                    messagePrioritizationStateService.setMessageStateANdError(
                        messagePrioritization.id,
                        MessageState.ERROR.name, error
                    )
                } catch (sendException: Exception) {
                    // Report the state update failure itself; the aggregation failure is logged above.
                    log.error(
                        "failed to update/publish error message(${messagePrioritization.id}) : " +
                            "${sendException.message}",
                        sendException
                    )
                }
            }
            /** Publish to output topic */
            // NOTE(review): the default output() sets the state to COMPLETED right after ERROR was
            // written above - confirm this is intended for implementations that do not override it.
            output(messages)
        }
    }

    /**
     * Hook for the custom sequencing and aggregation logic of the child class.
     * Invoked by [aggregate] with a non-empty list; an exception thrown here is handled by [aggregate].
     */
    abstract suspend fun handleAggregation(messages: List<MessagePrioritization>)

    /**
     * If consumer wants specific correlation with respect to group and types, then populate the specific types,
     * otherwise (null) correlation happens with group and correlationId only.
     */
    abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?
}