b339903c55bb1b82b56b50b70efbd94b513830d1
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
18
19 import kotlinx.coroutines.channels.consumeEach
20 import kotlinx.coroutines.launch
21 import kotlinx.coroutines.runBlocking
22 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
25 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
26 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
27 import org.onap.ccsdk.cds.controllerblueprints.core.logger
28 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
29 import org.springframework.boot.context.event.ApplicationReadyEvent
30 import org.springframework.context.event.EventListener
31 import org.springframework.stereotype.Service
32 import javax.annotation.PreDestroy
33
34 @ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
35         havingValue = "true")
36 @Service
37 open class BluePrintProcessingKafkaConsumer(
38         private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
39         private val executionServiceHandler: ExecutionServiceHandler) {
40
41     val log = logger(BluePrintProcessingKafkaConsumer::class)
42
43     private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService
44
45     companion object {
46         const val CONSUMER_SELECTOR = "self-service-api"
47         const val PRODUCER_SELECTOR = "self-service-api"
48     }
49
50     @EventListener(ApplicationReadyEvent::class)
51     fun setupMessageListener() = runBlocking {
52         try {
53             log.info("Setting up message consumer($CONSUMER_SELECTOR) and " +
54                     "message producer($PRODUCER_SELECTOR)...")
55
56             /** Get the Message Consumer Service **/
57             blueprintMessageConsumerService = try {
58                 bluePrintMessageLibPropertyService
59                         .blueprintMessageConsumerService(CONSUMER_SELECTOR)
60             } catch (e: Exception) {
61                 throw BluePrintProcessorException("failed to create consumer service ${e.message}")
62             }
63
64             /** Get the Message Producer Service **/
65             val blueprintMessageProducerService = try {
66                 bluePrintMessageLibPropertyService
67                         .blueprintMessageProducerService(PRODUCER_SELECTOR)
68             } catch (e: Exception) {
69                 throw BluePrintProcessorException("failed to create producer service ${e.message}")
70             }
71
72             launch {
73                 /** Subscribe to the consumer topics */
74                 val additionalConfig: MutableMap<String, Any> = hashMapOf()
75                 val channel = blueprintMessageConsumerService.subscribe(additionalConfig)
76                 channel.consumeEach { message ->
77                     launch {
78                         try {
79                             log.trace("Consumed Message : $message")
80                             val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
81                             val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
82                             //TODO("In future, Message publisher configuration vary with respect to request")
83                             /** Send the response message */
84                             blueprintMessageProducerService.sendMessage(executionServiceOutput)
85                         } catch (e: Exception) {
86                             log.error("failed in processing the consumed message : $message", e)
87                         }
88                     }
89                 }
90             }
91         } catch (e: Exception) {
92             log.error("failed to start message consumer($CONSUMER_SELECTOR) and " +
93                     "message producer($PRODUCER_SELECTOR) ", e)
94         }
95     }
96
97     @PreDestroy
98     fun shutdownMessageListener() = runBlocking {
99         try {
100             log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " +
101                     "message producer($PRODUCER_SELECTOR)...")
102             blueprintMessageConsumerService.shutDown()
103         } catch (e: Exception) {
104             log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)
105         }
106     }
107
108 }