4e4e2da7a16ba82d443528f4527c94fe7c8abd3d
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / topology / MessagePrioritizeProcessor.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
18
19 import org.apache.kafka.streams.processor.Cancellable
20 import org.apache.kafka.streams.processor.ProcessorContext
21 import org.apache.kafka.streams.processor.PunctuationType
22 import org.apache.kafka.streams.processor.To
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
28 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
29 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
30 import org.onap.ccsdk.cds.controllerblueprints.core.logger
31 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
32 import java.time.Duration
33 import java.util.UUID
34
/**
 * Kafka Streams processor that persists every incoming message, decides whether it has to wait for
 * correlated messages, and forwards the resulting message id(s) to the aggregate processor.
 *
 * Messages that fail during prioritization are stored with [MessageState.ERROR] and forwarded to the
 * output sink.
 */
open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {

    private val log = logger(MessagePrioritizeProcessor::class)

    /** Handle of the scheduled expiry punctuator, assigned in [initializeExpiryPunctuator]. */
    lateinit var expiryCancellable: Cancellable

    /** Handle of the scheduled clean punctuator, assigned in [initializeCleanPunctuator]. */
    lateinit var cleanCancellable: Cancellable

    /**
     * Deserializes [value] into a [MessagePrioritization], saves it and runs the correlation step while
     * holding the cluster lock of the message group.
     *
     * Any failure after deserialization is recorded on the message, written to the data store with
     * [MessageState.ERROR] and the message is forwarded to the output sink.
     *
     * @param key raw record key, only used for logging.
     * @param value raw record value holding the JSON form of the message.
     * @throws BluePrintProcessorException if [value] cannot be converted to a [MessagePrioritization].
     */
    override suspend fun processNB(key: ByteArray, value: ByteArray) {
        log.info("***** received in prioritize processor key(${String(key)})")
        val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
            ?: throw BluePrintProcessorException("failed to convert")
        try {
            /** Get the cluster lock for message group */
            val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
            try {
                // Save the Message
                messagePrioritizationStateService.saveMessage(messagePrioritize)
                handleCorrelationAndNextStep(messagePrioritize)
            } finally {
                // Release in finally: a failure while saving or correlating must not leave the
                // message group locked.
                MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
            }
        } catch (e: Exception) {
            val errorMessage = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
            messagePrioritize.error = errorMessage
            log.error(errorMessage)
            /** Update the data store */
            messagePrioritizationStateService.setMessageStateANdError(
                messagePrioritize.id, MessageState.ERROR.name,
                errorMessage
            )
            /** Publish to Output topic */
            this.processorContext.forward(
                messagePrioritize.id, messagePrioritize,
                To.child(MessagePrioritizationConstants.SINK_OUTPUT)
            )
        }
    }

    /** Initializes the parent processor, then schedules both punctuators and sets up the cluster service. */
    override fun init(context: ProcessorContext) {
        super.init(context)
        /** set up expiry marking cron */
        initializeExpiryPunctuator()
        /** Set up cleaning records cron */
        initializeCleanPunctuator()
        /** Set up Cluster Service */
        initializeClusterService()
    }

    /**
     * Cancels the scheduled punctuators.
     *
     * A punctuator handle that was never assigned (for example because [init] failed before scheduling
     * it) is skipped instead of raising an uninitialized-property error during shutdown.
     */
    override fun close() {
        log.info(
            "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
                "taskId(${processorContext.taskId()})"
        )
        if (::expiryCancellable.isInitialized) {
            expiryCancellable.cancel()
        }
        if (::cleanCancellable.isInitialized) {
            cleanCancellable.cancel()
        }
    }

    /**
     * Schedules a [MessagePriorityExpiryPunctuator] on wall-clock time, using the configured expiry
     * frequency in milliseconds, and keeps the returned handle in [expiryCancellable].
     */
    open fun initializeExpiryPunctuator() {
        val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService)
        expiryPunctuator.processorContext = processorContext
        expiryPunctuator.configuration = prioritizationConfiguration
        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
        expiryCancellable = processorContext.schedule(
            Duration.ofMillis(expiryConfiguration.frequencyMilli),
            PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
        )
        log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
    }

    /**
     * Schedules a [MessagePriorityCleanPunctuator] on wall-clock time and keeps the returned handle in
     * [cleanCancellable]. The punctuation interval is the configured number of expired-record hold days.
     */
    open fun initializeCleanPunctuator() {
        val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService)
        cleanPunctuator.processorContext = processorContext
        cleanPunctuator.configuration = prioritizationConfiguration
        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
        cleanCancellable = processorContext.schedule(
            Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
            PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
        )
        log.info(
            "Clean punctuator setup complete with expiry " +
                "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
        )
    }

    /**
     * Decides the next step for a saved message.
     *
     * - No correlation id: the message is marked [MessageState.PRIORITIZED] and its id is forwarded to the
     *   aggregate processor.
     * - Correlation id present and more than one NEW/WAIT message found for it: when the correlation is
     *   satisfied the comma-joined ids are forwarded to the aggregate processor, otherwise all found
     *   messages are set to [MessageState.WAIT].
     * - Correlation id present but at most one message found: this message is set to [MessageState.WAIT].
     *
     * @param messagePrioritization the message that was just saved.
     */
    open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
        val correlationId = messagePrioritization.correlationId

        /** Check correlation enabled and correlation field has populated */
        if (correlationId.isNullOrBlank()) {
            // No Correlation check needed, simply forward to next processor.
            messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
            this.processorContext.forward(
                messagePrioritization.id, messagePrioritization.id,
                To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
            )
            return
        }

        val id = messagePrioritization.id
        val group = messagePrioritization.group
        val types = getGroupCorrelationTypes(messagePrioritization)
        log.info(
            "checking correlation for message($id), group($group), types($types), " +
                "correlation id($correlationId)"
        )

        /** Get all previously received messages from database for group and optional types and correlation Id */
        val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
            group,
            arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
        )

        /** If multiple records found, then check correlation */
        if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
            // Ids are needed by both outcomes below, so collect them once.
            val correlatedMessageIds = waitingCorrelatedStoreMessages.map { it.id }

            /** Check all correlation satisfies */
            val correlationResults = MessageCorrelationUtils
                .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)

            if (correlationResults.correlated) {
                /** Correlation satisfied, send only correlated ids to next processor */
                this.processorContext.forward(
                    UUID.randomUUID().toString(), correlatedMessageIds.joinToString(","),
                    To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
                )
            } else {
                /** Correlation not satisfied */
                log.trace("correlation not matched : ${correlationResults.message}")
                // Update the Message state to Wait
                messagePrioritizationStateService.setMessagesState(correlatedMessageIds, MessageState.WAIT.name)
            }
        } else {
            /** received first message of group and correlation Id, update the message with wait state */
            messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
        }
    }

    /**
     * If consumer wants specific correlation with respect to group and types, then populate the specific
     * types, otherwise correlation happens with group and correlationId.
     *
     * @return the types to correlate on; this default implementation returns `null`.
     */
    open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? = null
}