431e02f30f40385bab86588a8cfb16d32a6fb973
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
18
19 import org.apache.kafka.streams.processor.Cancellable
20 import org.apache.kafka.streams.processor.ProcessorContext
21 import org.apache.kafka.streams.processor.PunctuationType
22 import org.apache.kafka.streams.processor.To
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
28 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
29 import org.onap.ccsdk.cds.controllerblueprints.core.logger
30 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
31 import java.time.Duration
32 import java.util.UUID
33
34 open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
35
36     private val log = logger(MessagePrioritizeProcessor::class)
37
38     lateinit var expiryCancellable: Cancellable
39     lateinit var cleanCancellable: Cancellable
40
41     override suspend fun processNB(key: ByteArray, value: ByteArray) {
42         log.info("***** received in prioritize processor key(${String(key)})")
43         val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
44             ?: throw BluePrintProcessorException("failed to convert")
45         try {
46             // Save the Message
47             messagePrioritizationStateService.saveMessage(messagePrioritize)
48             handleCorrelationAndNextStep(messagePrioritize)
49         } catch (e: Exception) {
50             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
51             log.error(messagePrioritize.error)
52             /** Update the data store */
53             messagePrioritizationStateService.setMessageStateANdError(
54                 messagePrioritize.id, MessageState.ERROR.name,
55                 messagePrioritize.error!!
56             )
57             /** Publish to Output topic */
58             this.processorContext.forward(
59                 messagePrioritize.id, messagePrioritize,
60                 To.child(MessagePrioritizationConstants.SINK_OUTPUT)
61             )
62         }
63     }
64
65     override fun init(context: ProcessorContext) {
66         super.init(context)
67         /** set up expiry marking cron */
68         initializeExpiryPunctuator()
69         /** Set up cleaning records cron */
70         initializeCleanPunctuator()
71     }
72
73     override fun close() {
74         log.info(
75             "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
76                     "taskId(${processorContext.taskId()})"
77         )
78         expiryCancellable.cancel()
79         cleanCancellable.cancel()
80     }
81
82     open fun initializeExpiryPunctuator() {
83         val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService)
84         expiryPunctuator.processorContext = processorContext
85         expiryPunctuator.configuration = prioritizationConfiguration
86         val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
87         expiryCancellable = processorContext.schedule(
88             Duration.ofMillis(expiryConfiguration.frequencyMilli),
89             PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
90         )
91         log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
92     }
93
94     open fun initializeCleanPunctuator() {
95         val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService)
96         cleanPunctuator.processorContext = processorContext
97         cleanPunctuator.configuration = prioritizationConfiguration
98         val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
99         cleanCancellable = processorContext.schedule(
100             Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
101             PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
102         )
103         log.info(
104             "Clean punctuator setup complete with expiry " +
105                     "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
106         )
107     }
108
109     open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
110         /** Check correlation enabled and correlation field has populated */
111         if (!messagePrioritization.correlationId.isNullOrBlank()) {
112             val id = messagePrioritization.id
113             val group = messagePrioritization.group
114             val correlationId = messagePrioritization.correlationId!!
115             val types = getGroupCorrelationTypes(messagePrioritization)
116             log.info(
117                 "checking correlation for message($id), group($group), types($types), " +
118                         "correlation id($correlationId)"
119             )
120
121             /** Get all previously received messages from database for group and optional types and correlation Id */
122             val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
123                 group,
124                 arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
125             )
126
127             /** If multiple records found, then check correlation */
128             if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
129                 /** Check all correlation satisfies */
130                 val correlationResults = MessageCorrelationUtils
131                     .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
132
133                 if (correlationResults.correlated) {
134                     /** Correlation  satisfied */
135                     val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
136                     /**  Send only correlated ids to next processor */
137                     this.processorContext.forward(
138                         UUID.randomUUID().toString(), correlatedIds,
139                         To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
140                     )
141                 } else {
142                     /** Correlation not satisfied */
143                     log.trace("correlation not matched : ${correlationResults.message}")
144                     val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id }
145                     // Update the Message state to Wait
146                     messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name)
147                 }
148             } else {
149                 /** received first message of group and correlation Id, update the message with wait state */
150                 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
151             }
152         } else {
153             // No Correlation check needed, simply forward to next processor.
154             messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
155             this.processorContext.forward(
156                 messagePrioritization.id, messagePrioritization.id,
157                 To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
158             )
159         }
160     }
161
162     /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
163      * otherwise correlation happens with group and correlationId */
164     open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
165         return null
166     }
167 }