Add GRPC log tracing service. 31/97331/2
authorBrinda Santh <bs2796@att.com>
Fri, 18 Oct 2019 19:23:36 +0000 (15:23 -0400)
committerBrinda Santh <bs2796@att.com>
Tue, 22 Oct 2019 17:03:11 +0000 (13:03 -0400)
Issue-ID: CCSDK-1046
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I4ba6ed11d8fb63c21b9c49774ed733cca05c5646

18 files changed:
components/model-catalog/blueprint-model/test-blueprint/capability_cli/Scripts/kotlin/cba/capability/cli/CapabilityCli.kt
ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BlueprintGRPCServer.kt
ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/LoggingWebFilter.kt
ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt [deleted file]
ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt [moved from ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt with 53% similarity]
ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt
ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt
ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/MDCContextTest.kt

index 9552b61..7bda628 100644 (file)
@@ -19,6 +19,7 @@ package cba.capability.cli
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractScriptComponentFunction
 import org.onap.ccsdk.cds.blueprintsprocessor.ssh.sshClientService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
index 160a1b1..2d39eaa 100644 (file)
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor
 
 import io.grpc.ServerBuilder
 import org.onap.ccsdk.cds.blueprintsprocessor.designer.api.BluePrintManagementGRPCHandler
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcServerLoggingInterceptor
 import org.onap.ccsdk.cds.blueprintsprocessor.security.BasicAuthServerInterceptor
 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.BluePrintProcessingGRPCHandler
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -44,6 +45,7 @@ open class BlueprintGRPCServer(private val bluePrintProcessingGRPCHandler: BlueP
             log.info("Starting Blueprint Processor GRPC Starting..")
             val server = ServerBuilder
                     .forPort(grpcPort!!)
+                    .intercept(GrpcServerLoggingInterceptor())
                     .intercept(authInterceptor)
                     .addService(bluePrintProcessingGRPCHandler)
                     .addService(bluePrintManagementGRPCHandler)
index 5ed5ff4..68fbf25 100644 (file)
@@ -16,7 +16,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor
 
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.LoggingService
+import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.RestLoggerService
 import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
 import org.springframework.stereotype.Component
 import org.springframework.web.server.ServerWebExchange
@@ -29,7 +29,7 @@ import reactor.util.context.Context
 open class LoggingWebFilter : WebFilter {
     override fun filter(serverWebExchange: ServerWebExchange, webFilterChain: WebFilterChain): Mono<Void> {
 
-        val loggingService = LoggingService()
+        val loggingService = RestLoggerService()
         loggingService.entering(serverWebExchange.request)
         val filterChain = webFilterChain.filter(serverWebExchange).subscriberContext(
                 Context.of(MDCContext,  MDCContext()))
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt
new file mode 100644 (file)
index 0000000..55cf094
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc
+
+import io.grpc.Metadata
+
+fun Metadata.getStringKey(key: String): String? {
+    return this.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER))
+}
+
+fun Metadata.putStringKeyValue(key: String, value: String) {
+    this.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt
new file mode 100644 (file)
index 0000000..f3b14b5
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor
+
+import io.grpc.*
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+
+class GrpcClientLoggingInterceptor : ClientInterceptor {
+    val log = logger(GrpcClientLoggingInterceptor::class)
+
+    val loggingService = GrpcLoggerService()
+
+    override fun <ReqT, RespT> interceptCall(method: MethodDescriptor<ReqT, RespT>,
+                                             callOptions: CallOptions, channel: Channel): ClientCall<ReqT, RespT> {
+
+        return object : ForwardingClientCall
+        .SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
+
+            override fun start(responseListener: Listener<RespT>, headers: Metadata) {
+                val listener = object : ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
+                    override fun onMessage(message: RespT) {
+                        loggingService.grpcInvoking(headers)
+                        super.onMessage(message)
+                    }
+                }
+                super.start(listener, headers)
+            }
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt
new file mode 100644 (file)
index 0000000..e21d5d3
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor
+
+import io.grpc.*
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintDownloadInput
+import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintRemoveInput
+import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintUploadInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.slf4j.MDC
+
+class GrpcServerLoggingInterceptor : ServerInterceptor {
+    val log = logger(GrpcServerLoggingInterceptor::class)
+    val loggingService = GrpcLoggerService()
+
+    override fun <ReqT : Any, RespT : Any> interceptCall(call: ServerCall<ReqT, RespT>,
+                                                         requestHeaders: Metadata, next: ServerCallHandler<ReqT, RespT>)
+            : ServerCall.Listener<ReqT> {
+
+        val forwardingServerCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
+            override fun sendHeaders(responseHeaders: Metadata) {
+                loggingService.grpResponding(requestHeaders, responseHeaders)
+                super.sendHeaders(responseHeaders)
+            }
+        }
+
+        return object
+            : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
+                next.startCall(forwardingServerCall, requestHeaders)) {
+
+            override fun onMessage(message: ReqT) {
+                /** Get the requestId, SubRequestId and Originator Id and set in MDS context
+                 *  If you are using other GRPC services, Implement own Logging Interceptors to get tracing.
+                 * */
+                when (message) {
+                    is ExecutionServiceInput -> {
+                        val commonHeader = message.commonHeader
+                                ?: throw BluePrintProcessorException("missing common header in request")
+                        loggingService.grpcRequesting(call, commonHeader, next)
+                    }
+                    is BluePrintUploadInput -> {
+                        val commonHeader = message.commonHeader
+                                ?: throw BluePrintProcessorException("missing common header in request")
+                        loggingService.grpcRequesting(call, commonHeader, next)
+                    }
+                    is BluePrintDownloadInput -> {
+                        val commonHeader = message.commonHeader
+                                ?: throw BluePrintProcessorException("missing common header in request")
+                        loggingService.grpcRequesting(call, commonHeader, next)
+                    }
+                    is BluePrintRemoveInput -> {
+                        val commonHeader = message.commonHeader
+                                ?: throw BluePrintProcessorException("missing common header in request")
+                        loggingService.grpcRequesting(call, commonHeader, next)
+                    }
+                    else -> {
+                        loggingService.grpcRequesting(call, requestHeaders, next)
+                    }
+                }
+                super.onMessage(message)
+            }
+
+            override fun onComplete() {
+                MDC.clear()
+                super.onComplete()
+            }
+
+            override fun onCancel() {
+                MDC.clear()
+                super.onCancel()
+            }
+
+        }
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
new file mode 100644 (file)
index 0000000..1b93248
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service
+
+import io.grpc.Grpc
+import io.grpc.Metadata
+import io.grpc.ServerCall
+import io.grpc.ServerCallHandler
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.getStringKey
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.putStringKeyValue
+import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.slf4j.MDC
+import java.net.InetSocketAddress
+import java.time.ZoneOffset
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+import java.util.*
+
+class GrpcLoggerService {
+
+    private val log = logger(GrpcLoggerService::class)
+
+    /** Used when server receives request */
+    fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>,
+                                                 headers: Metadata, next: ServerCallHandler<ReqT, RespT>) {
+        val requestID = headers.getStringKey(ONAP_REQUEST_ID).defaultToUUID()
+        val invocationID = headers.getStringKey(ONAP_INVOCATION_ID).defaultToUUID()
+        val partnerName = headers.getStringKey(ONAP_PARTNER_NAME).defaultToUUID()
+        grpcRequesting(requestID, invocationID, partnerName, call)
+    }
+
+    fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>,
+                                                 headers: CommonHeader, next: ServerCallHandler<ReqT, RespT>) {
+        val requestID = headers.requestId.defaultToUUID()
+        val invocationID = headers.subRequestId.defaultToUUID()
+        val partnerName = headers.originatorId.defaultToUUID()
+        grpcRequesting(requestID, invocationID, partnerName, call)
+    }
+
+    fun <ReqT : Any, RespT : Any> grpcRequesting(requestID: String, invocationID: String, partnerName: String,
+                                                 call: ServerCall<ReqT, RespT>) {
+        val clientSocketAddress = call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) as? InetSocketAddress
+                ?: throw BluePrintProcessorException("failed to get client address")
+        val serviceName = call.methodDescriptor.fullMethodName
+
+        MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
+        MDC.put("RequestID", requestID)
+        MDC.put("InvocationID", invocationID)
+        MDC.put("PartnerName", partnerName)
+        MDC.put("ClientIPAddress", clientSocketAddress.address.defaultToEmpty())
+        MDC.put("ServerFQDN", clientSocketAddress.address.hostName.defaultToEmpty())
+        MDC.put("ServiceName", serviceName)
+        log.trace("MDC Properties : ${MDC.getCopyOfContextMap()}")
+    }
+
+
+    /** Used before invoking any GRPC outbound request, Inbound Invocation ID is used as request Id
+     * for outbound Request, If invocation Id is missing then default Request Id will be generated.
+     */
+    fun grpcInvoking(requestHeader: Metadata) {
+        requestHeader.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID())
+        requestHeader.putStringKeyValue(ONAP_INVOCATION_ID, UUID.randomUUID().toString())
+        val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
+        requestHeader.putStringKeyValue(ONAP_PARTNER_NAME, partnerName)
+    }
+
+    /** Used when server returns response */
+    fun grpResponding(requestHeaders: Metadata, responseHeaders: Metadata) {
+        try {
+            responseHeaders.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("RequestID").defaultToEmpty())
+            responseHeaders.putStringKeyValue(ONAP_INVOCATION_ID, MDC.get("InvocationID").defaultToEmpty())
+            responseHeaders.putStringKeyValue(ONAP_PARTNER_NAME, MDC.get("PartnerName").defaultToEmpty())
+        } catch (e: Exception) {
+            log.warn("couldn't set grpc response headers", e)
+        }
+    }
+}
\ No newline at end of file
index dbff842..601dc0e 100644 (file)
@@ -21,6 +21,7 @@ import io.grpc.internal.DnsNameResolverProvider
 import io.grpc.internal.PickFirstLoadBalancerProvider
 import io.grpc.netty.NettyChannelBuilder
 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcClientLoggingInterceptor
 
 class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: TokenAuthGrpcClientProperties)
     : BluePrintGrpcClientService {
@@ -30,6 +31,7 @@ class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: Toke
                 .forAddress(tokenAuthGrpcClientProperties.host, tokenAuthGrpcClientProperties.port)
                 .nameResolverFactory(DnsNameResolverProvider())
                 .loadBalancerFactory(PickFirstLoadBalancerProvider())
+                .intercept(GrpcClientLoggingInterceptor())
                 .intercept(TokenAuthClientInterceptor(tokenAuthGrpcClientProperties)).usePlaintext().build()
         return managedChannel
     }
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt
deleted file mode 100644 (file)
index cdf6ce1..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.core
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.reactor.ReactorContext
-import kotlinx.coroutines.reactor.asCoroutineContext
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.MonoMDCCoroutine
-import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
-import reactor.core.publisher.Mono
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.EmptyCoroutineContext
-
-/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */
-@UseExperimental(InternalCoroutinesApi::class)
-fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext,
-                block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink ->
-
-    val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext())
-            ?: sink.currentContext()).asCoroutineContext()
-    /** Populate MDC context only if present in Reactor Context */
-    val newContext = if (!reactorContext.context.isEmpty
-            && reactorContext.context.hasKey(MDCContext)) {
-        val mdcContext = reactorContext.context.get<MDCContext>(MDCContext)
-        GlobalScope.newCoroutineContext(context + reactorContext + mdcContext)
-    } else GlobalScope.newCoroutineContext(context + reactorContext)
-
-    val coroutine = MonoMDCCoroutine(newContext, sink)
-    sink.onDispose(coroutine)
-    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
-}
\ No newline at end of file
  * limitations under the License.
  */
 
