Add Message tracing logger service. 64/97664/1
authorBrinda Santh <bs2796@att.com>
Mon, 28 Oct 2019 20:46:47 +0000 (16:46 -0400)
committerBrinda Santh <bs2796@att.com>
Mon, 28 Oct 2019 20:46:47 +0000 (16:46 -0400)
Issue-ID: CCSDK-1046
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I509df82ec558cd96934043a5b41ea53aa040cc81

ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt

index 1b93248..6d2ba43 100644 (file)
@@ -31,6 +31,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.slf4j.MDC
+import java.net.InetAddress
 import java.net.InetSocketAddress
 import java.time.ZoneOffset
 import java.time.ZonedDateTime
@@ -46,7 +47,7 @@ class GrpcLoggerService {
                                                  headers: Metadata, next: ServerCallHandler<ReqT, RespT>) {
         val requestID = headers.getStringKey(ONAP_REQUEST_ID).defaultToUUID()
         val invocationID = headers.getStringKey(ONAP_INVOCATION_ID).defaultToUUID()
-        val partnerName = headers.getStringKey(ONAP_PARTNER_NAME).defaultToUUID()
+        val partnerName = headers.getStringKey(ONAP_PARTNER_NAME) ?: "UNKNOWN"
         grpcRequesting(requestID, invocationID, partnerName, call)
     }
 
@@ -54,12 +55,14 @@ class GrpcLoggerService {
                                                  headers: CommonHeader, next: ServerCallHandler<ReqT, RespT>) {
         val requestID = headers.requestId.defaultToUUID()
         val invocationID = headers.subRequestId.defaultToUUID()
-        val partnerName = headers.originatorId.defaultToUUID()
+        val partnerName = headers.originatorId ?: "UNKNOWN"
         grpcRequesting(requestID, invocationID, partnerName, call)
     }
 
     fun <ReqT : Any, RespT : Any> grpcRequesting(requestID: String, invocationID: String, partnerName: String,
                                                  call: ServerCall<ReqT, RespT>) {
+        val localhost = InetAddress.getLocalHost()
+
         val clientSocketAddress = call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) as? InetSocketAddress
                 ?: throw BluePrintProcessorException("failed to get client address")
         val serviceName = call.methodDescriptor.fullMethodName
