Improve log format consistency 16/129516/1
authorJozsef Csongvai <jozsef.csongvai@bell.ca>
Tue, 7 Jun 2022 23:30:54 +0000 (19:30 -0400)
committerJozsef Csongvai <jozsef.csongvai@bell.ca>
Wed, 8 Jun 2022 00:38:58 +0000 (20:38 -0400)
Logs related to request processing should include: requestId,
subrequestId and originatorId. Each API (rest,grpc,kafka) would
produce different outputs, due to MDC context not being passed
properly between coroutine contexts or values not being populated.

Issue-ID: CCSDK-3686
Signed-off-by: Jozsef Csongvai <jozsef.csongvai@bell.ca>
Change-Id: Ibafdffd3409b9724ad91633ca5840070f7e287f5

ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt

index 732f4fe..987ac44 100644 (file)
@@ -20,6 +20,10 @@ import io.grpc.Grpc
 import io.grpc.Metadata
 import io.grpc.ServerCall
 import io.grpc.ServerCallHandler
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.newCoroutineContext
+import kotlinx.coroutines.withContext
 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.getStringKey
 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.putStringKeyValue
 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
@@ -28,9 +32,11 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVO
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
 import org.slf4j.MDC
 import java.net.InetAddress
 import java.net.InetSocketAddress
@@ -108,3 +114,16 @@ class GrpcLoggerService {
         }
     }
 }
+suspend fun <T> mdcGrpcCoroutineScope(
+    executionServiceInput: ExecutionServiceInput,
+    block: suspend CoroutineScope.() -> T
+) = coroutineScope {
+
+    MDC.put("RequestID", executionServiceInput.commonHeader.requestId)
+    MDC.put("SubRequestID", executionServiceInput.commonHeader.subRequestId)
+    MDC.put("OriginatorID", executionServiceInput.commonHeader.originatorId)
+
+    withContext(newCoroutineContext(this.coroutineContext + MDCContext(MDC.getCopyOfContextMap()))) {
+        block()
+    }
+}
index cccc61f..e506495 100644 (file)
@@ -32,6 +32,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtil
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
 import org.slf4j.LoggerFactory
+import org.slf4j.MDC
 import java.nio.charset.Charset
 
 class KafkaMessageProducerService(
@@ -78,7 +79,9 @@ class KafkaMessageProducerService(
         headers?.let {
             headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
         }
+        val context = MDC.getCopyOfContextMap()
         val callback = Callback { metadata, exception ->
+            MDC.setContextMap(context)
             meterRegistry.counter(
                 BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_COUNTER,
                 BlueprintMessageUtils.kafkaMetricTag(topic)
index 90b8500..ff2d43f 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.newCoroutineContext
+import kotlinx.coroutines.withContext
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.header.Headers
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader
 import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -93,3 +99,17 @@ class MessageLoggerService {
         MDC.clear()
     }
 }
+
+suspend fun <T> mdcKafkaCoroutineScope(
+    executionServiceInput: ExecutionServiceInput,
+    block: suspend CoroutineScope.() -> T
+) = coroutineScope {
+
+    MDC.put("RequestID", executionServiceInput.commonHeader.requestId)
+    MDC.put("SubRequestID", executionServiceInput.commonHeader.subRequestId)
+    MDC.put("OriginatorID", executionServiceInput.commonHeader.originatorId)
+
+    withContext(newCoroutineContext(this.coroutineContext + MDCContext(MDC.getCopyOfContextMap()))) {
+        block()
+    }
+}
index b1d8abd..4b9bbb1 100644 (file)
@@ -126,6 +126,8 @@ suspend fun <T> mdcWebCoroutineScope(
         val mdcContext = if (executionServiceInput != null) {
             // MDC Context with overridden request ID
             MDC.put("RequestID", executionServiceInput.commonHeader.requestId)
+            MDC.put("SubRequestID", executionServiceInput.commonHeader.subRequestId)
+            MDC.put("OriginatorID", executionServiceInput.commonHeader.originatorId)
             MDCContext(MDC.getCopyOfContextMap())
         } else {
             // Default MDC Context
index a210185..c722173 100644 (file)
@@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver
 import kotlinx.coroutines.runBlocking
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintCoreConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toJava
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.mdcGrpcCoroutineScope
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
@@ -58,7 +59,9 @@ open class BluePrintProcessingGRPCHandler(
                 try {
                     ph.register()
                     runBlocking {
-                        executionServiceHandler.process(executionServiceInput.toJava(), responseObserver)
+                        mdcGrpcCoroutineScope(executionServiceInput) {
+                            executionServiceHandler.process(executionServiceInput.toJava(), responseObserver)
+                        }
                     }
                 } catch (e: Exception) {
                     if (e is BluePrintProcessorException) handleWithErrorCatalog(e) else handleError(e)
index ed9b4d5..0b5d568 100644 (file)
@@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.mdcKafkaCoroutineScope
 import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
@@ -110,16 +111,18 @@ open class BluePrintProcessingKafkaConsumer(
                             val key = message.key() ?: UUID.randomUUID().toString()
                             val value = String(message.value(), Charset.defaultCharset())
                             val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
-                            log.info(
-                                "Consumed Message : topic(${message.topic()}) " +
-                                    "partition(${message.partition()}) " +
-                                    "leaderEpoch(${message.leaderEpoch().get()}) " +
-                                    "offset(${message.offset()}) " +
-                                    "key(${message.key()}) " +
-                                    BlueprintMessageUtils.getMessageLogData(executionServiceInput)
-                            )
-                            val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
-                            blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
+                            mdcKafkaCoroutineScope(executionServiceInput) {
+                                log.info(
+                                    "Consumed Message : topic(${message.topic()}) " +
+                                        "partition(${message.partition()}) " +
+                                        "leaderEpoch(${message.leaderEpoch().get()}) " +
+                                        "offset(${message.offset()}) " +
+                                        "key(${message.key()}) " +
+                                        BlueprintMessageUtils.getMessageLogData(executionServiceInput)
+                                )
+                                val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+                                blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
+                            }
                         } catch (e: Exception) {
                             meterRegistry.counter(
                                 BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
index c2c7a60..38d2b90 100644 (file)
@@ -20,8 +20,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 import io.grpc.stub.StreamObserver
 import io.micrometer.core.instrument.MeterRegistry
 import io.micrometer.core.instrument.Timer
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.coroutineScope
 import kotlinx.coroutines.launch
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
@@ -60,17 +59,17 @@ class ExecutionServiceHandler(
     suspend fun process(
         executionServiceInput: ExecutionServiceInput,
         responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>
-    ) {
-        when {
-            executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> {
-                GlobalScope.launch(Dispatchers.Default) {
+    ) = coroutineScope {
+        when (executionServiceInput.actionIdentifiers.mode) {
+            ACTION_MODE_ASYNC -> {
+                launch {
                     val executionServiceOutput = doProcess(executionServiceInput)
                     responseObserver.onNext(executionServiceOutput.toProto())
                     responseObserver.onCompleted()
                 }
                 responseObserver.onNext(response(executionServiceInput).toProto())
             }
-            executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> {
+            ACTION_MODE_SYNC -> {
                 val executionServiceOutput = doProcess(executionServiceInput)
                 responseObserver.onNext(executionServiceOutput.toProto())
                 responseObserver.onCompleted()