-package org.onap.ccsdk.cds.blueprintsprocessor.core.service
+package org.onap.ccsdk.cds.blueprintsprocessor.rest.service
 
-import kotlinx.coroutines.AbstractCoroutine
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.handleCoroutineException
+import kotlinx.coroutines.*
+import kotlinx.coroutines.reactor.ReactorContext
+import kotlinx.coroutines.reactor.asCoroutineContext
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.slf4j.MDC
 import org.springframework.http.server.reactive.ServerHttpRequest
 import org.springframework.http.server.reactive.ServerHttpResponse
 import reactor.core.Disposable
+import reactor.core.publisher.Mono
 import reactor.core.publisher.MonoSink
 import java.time.ZoneOffset
 import java.time.ZonedDateTime
 import java.time.format.DateTimeFormatter
-import java.util.*
 import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
 
-class LoggingService {
-    private val log = logger(LoggingService::class)
+class RestLoggerService {
+    private val log = logger(RestLoggerService::class)
 
-    companion object {
-        const val ONAP_REQUEST_ID = "X-ONAP-RequestID"
-        const val ONAP_INVOCATION_ID = "X-ONAP-InvocationID"
-        const val ONAP_PARTNER_NAME = "X-ONAP-PartnerName"
-    }
 
     fun entering(request: ServerHttpRequest) {
         val headers = request.headers
-        val requestID = defaultToUUID(headers.getFirst(ONAP_REQUEST_ID))
-        val invocationID = defaultToUUID(headers.getFirst(ONAP_INVOCATION_ID))
-        val partnerName = defaultToEmpty(headers.getFirst(ONAP_PARTNER_NAME))
+        val requestID = headers.getFirst(ONAP_REQUEST_ID).defaultToUUID()
+        val invocationID = headers.getFirst(ONAP_INVOCATION_ID).defaultToUUID()
+        val partnerName = headers.getFirst(ONAP_PARTNER_NAME).defaultToEmpty()
         MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
         MDC.put("RequestID", requestID)
         MDC.put("InvocationID", invocationID)
         MDC.put("PartnerName", partnerName)
-        MDC.put("ClientIPAddress", defaultToEmpty(request.remoteAddress?.address?.hostAddress))
-        MDC.put("ServerFQDN", defaultToEmpty(request.remoteAddress?.hostString))
+        MDC.put("ClientIPAddress", request.remoteAddress?.address?.hostAddress.defaultToEmpty())
+        MDC.put("ServerFQDN", request.remoteAddress?.hostString.defaultToEmpty())
         if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) {
             MDC.put("ServiceName", request.uri.path)
         }
@@ -62,22 +64,35 @@ class LoggingService {
             val resHeaders = response.headers
             resHeaders[ONAP_REQUEST_ID] = MDC.get("RequestID")
             resHeaders[ONAP_INVOCATION_ID] = MDC.get("InvocationID")
+            val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
+            resHeaders[ONAP_PARTNER_NAME] = partnerName
         } catch (e: Exception) {
             log.warn("couldn't set response headers", e)
         } finally {
             MDC.clear()
         }
     }
+}
 
