2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
19 import org.apache.kafka.streams.processor.Cancellable
20 import org.apache.kafka.streams.processor.ProcessorContext
21 import org.apache.kafka.streams.processor.PunctuationType
22 import org.apache.kafka.streams.processor.To
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
28 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
29 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
30 import org.onap.ccsdk.cds.controllerblueprints.core.logger
31 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
32 import java.time.Duration
35 open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
37 private val log = logger(MessagePrioritizeProcessor::class)
39 lateinit var expiryCancellable: Cancellable
40 lateinit var cleanCancellable: Cancellable
42 override suspend fun processNB(key: ByteArray, value: ByteArray) {
43 log.info("***** received in prioritize processor key(${String(key)})")
44 val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
45 ?: throw BluePrintProcessorException("failed to convert")
47 /** Get the cluster lock for message group */
48 val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
50 messagePrioritizationStateService.saveMessage(messagePrioritize)
51 handleCorrelationAndNextStep(messagePrioritize)
52 /** Cluster unLock for message group */
53 MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
54 } catch (e: Exception) {
55 messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
56 log.error(messagePrioritize.error)
57 /** Update the data store */
58 messagePrioritizationStateService.setMessageStateANdError(
59 messagePrioritize.id, MessageState.ERROR.name,
60 messagePrioritize.error!!
62 /** Publish to Output topic */
63 this.processorContext.forward(
64 messagePrioritize.id, messagePrioritize,
65 To.child(MessagePrioritizationConstants.SINK_OUTPUT)
70 override fun init(context: ProcessorContext) {
72 /** set up expiry marking cron */
73 initializeExpiryPunctuator()
74 /** Set up cleaning records cron */
75 initializeCleanPunctuator()
76 /** Set up Cluster Service */
77 initializeClusterService()
80 override fun close() {
82 "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
83 "taskId(${processorContext.taskId()})"
85 expiryCancellable.cancel()
86 cleanCancellable.cancel()
89 open fun initializeExpiryPunctuator() {
90 val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService)
91 expiryPunctuator.processorContext = processorContext
92 expiryPunctuator.configuration = prioritizationConfiguration
93 val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
94 expiryCancellable = processorContext.schedule(
95 Duration.ofMillis(expiryConfiguration.frequencyMilli),
96 PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
98 log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
101 open fun initializeCleanPunctuator() {
102 val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService)
103 cleanPunctuator.processorContext = processorContext
104 cleanPunctuator.configuration = prioritizationConfiguration
105 val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
106 cleanCancellable = processorContext.schedule(
107 Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
108 PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
111 "Clean punctuator setup complete with expiry " +
112 "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
116 open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
117 /** Check correlation enabled and correlation field has populated */
118 if (!messagePrioritization.correlationId.isNullOrBlank()) {
119 val id = messagePrioritization.id
120 val group = messagePrioritization.group
121 val correlationId = messagePrioritization.correlationId!!
122 val types = getGroupCorrelationTypes(messagePrioritization)
124 "checking correlation for message($id), group($group), types($types), " +
125 "correlation id($correlationId)"
128 /** Get all previously received messages from database for group and optional types and correlation Id */
129 val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
131 arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
134 /** If multiple records found, then check correlation */
135 if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
136 /** Check all correlation satisfies */
137 val correlationResults = MessageCorrelationUtils
138 .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
140 if (correlationResults.correlated) {
141 /** Correlation satisfied */
142 val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
143 /** Send only correlated ids to next processor */
144 this.processorContext.forward(
145 UUID.randomUUID().toString(), correlatedIds,
146 To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
149 /** Correlation not satisfied */
150 log.trace("correlation not matched : ${correlationResults.message}")
151 val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id }
152 // Update the Message state to Wait
153 messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name)
156 /** received first message of group and correlation Id, update the message with wait state */
157 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
160 // No Correlation check needed, simply forward to next processor.
161 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
162 this.processorContext.forward(
163 messagePrioritization.id, messagePrioritization.id,
164 To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
169 /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
170 * otherwise correlation happens with group and correlationId */
171 open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {