2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19 import org.apache.kafka.clients.consumer.ConsumerRecord
20 import org.apache.kafka.common.header.Headers
21 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
22 import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
24 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
25 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
26 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
27 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29 import java.net.InetAddress
30 import java.time.Instant
31 import java.time.ZoneOffset
32 import java.time.ZonedDateTime
33 import java.time.format.DateTimeFormatter
36 class MessageLoggerService {
38 private val log = logger(MessageLoggerService::class)
/**
 * Records the consumption of [consumerRecord] using the identifiers carried by [headers].
 *
 * The header's `requestId`, `subRequestId` and `originatorId` are forwarded, in that order,
 * together with the record itself to the four-argument `messageConsuming` overload.
 *
 * NOTE(review): going by the sibling call site that passes (requestID, invocationID,
 * partnerName, consumerRecord), `subRequestId` fills the invocation ID slot and
 * `originatorId` the partner name slot — confirm against the overload's signature.
 *
 * @param headers common header of the request being processed.
 * @param consumerRecord the Kafka record that is being consumed.
 */
fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) {
    val requestId = headers.requestId
    val subRequestId = headers.subRequestId
    val originatorId = headers.originatorId
    messageConsuming(requestId, subRequestId, originatorId, consumerRecord)
}
/**
 * Records the consumption of [consumerRecord], taking the tracing identifiers from the
 * record's own Kafka headers.
 *
 * The request ID and invocation ID are looked up under
 * [BluePrintConstants.ONAP_REQUEST_ID] and [BluePrintConstants.ONAP_INVOCATION_ID] and passed
 * through `defaultToUUID()` (presumably substituting a generated UUID when the header is
 * absent — verify against the helper). The partner name is read from
 * [BluePrintConstants.ONAP_PARTNER_NAME] and falls back to `"UNKNOWN"` when missing.
 *
 * @param consumerRecord the Kafka record that is being consumed.
 */
fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) {
    val recordHeaders = consumerRecord.headers().toMap()
    messageConsuming(
        recordHeaders[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID(),
        recordHeaders[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID(),
        recordHeaders[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN",
        consumerRecord
    )
}
59 consumerRecord: ConsumerRecord<*, *>
61 val headers = consumerRecord.headers().toMap()
62 val localhost = InetAddress.getLocalHost()
66 .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC)
67 .format(DateTimeFormatter.ISO_INSTANT)
69 MDC.put("RequestID", requestID)
70 MDC.put("InvocationID", invocationID)
71 MDC.put("PartnerName", partnerName)
72 MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty())
73 MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty())
74 MDC.put("ServiceName", consumerRecord.topic())
75 // Custom MDC for Message Consumers
76 MDC.put("Offset", consumerRecord.offset().toString())
77 MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty())
78 log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}")
/**
 * Populates the tracing headers of an outgoing message; call it before producing the message.
 *
 * The inbound invocation ID, read from the MDC key `"InvocationID"`, becomes the request ID
 * of the produced message; if it is missing, a default request ID is generated instead.
 * A fresh random UUID is written as the new invocation ID, [BluePrintConstants.APP_NAME] as
 * the partner name, and the local host's address as `"ClientIPAddress"`.
 *
 * @param requestHeader headers of the message about to be produced; entries are added to it.
 */
fun messageProducing(requestHeader: Headers) {
    // Resolve the local host first, so a lookup failure happens before any header is added.
    val localAddress = InetAddress.getLocalHost()
    with(requestHeader) {
        addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID())
        addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString())
        addHeader(BluePrintConstants.ONAP_PARTNER_NAME, BluePrintConstants.APP_NAME)
        addHeader("ClientIPAddress", localAddress.hostAddress)
    }
}
92 fun messageConsumingExisting() {