2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19 import org.apache.kafka.clients.consumer.ConsumerRecord
20 import org.apache.kafka.common.header.Headers
21 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
22 import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
24 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
25 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
26 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
27 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29 import java.net.InetAddress
30 import java.time.Instant
31 import java.time.ZoneOffset
32 import java.time.ZonedDateTime
33 import java.time.format.DateTimeFormatter
36 class MessageLoggerService {
38 private val log = logger(MessageLoggerService::class)
40 fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) {
41 messageConsuming(headers.requestId, headers.subRequestId,
42 headers.originatorId, consumerRecord)
45 fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) {
46 val headers = consumerRecord.headers().toMap()
47 val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID()
48 val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID()
49 val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN"
50 messageConsuming(requestID, invocationID, partnerName, consumerRecord)
54 fun messageConsuming(requestID: String, invocationID: String, partnerName: String,
55 consumerRecord: ConsumerRecord<*, *>) {
56 val headers = consumerRecord.headers().toMap()
57 val localhost = InetAddress.getLocalHost()
58 MDC.put("InvokeTimestamp", ZonedDateTime
59 .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC)
60 .format(DateTimeFormatter.ISO_INSTANT))
61 MDC.put("RequestID", requestID)
62 MDC.put("InvocationID", invocationID)
63 MDC.put("PartnerName", partnerName)
64 MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty())
65 MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty())
66 MDC.put("ServiceName", consumerRecord.topic())
67 // Custom MDC for Message Consumers
68 MDC.put("Offset", consumerRecord.offset().toString())
69 MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty())
70 log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}")
73 /** Used before producing message request, Inbound Invocation ID is used as request Id
74 * for produced message Request, If invocation Id is missing then default Request Id will be generated.
76 fun messageProducing(requestHeader: Headers) {
77 val localhost = InetAddress.getLocalHost()
78 requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID())
79 requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString())
80 val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
81 requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, partnerName)
82 requestHeader.addHeader("ClientIPAddress", localhost.hostAddress)
85 fun messageConsumingExisting() {