Change EventListener functions to non-blocking scope
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / inbounds / selfservice-api / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / selfservice / api / BluePrintProcessingKafkaConsumer.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
18
19 import kotlinx.coroutines.GlobalScope
20 import kotlinx.coroutines.channels.consumeEach
21 import kotlinx.coroutines.launch
22 import kotlinx.coroutines.runBlocking
23 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
25 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
26 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
27 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
28 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29 import org.onap.ccsdk.cds.controllerblueprints.core.updateErrorMessage
30 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
31 import org.springframework.boot.context.event.ApplicationReadyEvent
32 import org.springframework.context.event.EventListener
33 import org.springframework.stereotype.Service
34 import java.util.concurrent.Phaser
35 import javax.annotation.PreDestroy
36
37 @ConditionalOnProperty(
38     name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
39     havingValue = "true"
40 )
41 @Service
42 open class BluePrintProcessingKafkaConsumer(
43     private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
44     private val executionServiceHandler: ExecutionServiceHandler
45 ) {
46
47     val log = logger(BluePrintProcessingKafkaConsumer::class)
48
49     private val ph = Phaser(1)
50
51     private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService
52
53     companion object {
54         const val CONSUMER_SELECTOR = "self-service-api"
55         const val PRODUCER_SELECTOR = "self-service-api"
56     }
57
58     @EventListener(ApplicationReadyEvent::class)
59     fun setupMessageListener() = GlobalScope.launch {
60         try {
61             log.info(
62                 "Setting up message consumer($CONSUMER_SELECTOR)" +
63                         "message producer($PRODUCER_SELECTOR)..."
64             )
65
66             /** Get the Message Consumer Service **/
67             blueprintMessageConsumerService = try {
68                 bluePrintMessageLibPropertyService
69                     .blueprintMessageConsumerService(CONSUMER_SELECTOR)
70             } catch (e: BluePrintProcessorException) {
71                 val errorMsg = "Failed creating Kafka consumer message service."
72                 throw e.updateErrorMessage(SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
73                         "Wrong Kafka selector provided or internal error in Kafka service.")
74             } catch (e: Exception) {
75                 throw BluePrintProcessorException("failed to create consumer service ${e.message}")
76             }
77
78             /** Get the Message Producer Service **/
79             val blueprintMessageProducerService = try {
80                 bluePrintMessageLibPropertyService
81                         .blueprintMessageProducerService(PRODUCER_SELECTOR)
82             } catch (e: BluePrintProcessorException) {
83                 val errorMsg = "Failed creating Kafka producer message service."
84                 throw e.updateErrorMessage(SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
85                         "Wrong Kafka selector provided or internal error in Kafka service.")
86             } catch (e: Exception) {
87                 throw BluePrintProcessorException("failed to create producer service ${e.message}")
88             }
89
90             launch {
91                 /** Subscribe to the consumer topics */
92                 val additionalConfig: MutableMap<String, Any> = hashMapOf()
93                 val channel = blueprintMessageConsumerService.subscribe(additionalConfig)
94                 channel.consumeEach { message ->
95                     launch {
96                         try {
97                             ph.register()
98                             log.trace("Consumed Message : $message")
99                             val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
100                             val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
101                             blueprintMessageProducerService.sendMessage(executionServiceOutput)
102                         } catch (e: Exception) {
103                             log.error("failed in processing the consumed message : $message", e)
104                         } finally {
105                             ph.arriveAndDeregister()
106                         }
107                     }
108                 }
109             }
110         } catch (e: Exception) {
111             log.error(
112                 "failed to start message consumer($CONSUMER_SELECTOR) " +
113                         "message producer($PRODUCER_SELECTOR) ", e
114             )
115         }
116     }
117
118     @PreDestroy
119     fun shutdownMessageListener() = runBlocking {
120         try {
121             log.info(
122                 "Shutting down message consumer($CONSUMER_SELECTOR)" +
123                         "message producer($PRODUCER_SELECTOR)..."
124             )
125             blueprintMessageConsumerService.shutDown()
126             ph.arriveAndAwaitAdvance()
127         } catch (e: Exception) {
128             log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)
129         }
130     }
131 }