2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2021 Bell Canada.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
20 import io.micrometer.core.instrument.MeterRegistry
21 import kotlinx.coroutines.GlobalScope
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService
28 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
29 import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
30 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
31 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
32 import org.onap.ccsdk.cds.controllerblueprints.core.logger
33 import org.onap.ccsdk.cds.controllerblueprints.core.updateErrorMessage
34 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
35 import org.springframework.boot.context.event.ApplicationReadyEvent
36 import org.springframework.context.event.EventListener
37 import org.springframework.stereotype.Service
38 import java.nio.charset.Charset
40 import java.util.concurrent.Phaser
41 import javax.annotation.PreDestroy
43 @ConditionalOnProperty(
44 name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
48 open class BlueprintProcessingKafkaConsumer(
49 private val blueprintMessageLibPropertyService: BlueprintMessageLibPropertyService,
50 private val executionServiceHandler: ExecutionServiceHandler,
51 private val meterRegistry: MeterRegistry
54 val log = logger(BlueprintProcessingKafkaConsumer::class)
56 private val ph = Phaser(1)
58 private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService
62 const val CONSUMER_SELECTOR = "self-service-api"
63 const val PRODUCER_SELECTOR = "self-service-api"
66 @EventListener(ApplicationReadyEvent::class)
67 fun setupMessageListener() = GlobalScope.launch {
70 "Setting up message consumer($CONSUMER_SELECTOR)" +
71 "message producer($PRODUCER_SELECTOR)..."
74 /** Get the Message Consumer Service **/
75 blueprintMessageConsumerService = try {
76 blueprintMessageLibPropertyService
77 .blueprintMessageConsumerService(CONSUMER_SELECTOR)
78 } catch (e: BlueprintProcessorException) {
79 val errorMsg = "Failed creating Kafka consumer message service."
80 throw e.updateErrorMessage(
81 SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
82 "Wrong Kafka selector provided or internal error in Kafka service."
84 } catch (e: Exception) {
85 throw BlueprintProcessorException("failed to create consumer service ${e.message}")
88 /** Get the Message Producer Service **/
89 val blueprintMessageProducerService = try {
90 blueprintMessageLibPropertyService
91 .blueprintMessageProducerService(PRODUCER_SELECTOR)
92 } catch (e: BlueprintProcessorException) {
93 val errorMsg = "Failed creating Kafka producer message service."
94 throw e.updateErrorMessage(
95 SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
96 "Wrong Kafka selector provided or internal error in Kafka service."
98 } catch (e: Exception) {
99 throw BlueprintProcessorException("failed to create producer service ${e.message}")
103 /** Subscribe to the consumer topics */
104 val additionalConfig: MutableMap<String, Any> = hashMapOf()
105 val channel = blueprintMessageConsumerService.subscribe(additionalConfig)
106 channel.consumeEach { message ->
110 val key = message.key() ?: UUID.randomUUID().toString()
111 val value = String(message.value(), Charset.defaultCharset())
112 val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
114 "Consumed Message : topic(${message.topic()}) " +
115 "partition(${message.partition()}) " +
116 "leaderEpoch(${message.leaderEpoch().get()}) " +
117 "offset(${message.offset()}) " +
118 "key(${message.key()}) " +
119 BlueprintMessageUtils.getMessageLogData(executionServiceInput)
121 val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
122 blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
123 } catch (e: Exception) {
124 meterRegistry.counter(
125 BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
126 BlueprintMessageUtils.kafkaMetricTag(message.topic())
128 log.error("failed in processing the consumed message : $message", e)
130 ph.arriveAndDeregister()
135 } catch (e: Exception) {
137 "failed to start message consumer($CONSUMER_SELECTOR) " +
138 "message producer($PRODUCER_SELECTOR) ",
145 fun shutdownMessageListener() = runBlocking {
148 "Shutting down message consumer($CONSUMER_SELECTOR)" +
149 "message producer($PRODUCER_SELECTOR)..."
151 blueprintMessageConsumerService.shutDown()
152 ph.arriveAndAwaitAdvance()
153 } catch (e: Exception) {
154 log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)