2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2021 Bell Canada.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
20 import io.micrometer.core.instrument.MeterRegistry
21 import kotlinx.coroutines.GlobalScope
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
28 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
29 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.mdcKafkaCoroutineScope
30 import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
31 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
32 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
33 import org.onap.ccsdk.cds.controllerblueprints.core.logger
34 import org.onap.ccsdk.cds.controllerblueprints.core.updateErrorMessage
35 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
36 import org.springframework.boot.context.event.ApplicationReadyEvent
37 import org.springframework.context.event.EventListener
38 import org.springframework.stereotype.Service
39 import java.nio.charset.Charset
41 import java.util.concurrent.Phaser
42 import javax.annotation.PreDestroy
44 @ConditionalOnProperty(
45 name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
49 open class BluePrintProcessingKafkaConsumer(
50 private val blueprintMessageLibPropertyService: BluePrintMessageLibPropertyService,
51 private val executionServiceHandler: ExecutionServiceHandler,
52 private val meterRegistry: MeterRegistry
55 val log = logger(BluePrintProcessingKafkaConsumer::class)
57 private val ph = Phaser(1)
59 private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService
63 const val CONSUMER_SELECTOR = "self-service-api"
64 const val PRODUCER_SELECTOR = "self-service-api"
67 @EventListener(ApplicationReadyEvent::class)
68 fun setupMessageListener() = GlobalScope.launch {
71 "Setting up message consumer($CONSUMER_SELECTOR)" +
72 "message producer($PRODUCER_SELECTOR)..."
75 /** Get the Message Consumer Service **/
76 blueprintMessageConsumerService = try {
77 blueprintMessageLibPropertyService
78 .blueprintMessageConsumerService(CONSUMER_SELECTOR)
79 } catch (e: BluePrintProcessorException) {
80 val errorMsg = "Failed creating Kafka consumer message service."
81 throw e.updateErrorMessage(
82 SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
83 "Wrong Kafka selector provided or internal error in Kafka service."
85 } catch (e: Exception) {
86 throw BluePrintProcessorException("failed to create consumer service ${e.message}")
89 /** Get the Message Producer Service **/
90 val blueprintMessageProducerService = try {
91 blueprintMessageLibPropertyService
92 .blueprintMessageProducerService(PRODUCER_SELECTOR)
93 } catch (e: BluePrintProcessorException) {
94 val errorMsg = "Failed creating Kafka producer message service."
95 throw e.updateErrorMessage(
96 SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
97 "Wrong Kafka selector provided or internal error in Kafka service."
99 } catch (e: Exception) {
100 throw BluePrintProcessorException("failed to create producer service ${e.message}")
104 /** Subscribe to the consumer topics */
105 val additionalConfig: MutableMap<String, Any> = hashMapOf()
106 val channel = blueprintMessageConsumerService.subscribe(additionalConfig)
107 channel.consumeEach { message ->
111 val key = message.key() ?: UUID.randomUUID().toString()
112 val value = String(message.value(), Charset.defaultCharset())
113 val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
114 mdcKafkaCoroutineScope(executionServiceInput) {
116 "Consumed Message : topic(${message.topic()}) " +
117 "partition(${message.partition()}) " +
118 "leaderEpoch(${message.leaderEpoch().get()}) " +
119 "offset(${message.offset()}) " +
120 "key(${message.key()}) " +
121 BlueprintMessageUtils.getMessageLogData(executionServiceInput)
123 val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
124 blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
126 } catch (e: Exception) {
127 meterRegistry.counter(
128 BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
129 BlueprintMessageUtils.kafkaMetricTag(message.topic())
131 log.error("failed in processing the consumed message : $message", e)
133 ph.arriveAndDeregister()
138 } catch (e: Exception) {
140 "failed to start message consumer($CONSUMER_SELECTOR) " +
141 "message producer($PRODUCER_SELECTOR) ",
148 fun shutdownMessageListener() = runBlocking {
151 "Shutting down message consumer($CONSUMER_SELECTOR)" +
152 "message producer($PRODUCER_SELECTOR)..."
154 blueprintMessageConsumerService.shutDown()
155 ph.arriveAndAwaitAdvance()
156 } catch (e: Exception) {
157 log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)