-    private fun defaultToEmpty(input: Any?): String {
-        return input?.toString() ?: ""
-    }
 
-    private fun defaultToUUID(input: String?): String {
-        return input ?: UUID.randomUUID().toString()
-    }
-}
+/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */
+@UseExperimental(InternalCoroutinesApi::class)
+fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext,
+                block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink ->
+
+    val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext())
+            ?: sink.currentContext()).asCoroutineContext()
+    /** Populate MDC context only if present in Reactor Context */
+    val newContext = if (!reactorContext.context.isEmpty
+            && reactorContext.context.hasKey(MDCContext)) {
+        val mdcContext = reactorContext.context.get<MDCContext>(MDCContext)
+        GlobalScope.newCoroutineContext(context + reactorContext + mdcContext)
+    } else GlobalScope.newCoroutineContext(context + reactorContext)
 
+    val coroutine = MonoMDCCoroutine(newContext, sink)
+    sink.onDispose(coroutine)
+    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
 
 @InternalCoroutinesApi
 class MonoMDCCoroutine<in T>(
index a6bff70..bf251f6 100644 (file)
@@ -19,9 +19,9 @@ package org.onap.ccsdk.cds.blueprintsprocessor.designer.api
 
 import io.swagger.annotations.ApiOperation
 import io.swagger.annotations.ApiParam
-import org.onap.ccsdk.cds.blueprintsprocessor.core.monoMdc
 import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModelSearch
 import org.onap.ccsdk.cds.blueprintsprocessor.designer.api.handler.BluePrintModelHandler
+import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
 import org.springframework.core.io.Resource
 import org.springframework.http.MediaType
index f14f61e..3456506 100644 (file)
@@ -23,7 +23,7 @@ import io.swagger.annotations.ApiParam
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
-import org.onap.ccsdk.cds.blueprintsprocessor.core.monoMdc
+import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc
 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.determineHttpStatusCode
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
index e291aa7..6bffffd 100644 (file)
@@ -18,8 +18,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts
 
 import io.grpc.ServerBuilder
 import io.grpc.stub.StreamObserver
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcServerLoggingInterceptor
 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
 import org.onap.ccsdk.cds.controllerblueprints.common.api.Status
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
@@ -36,8 +40,13 @@ class MockBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintPr
             override fun onNext(executionServiceInput: ExecutionServiceInput) {
                 log.info("Received requestId(${executionServiceInput.commonHeader.requestId})  " +
                         "subRequestId(${executionServiceInput.commonHeader.subRequestId})")
-                responseObserver.onNext(buildNotification(executionServiceInput))
-                responseObserver.onNext(buildResponse(executionServiceInput))
+                runBlocking {
+                    launch(MDCContext()) {
+                        responseObserver.onNext(buildNotification(executionServiceInput))
+                        responseObserver.onNext(buildResponse(executionServiceInput))
+                        log.info("message has sent successfully...")
+                    }
+                }
                 responseObserver.onCompleted()
             }
 
@@ -85,6 +94,7 @@ fun main() {
     try {
         val server = ServerBuilder
                 .forPort(50052)
+                .intercept(GrpcServerLoggingInterceptor())
                 .addService(MockBluePrintProcessingServer())
                 .build()
         server.start()
index 29d24c6..9a5be01 100644 (file)
@@ -16,6 +16,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
 
+import com.google.protobuf.util.JsonFormat
 import io.grpc.inprocess.InProcessChannelBuilder
 import io.grpc.inprocess.InProcessServerBuilder
 import io.grpc.testing.GrpcCleanupRule
@@ -26,6 +27,7 @@ import kotlinx.coroutines.*
 import kotlinx.coroutines.flow.collect
 import org.junit.Rule
 import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
@@ -132,12 +134,17 @@ class StreamingRemoteExecutionServiceTest {
                 .setActionName("SampleScript")
                 .setBlueprintName("sample-cba")
                 .setBlueprintVersion("1.0.0")
+                .setMode(ACTION_MODE_SYNC)
                 .build()
 
+        val jsonContent = """{ "key1" : "value1" }"""
+        val payloadBuilder = ExecutionServiceInput.newBuilder().payloadBuilder
+        JsonFormat.parser().merge(jsonContent, payloadBuilder)
+
         return ExecutionServiceInput.newBuilder()
                 .setCommonHeader(commonHeader)
                 .setActionIdentifiers(actionIdentifier)
-                //.setPayload(payloadBuilder.build())
+                .setPayload(payloadBuilder.build())
                 .build()
 
     }
index afe10b3..8951e1a 100644 (file)
   -->
 
 <configuration>
+
+    <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
+    <property name="defaultPattern"
+              value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
+    <property name="testing"
+              value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
+
     <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
         <!-- encoders are assigned the type
              ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
         <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+            <pattern>${testing}</pattern>
         </encoder>
     </appender>
 
index ba5815b..6a616ce 100644 (file)
@@ -24,11 +24,14 @@ package org.onap.ccsdk.cds.controllerblueprints.core
  */
 object BluePrintConstants {
 
-    const val RESPONSE_HEADER_TRANSACTION_ID: String = "X-ONAP-RequestID"
     const val RESPONSE_HEADER_MINOR_VERSION: String = "X-MinorVersion"
     const val RESPONSE_HEADER_PATCH_VERSION: String = "X-PatchVersion"
     const val RESPONSE_HEADER_LATEST_VERSION: String = "X-LatestVersion"
 
+    const val ONAP_REQUEST_ID = "X-ONAP-RequestID"
+    const val ONAP_INVOCATION_ID = "X-ONAP-InvocationID"
+    const val ONAP_PARTNER_NAME = "X-ONAP-PartnerName"
+
     const val STATUS_SUCCESS: String = "success"
     const val STATUS_PROCESSING: String = "processing"
     const val STATUS_FAILURE: String = "failure"
index 1aaf9d8..7aa2fc8 100644 (file)
@@ -24,6 +24,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JsonParserUtils
 import org.slf4j.LoggerFactory
 import org.slf4j.helpers.MessageFormatter
+import java.util.*
 import kotlin.reflect.KClass
 
 /**
@@ -36,6 +37,13 @@ fun <T : Any> logger(clazz: T) = LoggerFactory.getLogger(clazz.javaClass)!!
 
 fun <T : KClass<*>> logger(clazz: T) = LoggerFactory.getLogger(clazz.java)!!
 
+fun <T : Any> T?.defaultToEmpty(): String {
+    return this?.toString() ?: ""
+}
+
+fun <T : Any> T?.defaultToUUID(): String {
+    return this?.toString() ?: UUID.randomUUID().toString()
+}
 
 fun <T : Any> T.bpClone(): T {
     return ObjectUtils.clone(this)
@@ -175,7 +183,7 @@ fun ArrayNode.asListOfString(): List<String> {
 
 fun <T> JsonNode.asType(clazzType: Class<T>): T {
     return JacksonUtils.readValue(this, clazzType)
-        ?: throw BluePrintException("couldn't convert JsonNode of type $clazzType")
+            ?: throw BluePrintException("couldn't convert JsonNode of type $clazzType")
 }
 
 fun JsonNode.asListOfString(): List<String> {
@@ -186,8 +194,7 @@ fun JsonNode.asListOfString(): List<String> {
 fun <T : JsonNode> T?.returnNullIfMissing(): JsonNode? {
     return if (this == null || this is NullNode || this is MissingNode) {
         null
-    }
-    else this
+    } else this
 }
 
 fun <T : JsonNode> T?.isNullOrMissing(): Boolean {
index 2ddb450..6c92d18 100644 (file)
@@ -39,13 +39,13 @@ class MDCContextTest {
 
     @Test
     fun testContextCanBePassedBetweenCoroutines() {
-        MDC.put(BluePrintConstants.RESPONSE_HEADER_TRANSACTION_ID, "12345")
+        MDC.put(BluePrintConstants.ONAP_REQUEST_ID, "12345")
         runBlocking {
             GlobalScope.launch {
-                assertEquals(null, MDC.get(BluePrintConstants.RESPONSE_HEADER_TRANSACTION_ID))
+                assertEquals(null, MDC.get(BluePrintConstants.ONAP_REQUEST_ID))
             }
             launch(MDCContext()) {
-                assertEquals("12345", MDC.get(BluePrintConstants.RESPONSE_HEADER_TRANSACTION_ID),
+                assertEquals("12345", MDC.get(BluePrintConstants.ONAP_REQUEST_ID),
                         "couldn't get request id")
 
                 MDC.put("client_id", "client-1")