Prioritization Optional NATS consumer support
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / kafka / AbstractKafkaMessagePrioritizationService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
18
19 import org.apache.kafka.streams.processor.ProcessorContext
20 import org.apache.kafka.streams.processor.To
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
28 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29
/**
 * Base class for message prioritization services that emit their results through Kafka Streams.
 *
 * Completed and expired messages are forwarded through [processorContext] to the child node named
 * [MessagePrioritizationConstants.SINK_OUTPUT]. The context has to be supplied through
 * [setKafkaProcessorContext] before [output] or [updateExpiredMessages] is invoked; both methods
 * fail with [IllegalStateException] otherwise.
 *
 * @param messagePrioritizationStateService service used to read and update the state of messages.
 */
abstract class AbstractKafkaMessagePrioritizationService(
    private val messagePrioritizationStateService: MessagePrioritizationStateService
) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {

    private val log = logger(AbstractKafkaMessagePrioritizationService::class)

    /** Kafka Streams context used for forwarding; uninitialized until [setKafkaProcessorContext] is called. */
    lateinit var processorContext: ProcessorContext

    /**
     * Stores the Kafka Streams [processorContext] that later forwards are issued on.
     *
     * @param processorContext context of the processor this service runs in.
     */
    fun setKafkaProcessorContext(processorContext: ProcessorContext) {
        this.processorContext = processorContext
    }

    /**
     * Marks every message in [messages] as [MessageState.COMPLETED] through the state service and
     * forwards the updated message, keyed by its id, to the output sink.
     *
     * @param messages messages to complete and forward; an empty list forwards nothing.
     * @throws IllegalStateException if the kafka configuration or the processor context is missing.
     */
    override suspend fun output(messages: List<MessagePrioritization>) {
        log.info("$$$$$ received in output processor id(${messages.ids()})")
        checkKafkaInitialized()

        for (message in messages) {
            val completed =
                messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
            processorContext.forward(
                completed.id,
                completed,
                To.child(MessagePrioritizationConstants.SINK_OUTPUT)
            )
        }
    }

    /**
     * Fetches up to `expiryConfiguration.maxPollRecord` expiry-eligible messages, marks them as
     * [MessageState.EXPIRED] in the state service and forwards each one to the output sink.
     *
     * The work runs between [MessageProcessorUtils.prioritizationExpiryLock] and
     * [MessageProcessorUtils.prioritizationUnLock]; the unlock call is always made, also on failure.
     * Failures are logged and not propagated, with the exception of coroutine cancellation, which is
     * rethrown so that the caller's cancellation is honoured.
     *
     * @throws IllegalStateException if the kafka configuration or the processor context is missing.
     */
    override suspend fun updateExpiredMessages() {
        checkKafkaInitialized()

        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
        val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
        try {
            val expiredMessages = messagePrioritizationStateService
                .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
            val expiredIds = expiredMessages?.ids()
            if (expiredIds != null && expiredIds.isNotEmpty()) {
                // Persist the new state for the whole batch first, then mirror it on the in-memory
                // copies that are sent downstream.
                messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
                expiredMessages.forEach { expiredMessage ->
                    expiredMessage.state = MessageState.EXPIRED.name
                    processorContext.forward(
                        expiredMessage.id, expiredMessage,
                        To.child(MessagePrioritizationConstants.SINK_OUTPUT)
                    )
                }
            }
        } catch (e: java.util.concurrent.CancellationException) {
            // Coroutine cancellation is signalled with this exception type on the JVM; swallowing it
            // would hide the cancellation from the caller, so it must not be treated as a failure.
            throw e
        } catch (e: Exception) {
            log.error("failed in updating expired messages", e)
        } finally {
            MessageProcessorUtils.prioritizationUnLock(clusterLock)
        }
    }

    /**
     * Verifies the preconditions shared by [output] and [updateExpiredMessages].
     *
     * @throws IllegalStateException if `prioritizationConfiguration.kafkaConfiguration` is null or
     * [processorContext] has not been set.
     */
    private fun checkKafkaInitialized() {
        checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
        check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
    }
}