0b5d568d454041e519608adbe25628b7b6f71376
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2021 Bell Canada.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
19
20 import io.micrometer.core.instrument.MeterRegistry
21 import kotlinx.coroutines.GlobalScope
22 import kotlinx.coroutines.channels.consumeEach
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
28 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
29 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.mdcKafkaCoroutineScope
30 import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
31 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
32 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
33 import org.onap.ccsdk.cds.controllerblueprints.core.logger
34 import org.onap.ccsdk.cds.controllerblueprints.core.updateErrorMessage
35 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
36 import org.springframework.boot.context.event.ApplicationReadyEvent
37 import org.springframework.context.event.EventListener
38 import org.springframework.stereotype.Service
39 import java.nio.charset.Charset
40 import java.util.UUID
41 import java.util.concurrent.Phaser
42 import javax.annotation.PreDestroy
43
/**
 * Kafka front end of the self-service API.
 *
 * The bean is only created when the property
 * `blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable` is `true`.
 * Once the application is ready it subscribes to the consumer selected by
 * [CONSUMER_SELECTOR], passes every consumed message to
 * [ExecutionServiceHandler.doProcess] and sends the resulting output through the
 * producer selected by [PRODUCER_SELECTOR], reusing the consumed message key.
 */
@ConditionalOnProperty(
    name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
    havingValue = "true"
)
@Service
open class BluePrintProcessingKafkaConsumer(
    private val blueprintMessageLibPropertyService: BluePrintMessageLibPropertyService,
    private val executionServiceHandler: ExecutionServiceHandler,
    private val meterRegistry: MeterRegistry
) {

    val log = logger(BluePrintProcessingKafkaConsumer::class)

    // Tracks in-flight messages: one party for this bean (used by shutdown) plus
    // one party per message currently being processed.
    private val ph = Phaser(1)

    // Assigned by setupMessageListener(); stays uninitialized if setup fails early.
    private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService

    companion object {

        const val CONSUMER_SELECTOR = "self-service-api"
        const val PRODUCER_SELECTOR = "self-service-api"

        // JSON exchanged between systems is UTF-8 (RFC 8259); decoding must not
        // depend on the JVM's platform default charset.
        private val MESSAGE_CHARSET: Charset = Charsets.UTF_8
    }

    /**
     * Creates the consumer and producer services and starts the consume loop.
     *
     * Runs asynchronously; the returned job is the coroutine performing the setup
     * and hosting the consume loop. Failures while creating the services are
     * logged here. A failure while processing a single message is counted in the
     * Kafka consumed-messages error counter and logged, and does not stop the loop.
     */
    @EventListener(ApplicationReadyEvent::class)
    fun setupMessageListener() = GlobalScope.launch {
        try {
            log.info(
                "Setting up message consumer($CONSUMER_SELECTOR)" +
                    "message producer($PRODUCER_SELECTOR)..."
            )

            // Resolve the message consumer service.
            blueprintMessageConsumerService = try {
                blueprintMessageLibPropertyService
                    .blueprintMessageConsumerService(CONSUMER_SELECTOR)
            } catch (e: BluePrintProcessorException) {
                val errorMsg = "Failed creating Kafka consumer message service."
                throw e.updateErrorMessage(
                    SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
                    "Wrong Kafka selector provided or internal error in Kafka service."
                )
            } catch (e: Exception) {
                throw BluePrintProcessorException("failed to create consumer service ${e.message}")
            }

            // Resolve the message producer service.
            val blueprintMessageProducerService = try {
                blueprintMessageLibPropertyService
                    .blueprintMessageProducerService(PRODUCER_SELECTOR)
            } catch (e: BluePrintProcessorException) {
                val errorMsg = "Failed creating Kafka producer message service."
                throw e.updateErrorMessage(
                    SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
                    "Wrong Kafka selector provided or internal error in Kafka service."
                )
            } catch (e: Exception) {
                throw BluePrintProcessorException("failed to create producer service ${e.message}")
            }

            // NOTE(review): launch returns immediately, so an exception thrown inside
            // this child coroutine (e.g. by subscribe) surfaces through the coroutine
            // job and is not caught by the enclosing try/catch.
            launch {
                // Subscribe to the consumer topics.
                val additionalConfig: MutableMap<String, Any> = hashMapOf()
                val channel = blueprintMessageConsumerService.subscribe(additionalConfig)
                channel.consumeEach { message ->
                    // Register before launching, so a shutdown cannot slip past a message
                    // that has been received but whose coroutine has not started yet.
                    ph.register()
                    launch {
                        try {
                            val key = message.key() ?: UUID.randomUUID().toString()
                            val value = String(message.value(), MESSAGE_CHARSET)
                            val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
                            mdcKafkaCoroutineScope(executionServiceInput) {
                                // The leader epoch is optional; calling get() on an empty
                                // Optional would fail the whole message just for logging.
                                log.info(
                                    "Consumed Message : topic(${message.topic()}) " +
                                        "partition(${message.partition()}) " +
                                        "leaderEpoch(${message.leaderEpoch().orElse(null)}) " +
                                        "offset(${message.offset()}) " +
                                        "key(${message.key()}) " +
                                        BlueprintMessageUtils.getMessageLogData(executionServiceInput)
                                )
                                val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
                                blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
                            }
                        } catch (e: Exception) {
                            meterRegistry.counter(
                                BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
                                BlueprintMessageUtils.kafkaMetricTag(message.topic())
                            ).increment()
                            log.error("failed in processing the consumed message : $message", e)
                        }
                    }.invokeOnCompletion {
                        // Runs however the job ends (normally, with a failure, or cancelled
                        // before its body started), so the registration is always released.
                        ph.arriveAndDeregister()
                    }
                }
            }
        } catch (e: Exception) {
            log.error(
                "failed to start message consumer($CONSUMER_SELECTOR) " +
                    "message producer($PRODUCER_SELECTOR) ",
                e
            )
        }
    }

    /**
     * Shuts the consumer down and waits for the messages still being processed.
     *
     * The consumer service is only shut down if it was actually created; the wait
     * on the phaser returns once every registered message has been deregistered.
     * Any failure is logged and not propagated.
     */
    @PreDestroy
    fun shutdownMessageListener() = runBlocking {
        try {
            log.info(
                "Shutting down message consumer($CONSUMER_SELECTOR)" +
                    "message producer($PRODUCER_SELECTOR)..."
            )
            if (::blueprintMessageConsumerService.isInitialized) {
                blueprintMessageConsumerService.shutDown()
            } else {
                // Setup never got as far as creating the consumer; nothing to stop.
                log.info("message consumer($CONSUMER_SELECTOR) was never initialized, skipping consumer shutdown")
            }
            ph.arriveAndAwaitAdvance()
        } catch (e: Exception) {
            log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)
        }
    }
}