1d219a83ea60523ede3159e9cea0632f5aadb20e
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2019 Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
17
18 import kotlinx.coroutines.async
19 import kotlinx.coroutines.runBlocking
20 import org.apache.commons.lang3.builder.ToStringBuilder
21 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
22 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
23 import org.slf4j.LoggerFactory
24 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
25 import org.springframework.kafka.annotation.KafkaListener
26 import org.springframework.stereotype.Service
27
/**
 * Kafka-facing entry point of the self-service API.
 *
 * Each [ExecutionServiceInput] consumed from the configured topic is handed to
 * [ExecutionServiceHandler.doProcess]; when the resulting status code equals [EXECUTION_STATUS]
 * the output payload is published through the message client resolved for [PREFIX], otherwise
 * the error message of the status is logged.
 *
 * The bean is only registered when the property
 * `blueprintsprocessor.messageclient.self-service-api.kafkaEnable` has the value `true`.
 */
@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true")
@Service
open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService,
                               private val executionServiceHandler: ExecutionServiceHandler) {

    private val log = LoggerFactory.getLogger(MessagingController::class.java)

    companion object {
        // TODO PREFIX should be retrieved from model or from request.
        /** Prefix handed to the property service to resolve the message client used for publishing. */
        const val PREFIX = "self-service-api"

        /** Status code of an execution output for which the payload is published. */
        const val EXECUTION_STATUS = 200
    }

    /**
     * Consumes one message from the topic configured under
     * `blueprintsprocessor.messageclient.self-service-api.consumerTopic` and processes it.
     *
     * The call blocks the listener thread until processing has finished; any exception raised
     * while processing propagates to the caller.
     *
     * @param input the execution request received from the topic.
     */
    @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"])
    open fun receive(input: ExecutionServiceInput) {
        // Logged once per message: the reflective dump is not free, so avoid repeating it.
        log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))

        // The listener is a regular (blocking) function, so bridge into the suspending pipeline here.
        // The suspend function is called directly: runBlocking already waits for completion, and an
        // un-awaited async wrapper would only allocate a Deferred that nobody reads.
        runBlocking {
            processMessage(input)
        }
    }

    /**
     * Runs the request through the execution handler and publishes the payload on success.
     *
     * @param executionServiceInput the request to execute.
     */
    private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) {
        val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)

        if (executionServiceOutput.status.code == EXECUTION_STATUS) {
            val bluePrintMessageClientService = propertyService
                    .blueprintMessageClientService(PREFIX)

            val payload = executionServiceOutput.payload

            log.info("The payload to publish is {}", payload)

            bluePrintMessageClientService.sendMessage(payload)
        } else {
            // Nothing is published for a non-successful execution; only the reason is logged.
            log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)
        }
    }
}