/*
 * Copyright © 2018-2019 AT&T Intellectual Property.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
19 import org.apache.kafka.streams.processor.Cancellable
20 import org.apache.kafka.streams.processor.ProcessorContext
21 import org.apache.kafka.streams.processor.PunctuationType
22 import org.apache.kafka.streams.processor.To
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
28 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
29 import org.onap.ccsdk.cds.controllerblueprints.core.logger
30 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
31 import java.time.Duration
/**
 * Default message prioritization processor for `ByteArray` keyed/valued records.
 *
 * Each incoming record value is deserialized into a [MessagePrioritization] and handed to the
 * [MessagePrioritizationService]. When prioritization fails, the message is marked as
 * [MessageState.ERROR] in the state service and forwarded to the output sink.
 *
 * On [init] two wall-clock punctuators are scheduled (expiry marking and record cleaning); their
 * [Cancellable] handles are cancelled again in [close].
 *
 * @param messagePrioritizationStateService state store access used for error updates and handed to the punctuators.
 * @param messagePrioritizationService service performing the actual prioritization.
 */
open class DefaultMessagePrioritizeProcessor(
    private val messagePrioritizationStateService: MessagePrioritizationStateService,
    private val messagePrioritizationService: MessagePrioritizationService
) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {

    private val log = logger(DefaultMessagePrioritizeProcessor::class)

    /** Handle of the scheduled expiry punctuator; assigned by [initializeExpiryPunctuator]. */
    lateinit var expiryCancellable: Cancellable

    /** Handle of the scheduled clean punctuator; assigned by [initializeCleanPunctuator]. */
    lateinit var cleanCancellable: Cancellable

    /**
     * Deserializes [value] into a [MessagePrioritization] and prioritizes it.
     *
     * @param key record key (not used by this implementation).
     * @param value record payload, expected to be a JSON encoded [MessagePrioritization].
     * @throws BluePrintProcessorException when the payload converts to `null`.
     */
    override suspend fun processNB(key: ByteArray, value: ByteArray) {

        val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
            ?: throw BluePrintProcessorException("failed to convert")
        try {
            messagePrioritizationService.setKafkaProcessorContext(processorContext)
            messagePrioritizationService.prioritize(messagePrioritize)
        } catch (e: Exception) {
            // Keep the message in a non-null local so it can be reused without `!!`.
            val errorMessage = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
            messagePrioritize.error = errorMessage
            log.error(errorMessage)
            /** Update the data store */
            messagePrioritizationStateService.setMessageStateANdError(
                messagePrioritize.id, MessageState.ERROR.name,
                errorMessage
            )
            /** Publish to Output topic */
            this.processorContext.forward(
                messagePrioritize.id, messagePrioritize,
                To.child(MessagePrioritizationConstants.SINK_OUTPUT)
            )
        }
    }

    /**
     * Initializes the processor and schedules the expiry and clean punctuators.
     *
     * @param context Kafka Streams processor context supplied by the runtime.
     */
    override fun init(context: ProcessorContext) {
        // Base initialization runs first; presumably it binds `processorContext`, which the
        // punctuator setup below relies on — TODO confirm against the abstract processor.
        super.init(context)
        /** set up expiry marking cron */
        initializeExpiryPunctuator()
        /** Set up cleaning records cron */
        initializeCleanPunctuator()
    }

    /**
     * Logs the shutdown and cancels whichever punctuators were actually scheduled.
     */
    override fun close() {
        log.info(
            "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
                "taskId(${processorContext.taskId()})"
        )
        // The handles are lateinit: if init never ran (or failed half-way) they are unset, and
        // touching them would throw UninitializedPropertyAccessException during shutdown.
        if (::expiryCancellable.isInitialized) {
            expiryCancellable.cancel()
        }
        if (::cleanCancellable.isInitialized) {
            cleanCancellable.cancel()
        }
    }

    /**
     * Creates a [MessagePriorityExpiryPunctuator] and schedules it on wall-clock time every
     * `expiryConfiguration.frequencyMilli` milliseconds, storing the handle in [expiryCancellable].
     */
    open fun initializeExpiryPunctuator() {
        val expiryPunctuator =
            MessagePriorityExpiryPunctuator(
                messagePrioritizationStateService
            )
        expiryPunctuator.processorContext = processorContext
        expiryPunctuator.configuration = prioritizationConfiguration
        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
        expiryCancellable = processorContext.schedule(
            Duration.ofMillis(expiryConfiguration.frequencyMilli),
            PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
        )
        log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
    }

    /**
     * Creates a [MessagePriorityCleanPunctuator] and schedules it on wall-clock time, storing the
     * handle in [cleanCancellable]. The scheduling interval is
     * `cleanConfiguration.expiredRecordsHoldDays` days.
     */
    open fun initializeCleanPunctuator() {
        val cleanPunctuator =
            MessagePriorityCleanPunctuator(
                messagePrioritizationStateService
            )
        cleanPunctuator.processorContext = processorContext
        cleanPunctuator.configuration = prioritizationConfiguration
        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
        cleanCancellable = processorContext.schedule(
            Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
            PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
        )
        log.info(
            "Clean punctuator setup complete with expiry " +
                "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
        )
    }
}