21fd84d11fae9974b9113b92c3430026cfd3c9fd
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import com.fasterxml.jackson.databind.node.ObjectNode
21 import io.micrometer.core.instrument.MeterRegistry
22 import org.apache.commons.lang.builder.ToStringBuilder
23 import org.apache.kafka.clients.producer.Callback
24 import org.apache.kafka.clients.producer.KafkaProducer
25 import org.apache.kafka.clients.producer.ProducerRecord
26 import org.apache.kafka.common.header.internals.RecordHeader
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
30 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
31 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
33 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
34 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
35 import org.slf4j.LoggerFactory
36 import java.nio.charset.Charset
37
/**
 * [BlueprintMessageProducerService] implementation that publishes messages to Kafka
 * through a lazily created, cached [KafkaProducer].
 *
 * Every completed send increments the produced-messages counter; failed sends
 * additionally increment the error counter. Both counters are tagged per topic.
 *
 * @param messageProducerProperties producer settings: the default topic and the client configuration.
 * @param meterRegistry registry used to record the produced / failed message counters.
 */
class KafkaMessageProducerService(
    private val messageProducerProperties: MessageProducerProperties,
    private val meterRegistry: MeterRegistry
) :
    BlueprintMessageProducerService {

    private val log = LoggerFactory.getLogger(KafkaMessageProducerService::class.java)!!

    /** Cached producer instance, created on the first call to [messageTemplate]. */
    private var kafkaProducer: KafkaProducer<String, ByteArray>? = null

    private val messageLoggerService = MessageLoggerService()

    companion object {

        /** Maximum number of characters of an error message that is kept before truncation. */
        const val MAX_ERR_MSG_LEN = 128
    }

    /**
     * Sends [message] to the default topic taken from the producer properties.
     *
     * @throws IllegalStateException when no default topic is configured.
     */
    override suspend fun sendMessageNB(key: String, message: Any, headers: MutableMap<String, String>?): Boolean {
        // checkNotNull returns the validated value, so no `!!` is needed afterwards.
        val defaultTopic = checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
        return sendMessageNB(key, defaultTopic, message, headers)
    }

    /**
     * Sends [message] to [topic] without waiting for the broker acknowledgement.
     *
     * An [ExecutionServiceOutput] is truncated via [truncateResponse] before being serialized.
     * A [String] is sent as is; any other object is sent as its JSON string.
     *
     * @param key record key.
     * @param topic destination topic.
     * @param message payload to publish.
     * @param headers optional headers added to the record on top of the ones set by the message logger.
     * @return always `true` once the record has been handed to the producer; the delivery
     * outcome is only reported through the completion callback (logs and counters).
     */
    override suspend fun sendMessageNB(
        key: String,
        topic: String,
        message: Any,
        headers: MutableMap<String, String>?
    ): Boolean {
        val outgoing: Any = if (message is ExecutionServiceOutput) truncateResponse(message) else message

        // NOTE(review): the payload is encoded with the platform default charset while the headers
        // below use Kotlin's default (UTF-8) - confirm what consumers expect before changing either.
        val payloadBytes = when (outgoing) {
            is String -> outgoing.toByteArray(Charset.defaultCharset())
            else -> outgoing.asJsonString().toByteArray(Charset.defaultCharset())
        }

        val record = ProducerRecord<String, ByteArray>(topic, key, payloadBytes)
        val recordHeaders = record.headers()
        messageLoggerService.messageProducing(recordHeaders)
        headers?.forEach { (headerName, headerValue) ->
            recordHeaders.add(RecordHeader(headerName, headerValue.toByteArray()))
        }

        val callback = Callback { metadata, exception ->
            // Counted for every completed send, whether it succeeded or not.
            meterRegistry.counter(
                BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_COUNTER,
                BlueprintMessageUtils.kafkaMetricTag(topic)
            ).increment()
            if (exception != null) {
                meterRegistry.counter(
                    BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER,
                    BlueprintMessageUtils.kafkaMetricTag(topic)
                ).increment()
                log.error("Couldn't publish ${outgoing::class.simpleName} ${getMessageLogData(outgoing)}.", exception)
            } else {
                val publishedLog = "${outgoing::class.simpleName} published : topic(${metadata.topic()}) " +
                    "partition(${metadata.partition()}) " +
                    "offset(${metadata.offset()}) ${getMessageLogData(outgoing)}."
                log.info(publishedLog)
            }
        }
        messageTemplate().send(record, callback)
        return true
    }

    /**
     * Returns the cached [KafkaProducer], creating it on first use.
     *
     * [additionalConfig] is merged into the configuration only when the producer is created;
     * once a producer is cached, later calls return it unchanged.
     *
     * NOTE(review): creation is not synchronized - confirm whether concurrent first calls are possible.
     *
     * @param additionalConfig extra client properties applied on top of the configured ones.
     */
    fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
        // Guarded so the reflective dump is not built on every send when TRACE is disabled.
        if (log.isTraceEnabled) {
            log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
        }

        // Fast path: the configuration is only needed to build a new producer.
        kafkaProducer?.let { return it }

        val configProps = messageProducerProperties.getConfig()

        /** Add additional Properties */
        if (additionalConfig != null) {
            configProps.putAll(additionalConfig)
        }

        return KafkaProducer<String, ByteArray>(configProps).also { kafkaProducer = it }
    }

    /**
     * Truncation of BP responses.
     *
     * Builds a new [ExecutionServiceOutput] in which an error message longer than
     * [MAX_ERR_MSG_LEN] is cut and suffixed with a pointer to the logs, and in which the
     * "execute-command-logs" entry of the "<actionName>-response" payload node, when present,
     * is replaced by a fixed text. The payload is deep-copied, so the input payload is not modified.
     */
    private fun truncateResponse(executionServiceOutput: ExecutionServiceOutput): ExecutionServiceOutput {
        val sourceStatus = executionServiceOutput.status

        /** Truncation of error messages */
        val fullErrMsg = sourceStatus.errorMessage
        val truncErrMsg = if (fullErrMsg != null && fullErrMsg.length > MAX_ERR_MSG_LEN) {
            fullErrMsg.substring(0, MAX_ERR_MSG_LEN) +
                " [...]. Check Blueprint Processor logs for more information."
        } else {
            fullErrMsg
        }

        /** Truncation for Command Executor responses */
        val truncPayload = executionServiceOutput.payload.deepCopy()
        val workflowName = executionServiceOutput.actionIdentifiers.actionName
        // Only an object node can hold the field, so a safe cast covers the missing-node case too.
        val responseNode = truncPayload.path("$workflowName-response") as? ObjectNode
        if (responseNode != null && responseNode.has("execute-command-logs")) {
            responseNode.replace("execute-command-logs", "Check Command Executor logs for more information.".asJsonPrimitive())
        }

        return ExecutionServiceOutput().apply {
            correlationUUID = executionServiceOutput.correlationUUID
            commonHeader = executionServiceOutput.commonHeader
            actionIdentifiers = executionServiceOutput.actionIdentifiers
            status = Status().apply {
                code = sourceStatus.code
                eventType = sourceStatus.eventType
                timestamp = sourceStatus.timestamp
                errorMessage = truncErrMsg
                message = sourceStatus.message
            }
            payload = truncPayload
            stepData = executionServiceOutput.stepData
        }
    }

    /**
     * Returns the short description of [message] used in the publish logs: the
     * blueprint name / version / action for execution inputs and outputs, or the
     * message's string form for anything else.
     */
    private fun getMessageLogData(message: Any): String =
        when (message) {
            is ExecutionServiceInput -> {
                val identifiers = message.actionIdentifiers
                "CBA(${identifiers.blueprintName}/${identifiers.blueprintVersion}/${identifiers.actionName})"
            }
            is ExecutionServiceOutput -> {
                val identifiers = message.actionIdentifiers
                "CBA(${identifiers.blueprintName}/${identifiers.blueprintVersion}/${identifiers.actionName})"
            }
            else -> "message($message)"
        }
}