From 0d3a0223fd11d431497519f3f9da640aafe00460 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Mon, 28 Oct 2019 16:46:47 -0400 Subject: [PATCH] Add Message tracing logger service. Issue-ID: CCSDK-1046 Signed-off-by: Brinda Santh Change-Id: I509df82ec558cd96934043a5b41ea53aa040cc81 --- .../grpc/service/GrpcLoggerService.kt | 9 ++- .../message/BluePrintMessageExtensions.kt | 32 ++++++++ .../KafkaBasicAuthMessageConsumerService.kt | 4 +- .../KafkaBasicAuthMessageProducerService.kt | 7 +- .../message/service/MessageLoggerService.kt | 88 ++++++++++++++++++++++ .../message/service/MessageLoggerServiceTest.kt | 61 +++++++++++++++ .../src/test/resources/logback-test.xml | 10 ++- .../rest/service/RestLoggerService.kt | 4 +- 8 files changed, 208 insertions(+), 7 deletions(-) create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt index 1b932480a..6d2ba43cc 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt @@ -31,6 +31,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.slf4j.MDC +import java.net.InetAddress import java.net.InetSocketAddress import java.time.ZoneOffset import java.time.ZonedDateTime @@ -46,7 +47,7 @@ class GrpcLoggerService { headers: Metadata, next: ServerCallHandler) { val requestID = headers.getStringKey(ONAP_REQUEST_ID).defaultToUUID() val invocationID = headers.getStringKey(ONAP_INVOCATION_ID).defaultToUUID() - val partnerName = headers.getStringKey(ONAP_PARTNER_NAME).defaultToUUID() + val partnerName = headers.getStringKey(ONAP_PARTNER_NAME) ?: "UNKNOWN" grpcRequesting(requestID, invocationID, partnerName, call) } @@ -54,12 +55,14 @@ class GrpcLoggerService { headers: CommonHeader, next: ServerCallHandler) { val requestID = headers.requestId.defaultToUUID() val invocationID = headers.subRequestId.defaultToUUID() - val partnerName = headers.originatorId.defaultToUUID() + val partnerName = headers.originatorId ?: "UNKNOWN" grpcRequesting(requestID, invocationID, partnerName, call) } fun grpcRequesting(requestID: String, invocationID: String, partnerName: String, call: ServerCall) { + val localhost = InetAddress.getLocalHost() + val clientSocketAddress = call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) as? InetSocketAddress ?: throw BluePrintProcessorException("failed to get client address") val serviceName = call.methodDescriptor.fullMethodName @@ -69,7 +72,7 @@ class GrpcLoggerService { MDC.put("InvocationID", invocationID) MDC.put("PartnerName", partnerName) MDC.put("ClientIPAddress", clientSocketAddress.address.defaultToEmpty()) - MDC.put("ServerFQDN", clientSocketAddress.address.hostName.defaultToEmpty()) + MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty()) MDC.put("ServiceName", serviceName) log.trace("MDC Properties : ${MDC.getCopyOfContextMap()}") } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt new file mode 100644 index 000000000..a817c0c74 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt @@ -0,0 +1,32 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +import org.apache.kafka.common.header.Headers +import org.apache.kafka.common.header.internals.RecordHeader +import java.nio.charset.Charset + + +fun T?.toMap(): MutableMap { + val map: MutableMap = hashMapOf() + this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) } + return map +} + +fun Headers.addHeader(key: String, value: String) { + this.add(RecordHeader(key, value.toByteArray())) +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index b99be0ae5..a4ccfa901 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,9 +37,10 @@ open class KafkaBasicAuthMessageConsumerService( private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties) : BlueprintMessageConsumerService { + val log = logger(KafkaBasicAuthMessageConsumerService::class) + val channel = Channel() var kafkaConsumer: Consumer? = null - val log = logger(KafkaBasicAuthMessageConsumerService::class) @Volatile var keepGoing = true diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 86c04f6be..42adcd712 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +39,8 @@ class KafkaBasicAuthMessageProducerService( private var kafkaProducer: KafkaProducer? = null + private val messageLoggerService = MessageLoggerService() + override suspend fun sendMessageNB(message: Any): Boolean { checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } return sendMessageNB(messageProducerProperties.topic!!, message) @@ -56,8 +59,10 @@ class KafkaBasicAuthMessageProducerService( } val record = ProducerRecord(topic, defaultToUUID(), byteArrayMessage) + val recordHeaders = record.headers() + messageLoggerService.messageProducing(recordHeaders) headers?.let { - headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) } + headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } } val callback = Callback { metadata, exception -> log.info("message published offset(${metadata.offset()}, headers :$headers )") diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt new file mode 100644 index 000000000..21bf1b76c --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt @@ -0,0 +1,88 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.Headers +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.slf4j.MDC +import java.net.InetAddress +import java.time.Instant +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.util.* + +class MessageLoggerService { + + private val log = logger(MessageLoggerService::class) + + fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) { + messageConsuming(headers.requestId, headers.subRequestId, + headers.originatorId, consumerRecord) + } + + fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) { + val headers = consumerRecord.headers().toMap() + val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID() + val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID() + val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN" + messageConsuming(requestID, invocationID, partnerName, consumerRecord) + } + + + fun messageConsuming(requestID: String, invocationID: String, partnerName: String, + consumerRecord: ConsumerRecord<*, *>) { + val headers = consumerRecord.headers().toMap() + val localhost = InetAddress.getLocalHost() + MDC.put("InvokeTimestamp", ZonedDateTime + .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC) + .format(DateTimeFormatter.ISO_INSTANT)) + MDC.put("RequestID", requestID) + MDC.put("InvocationID", invocationID) + MDC.put("PartnerName", partnerName) + MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty()) + MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty()) + MDC.put("ServiceName", consumerRecord.topic()) + // Custom MDC for Message Consumers + MDC.put("Offset", consumerRecord.offset().toString()) + MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty()) + log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}") + } + + /** Used before producing message request, Inbound Invocation ID is used as request Id + * for produced message Request, If invocation Id is missing then default Request Id will be generated. + */ + fun messageProducing(requestHeader: Headers) { + val localhost = InetAddress.getLocalHost() + requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID()) + requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString()) + val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor" + requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, partnerName) + requestHeader.addHeader("ClientIPAddress", localhost.hostAddress) + } + + fun messageConsumingExisting() { + MDC.clear() + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt new file mode 100644 index 000000000..82e40efd1 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt @@ -0,0 +1,61 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import io.mockk.every +import io.mockk.mockk +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeaders +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.slf4j.MDC +import kotlin.test.assertEquals + +class MessageLoggerServiceTest { + + + @Test + fun testMessagingHeaders() { + val messageLoggerService = MessageLoggerService() + val commonHeader = CommonHeader().apply { + requestId = "1234" + subRequestId = "1234-12" + originatorId = "cds-test" + } + + val consumerRecord = mockk>() + every { consumerRecord.headers() } returns null + every { consumerRecord.key() } returns "1234" + every { consumerRecord.offset() } returns 12345 + every { consumerRecord.topic() } returns "sample-topic" + every { consumerRecord.timestamp() } returns System.currentTimeMillis() + messageLoggerService.messageConsuming(commonHeader, consumerRecord) + assertEquals(commonHeader.requestId, MDC.get("RequestID")) + assertEquals(commonHeader.subRequestId, MDC.get("InvocationID")) + + val mockHeaders = RecordHeaders() + messageLoggerService.messageProducing(mockHeaders) + val map = mockHeaders.toMap() + assertEquals("1234-12", map[BluePrintConstants.ONAP_REQUEST_ID]) + + messageLoggerService.messageConsumingExisting() + + } + +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml index 3868440c7..5f4fb4f73 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml @@ -1,5 +1,6 @@ + + + + + - %d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n + ${testing} diff --git a/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt index 2ef5a31bc..cec11ae3c 100644 --- a/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt +++ b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt @@ -32,6 +32,7 @@ import org.springframework.http.server.reactive.ServerHttpResponse import reactor.core.Disposable import reactor.core.publisher.Mono import reactor.core.publisher.MonoSink +import java.net.InetAddress import java.time.ZoneOffset import java.time.ZonedDateTime import java.time.format.DateTimeFormatter @@ -43,6 +44,7 @@ class RestLoggerService { fun entering(request: ServerHttpRequest) { + val localhost = InetAddress.getLocalHost() val headers = request.headers val requestID = headers.getFirst(ONAP_REQUEST_ID).defaultToUUID() val invocationID = headers.getFirst(ONAP_INVOCATION_ID).defaultToUUID() @@ -52,7 +54,7 @@ class RestLoggerService { MDC.put("InvocationID", invocationID) MDC.put("PartnerName", partnerName) MDC.put("ClientIPAddress", request.remoteAddress?.address?.hostAddress.defaultToEmpty()) - MDC.put("ServerFQDN", request.remoteAddress?.hostString.defaultToEmpty()) + MDC.put("ServerFQDN",localhost.hostName.defaultToEmpty()) if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) { MDC.put("ServiceName", request.uri.path) } -- 2.16.6