2 * Copyright © 2019 Bell Canada
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
18 import kotlinx.coroutines.async
19 import kotlinx.coroutines.runBlocking
20 import org.apache.commons.lang3.builder.ToStringBuilder
21 import org.apache.kafka.clients.consumer.ConsumerRecord
22 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
24 import org.slf4j.LoggerFactory
25 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
26 import org.springframework.kafka.annotation.KafkaListener
27 import org.springframework.stereotype.Service
29 //TODO("Implement with property service and remove spring bindings")
30 @ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true")
32 open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService,
33 private val executionServiceHandler: ExecutionServiceHandler) {
35 private val log = LoggerFactory.getLogger(MessagingController::class.java)!!
38 // TODO PREFIX should be retrieved from model or from request.
39 const val PREFIX = "self-service-api"
40 const val EXECUTION_STATUS = 200
43 @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"])
44 open fun receive(record: ConsumerRecord<String, ExecutionServiceInput>) {
47 log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(record.value()))
49 // Process the message.
51 processMessage(record.value())
56 private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) {
58 val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
60 if (executionServiceOutput.status.code == EXECUTION_STATUS) {
61 val blueprintMessageProducerService = propertyService.blueprintMessageProducerService(PREFIX)
63 val payload = executionServiceOutput.payload
65 log.info("The payload to publish is {}", payload)
67 blueprintMessageProducerService.sendMessage(payload)
69 log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)