Rest endpoint for message Prioritization
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / kafka / DefaultMessagePrioritizeProcessor.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
18
19 import org.apache.kafka.streams.processor.Cancellable
20 import org.apache.kafka.streams.processor.ProcessorContext
21 import org.apache.kafka.streams.processor.PunctuationType
22 import org.apache.kafka.streams.processor.To
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
28 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
29 import org.onap.ccsdk.cds.controllerblueprints.core.logger
30 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
31 import java.time.Duration
32
33 open class DefaultMessagePrioritizeProcessor(
34     private val messagePrioritizationStateService: MessagePrioritizationStateService,
35     private val messagePrioritizationService: MessagePrioritizationService
36 ) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
37
38     private val log = logger(DefaultMessagePrioritizeProcessor::class)
39
40     lateinit var expiryCancellable: Cancellable
41     lateinit var cleanCancellable: Cancellable
42
43     override suspend fun processNB(key: ByteArray, value: ByteArray) {
44
45         val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
46             ?: throw BluePrintProcessorException("failed to convert")
47         try {
48             messagePrioritizationService.setKafkaProcessorContext(processorContext)
49             messagePrioritizationService.prioritize(messagePrioritize)
50         } catch (e: Exception) {
51             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
52             log.error(messagePrioritize.error)
53             /** Update the data store */
54             messagePrioritizationStateService.setMessageStateANdError(
55                 messagePrioritize.id, MessageState.ERROR.name,
56                 messagePrioritize.error!!
57             )
58             /** Publish to Output topic */
59             this.processorContext.forward(
60                 messagePrioritize.id, messagePrioritize,
61                 To.child(MessagePrioritizationConstants.SINK_OUTPUT)
62             )
63         }
64     }
65
66     override fun init(context: ProcessorContext) {
67         super.init(context)
68         /** set up expiry marking cron */
69         initializeExpiryPunctuator()
70         /** Set up cleaning records cron */
71         initializeCleanPunctuator()
72     }
73
74     override fun close() {
75         log.info(
76             "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
77                 "taskId(${processorContext.taskId()})"
78         )
79         expiryCancellable.cancel()
80         cleanCancellable.cancel()
81     }
82
83     open fun initializeExpiryPunctuator() {
84         val expiryPunctuator =
85             MessagePriorityExpiryPunctuator(
86                 messagePrioritizationStateService
87             )
88         expiryPunctuator.processorContext = processorContext
89         expiryPunctuator.configuration = prioritizationConfiguration
90         val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
91         expiryCancellable = processorContext.schedule(
92             Duration.ofMillis(expiryConfiguration.frequencyMilli),
93             PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
94         )
95         log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
96     }
97
98     open fun initializeCleanPunctuator() {
99         val cleanPunctuator =
100             MessagePriorityCleanPunctuator(
101                 messagePrioritizationStateService
102             )
103         cleanPunctuator.processorContext = processorContext
104         cleanPunctuator.configuration = prioritizationConfiguration
105         val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
106         cleanCancellable = processorContext.schedule(
107             Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
108             PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
109         )
110         log.info(
111             "Clean punctuator setup complete with expiry " +
112                 "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
113         )
114     }
115 }