@@ -69,7 +72,7 @@ class GrpcLoggerService {
         MDC.put("InvocationID", invocationID)
         MDC.put("PartnerName", partnerName)
         MDC.put("ClientIPAddress", clientSocketAddress.address.defaultToEmpty())
-        MDC.put("ServerFQDN", clientSocketAddress.address.hostName.defaultToEmpty())
+        MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty())
         MDC.put("ServiceName", serviceName)
         log.trace("MDC Properties : ${MDC.getCopyOfContextMap()}")
     }
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
new file mode 100644 (file)
index 0000000..a817c0c
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.header.internals.RecordHeader
+import java.nio.charset.Charset
+
+
+fun <T : Headers> T?.toMap(): MutableMap<String, String> {
+    val map: MutableMap<String, String> = hashMapOf()
+    this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) }
+    return map
+}
+
+fun Headers.addHeader(key: String, value: String) {
+    this.add(RecordHeader(key, value.toByteArray()))
+}
\ No newline at end of file
index b99be0a..a4ccfa9 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -36,9 +37,10 @@ open class KafkaBasicAuthMessageConsumerService(
         private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
     : BlueprintMessageConsumerService {
 
+    val log = logger(KafkaBasicAuthMessageConsumerService::class)
+
     val channel = Channel<String>()
     var kafkaConsumer: Consumer<String, ByteArray>? = null
-    val log = logger(KafkaBasicAuthMessageConsumerService::class)
 
     @Volatile
     var keepGoing = true
index 86c04f6..42adcd7 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -38,6 +39,8 @@ class KafkaBasicAuthMessageProducerService(
 
     private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
 
+    private val messageLoggerService = MessageLoggerService()
+
     override suspend fun sendMessageNB(message: Any): Boolean {
         checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
         return sendMessageNB(messageProducerProperties.topic!!, message)
@@ -56,8 +59,10 @@ class KafkaBasicAuthMessageProducerService(
         }
 
         val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+        val recordHeaders = record.headers()
+        messageLoggerService.messageProducing(recordHeaders)
         headers?.let {
-            headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) }
+            headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
         }
         val callback = Callback { metadata, exception ->
             log.info("message published offset(${metadata.offset()}, headers :$headers )")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
new file mode 100644 (file)
index 0000000..21bf1b7
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.Headers
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.slf4j.MDC
+import java.net.InetAddress
+import java.time.Instant
+import java.time.ZoneOffset
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+import java.util.*
+
+class MessageLoggerService {
+
+    private val log = logger(MessageLoggerService::class)
+
+    fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) {
+        messageConsuming(headers.requestId, headers.subRequestId,
+                headers.originatorId, consumerRecord)
+    }
+
+    fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) {
+        val headers = consumerRecord.headers().toMap()
+        val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID()
+        val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID()
+        val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN"
+        messageConsuming(requestID, invocationID, partnerName, consumerRecord)
+    }
+
+
+    fun messageConsuming(requestID: String, invocationID: String, partnerName: String,
+                         consumerRecord: ConsumerRecord<*, *>) {
+        val headers = consumerRecord.headers().toMap()
+        val localhost = InetAddress.getLocalHost()
+        MDC.put("InvokeTimestamp", ZonedDateTime
+                .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC)
+                .format(DateTimeFormatter.ISO_INSTANT))
+        MDC.put("RequestID", requestID)
+        MDC.put("InvocationID", invocationID)
+        MDC.put("PartnerName", partnerName)
+        MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty())
+        MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty())
+        MDC.put("ServiceName", consumerRecord.topic())
+        // Custom MDC for Message Consumers
+        MDC.put("Offset", consumerRecord.offset().toString())
+        MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty())
+        log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}")
+    }
+
+    /** Used before producing message request, Inbound Invocation ID is used as request Id
+     * for produced message Request, If invocation Id is missing then default Request Id will be generated.
+     */
+    fun messageProducing(requestHeader: Headers) {
+        val localhost = InetAddress.getLocalHost()
+        requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID())
+        requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString())
+        val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
+        requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, partnerName)
+        requestHeader.addHeader("ClientIPAddress", localhost.hostAddress)
+    }
+
+    fun messageConsumingExisting() {
+        MDC.clear()
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt
new file mode 100644 (file)
index 0000000..82e40ef
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import io.mockk.every
+import io.mockk.mockk
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.internals.RecordHeaders
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.slf4j.MDC
+import kotlin.test.assertEquals
+
+class MessageLoggerServiceTest {
+
+
+    @Test
+    fun testMessagingHeaders() {
+        val messageLoggerService = MessageLoggerService()
+        val commonHeader = CommonHeader().apply {
+            requestId = "1234"
+            subRequestId = "1234-12"
+            originatorId = "cds-test"
+        }
+
+        val consumerRecord = mockk<ConsumerRecord<*, *>>()
+        every { consumerRecord.headers() } returns null
+        every { consumerRecord.key() } returns "1234"
+        every { consumerRecord.offset() } returns 12345
+        every { consumerRecord.topic() } returns "sample-topic"
+        every { consumerRecord.timestamp() } returns System.currentTimeMillis()
+        messageLoggerService.messageConsuming(commonHeader, consumerRecord)
+        assertEquals(commonHeader.requestId, MDC.get("RequestID"))
+        assertEquals(commonHeader.subRequestId, MDC.get("InvocationID"))
+
+        val mockHeaders = RecordHeaders()
+        messageLoggerService.messageProducing(mockHeaders)
+        val map = mockHeaders.toMap()
+        assertEquals("1234-12", map[BluePrintConstants.ONAP_REQUEST_ID])
+
+        messageLoggerService.messageConsumingExisting()
+
+    }
+
+}
\ No newline at end of file
index 3868440..5f4fb4f 100644 (file)
@@ -1,5 +1,6 @@
 <!--
   ~  Copyright © 2019 IBM.
+  ~  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
   ~
   ~  Licensed under the Apache License, Version 2.0 (the "License");
   ~  you may not use this file except in compliance with the License.
   -->
 
 <configuration>
+
+    <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
+    <property name="defaultPattern"
+              value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
+    <property name="testing"
+              value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
+
     <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
         <!-- encoders are assigned the type
              ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
         <encoder>
-            <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern>
+            <pattern>${testing}</pattern>
         </encoder>
     </appender>
 
index 2ef5a31..cec11ae 100644 (file)
@@ -32,6 +32,7 @@ import org.springframework.http.server.reactive.ServerHttpResponse
 import reactor.core.Disposable
 import reactor.core.publisher.Mono
 import reactor.core.publisher.MonoSink
+import java.net.InetAddress
 import java.time.ZoneOffset
 import java.time.ZonedDateTime
 import java.time.format.DateTimeFormatter
@@ -43,6 +44,7 @@ class RestLoggerService {
 
 
     fun entering(request: ServerHttpRequest) {
+        val localhost = InetAddress.getLocalHost()
         val headers = request.headers
         val requestID = headers.getFirst(ONAP_REQUEST_ID).defaultToUUID()
         val invocationID = headers.getFirst(ONAP_INVOCATION_ID).defaultToUUID()
@@ -52,7 +54,7 @@ class RestLoggerService {
         MDC.put("InvocationID", invocationID)
         MDC.put("PartnerName", partnerName)
         MDC.put("ClientIPAddress", request.remoteAddress?.address?.hostAddress.defaultToEmpty())
-        MDC.put("ServerFQDN", request.remoteAddress?.hostString.defaultToEmpty())
+        MDC.put("ServerFQDN",localhost.hostName.defaultToEmpty())
         if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) {
             MDC.put("ServiceName", request.uri.path)
         }