Add all required and reasonable MDCs 03/74603/10
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 13 Dec 2018 08:26:36 +0000 (09:26 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 14 Dec 2018 07:31:49 +0000 (08:31 +0100)
Change-Id: I34beb32a7c53da97c6945ec8d0022ac37059b7c5
Issue-ID: DCAEGEN2-670
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
19 files changed:
build/hv-collector-analysis/src/main/resources/onap-detekt-config.yml
development/docker-compose.yml
pom.xml
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt [new file with mode: 0644]
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt [new file with mode: 0644]
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt [new file with mode: 0644]
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt [new file with mode: 0644]

index d4c3f1d..d5d3988 100644 (file)
@@ -94,6 +94,7 @@ services:
     command: ["--listen-port", "6062",
               "--ves-host", "ves-hv-collector",
               "--ves-port", "6061",
+              "--key-store", "/etc/ves-hv/client.p12",
               "--key-store-password", "onaponap",
               "--trust-store-password", "onaponap"]
     depends_on:
diff --git a/pom.xml b/pom.xml
index 621c5b2..d09da50 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
     </modules>
 
     <properties>
-        <kotlin.version>1.3.0</kotlin.version>
+        <kotlin.version>1.3.11</kotlin.version>
         <arrow.version>0.8.0</arrow.version>
         <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
         <build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
index cf73aed..ca1605e 100644 (file)
@@ -74,7 +74,7 @@ internal class VesHvCollector(
 
     private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
             .decode(rawPayload)
-            .filterFailedWithLog(logger, clientContext::asMap,
+            .filterFailedWithLog(logger, clientContext::fullMdc,
                     { "Ves event header decoded successfully" },
                     { "Failed to decode ves event header, reason: ${it.message}" })
 
@@ -88,7 +88,7 @@ internal class VesHvCollector(
 
     private fun findRoute(msg: VesMessage) = router
             .findDestination(msg)
-            .filterEmptyWithLog(logger, clientContext::asMap,
+            .filterEmptyWithLog(logger, clientContext::fullMdc,
                     { "Found route for message: ${it.topic}, partition: ${it.partition}" },
                     { "Could not find route for message" })
 
@@ -96,7 +96,7 @@ internal class VesHvCollector(
             .also { logger.debug { "Released buffer memory after handling message stream" } }
 
     fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
-            filterFailedWithLog(logger, clientContext::asMap, predicate)
+            filterFailedWithLog(logger, clientContext::fullMdc, predicate)
 
     companion object {
         private val logger = Logger(VesHvCollector::class)
index 21b79bb..954de97 100644 (file)
@@ -27,21 +27,21 @@ import reactor.core.publisher.Flux
 
 @Suppress("TooManyFunctions")
 internal object ClientContextLogging {
-    fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block)
-    fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block)
-    fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block)
-    fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block)
-    fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block)
+    fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block)
+    fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block)
+    fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block)
+    fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::fullMdc, block)
+    fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::fullMdc, block)
 
-    fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message)
-    fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message)
-    fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message)
-    fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message)
-    fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message)
+    fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::fullMdc, message)
+    fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::fullMdc, message)
+    fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::fullMdc, message)
+    fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::fullMdc, message)
+    fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::fullMdc, message)
 
     fun <T> Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable,
                                              returnFlux: Flux<T> = Flux.empty()): Flux<T> {
-        return this.handleReactiveStreamError({ context.asMap() }, ex, returnFlux)
+        return this.handleReactiveStreamError({ context.fullMdc }, ex, returnFlux)
     }
 }
 
index bbaa47c..14d511b 100644 (file)
@@ -24,13 +24,16 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.retry.Jitter
 import reactor.retry.Retry
 import java.io.StringReader
 import java.time.Duration
+import java.util.*
 import java.util.concurrent.atomic.AtomicReference
 import javax.json.Json
 import javax.json.JsonObject
@@ -52,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
     private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
     private val retry = retrySpec
             .doOnRetry {
-                logger.withWarn { log("Could not get fresh configuration", it.exception()) }
+                logger.withWarn(ServiceContext::mdc) { log("Could not get fresh configuration", it.exception()) }
                 healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
             }
 
@@ -77,17 +80,26 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
                     .map(::createCollectorConfiguration)
                     .retryWhen(retry)
 
-    private fun askForConfig(): Mono<String> = http.get(url)
+    private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
+        val invocationId = UUID.randomUUID()
+        http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
+    }
 
-    private fun filterDifferentValues(configurationString: String) =
-            hashOf(configurationString).let {
-                if (it == lastConfigurationHash.get()) {
-                    logger.trace { "No change detected in consul configuration" }
-                    Mono.empty()
-                } else {
-                    logger.info { "Obtained new configuration from consul:\n${configurationString}" }
-                    lastConfigurationHash.set(it)
-                    Mono.just(configurationString)
+    private fun filterDifferentValues(configuration: BodyWithInvocationId) =
+            configuration.body.let { configurationString ->
+                hashOf(configurationString).let {
+                    if (it == lastConfigurationHash.get()) {
+                        logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+                            "No change detected in consul configuration"
+                        }
+                        Mono.empty()
+                    } else {
+                        logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+                            "Obtained new configuration from consul:\n${configurationString}"
+                        }
+                        lastConfigurationHash.set(it)
+                        Mono.just(configurationString)
+                    }
                 }
             }
 
@@ -119,5 +131,6 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
         private const val BACKOFF_INTERVAL_FACTOR = 30L
         private val logger = Logger(ConsulConfigurationProvider::class)
     }
-}
 
+    private data class BodyWithInvocationId(val body: String, val invocationId: UUID)
+}
index 3fefc6e..51f7410 100644 (file)
@@ -21,9 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 
 import io.netty.handler.codec.http.HttpStatusClass
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
 import org.slf4j.LoggerFactory
 import reactor.core.publisher.Mono
 import reactor.netty.http.client.HttpClient
+import java.util.*
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -31,21 +33,23 @@ import reactor.netty.http.client.HttpClient
  */
 open class HttpAdapter(private val httpClient: HttpClient) {
 
-    open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
-            .get()
-            .uri(url + createQueryString(queryParams))
-            .responseSingle { response, content ->
-                if (response.status().codeClass() == HttpStatusClass.SUCCESS)
-                    content.asString()
-                else {
-                    val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
-                    Mono.error(IllegalStateException(errorMessage))
-                }
-            }
-            .doOnError {
-                logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
-                logger.withDebug { log("Nested exception:", it) }
-            }
+    open fun get(url: String, invocationId: UUID, queryParams: Map<String, Any> = emptyMap()): Mono<String> =
+            httpClient
+                    .headers { it[INVOCATION_ID_HEADER] = invocationId.toString() }
+                    .get()
+                    .uri(url + createQueryString(queryParams))
+                    .responseSingle { response, content ->
+                        if (response.status().codeClass() == HttpStatusClass.SUCCESS)
+                            content.asString()
+                        else {
+                            val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
+                            Mono.error(IllegalStateException(errorMessage))
+                        }
+                    }
+                    .doOnError {
+                        logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
+                        logger.withDebug { log("Nested exception:", it) }
+                    }
 
     private fun createQueryString(params: Map<String, Any>): String {
         if (params.isEmpty())
@@ -65,8 +69,7 @@ open class HttpAdapter(private val httpClient: HttpClient) {
     }
 
     companion object {
-
-
         private val logger = Logger(HttpAdapter::class)
+        const val INVOCATION_ID_HEADER = "X-${OnapMdc.INVOCATION_ID}"
     }
 }
index 690a7d1..b4f9a90 100644 (file)
@@ -69,7 +69,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
     }
 
     private fun logSentMessage(sentMsg: RoutedMessage) {
-        logger.trace(ctx::asMap, Marker.INVOKE) {
+        logger.trace(ctx::fullMdc, Marker.Invoke()) {
             val msgNum = sentMessages.incrementAndGet()
             "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
         }
index 6f02d43..d8d786b 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.socket
 
+import arrow.core.None
+import arrow.core.Option
 import arrow.core.getOrElse
+import arrow.core.toOption
 import arrow.effects.IO
+import arrow.syntax.collections.firstOption
+import io.netty.handler.ssl.SslHandler
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -40,6 +45,10 @@ import reactor.netty.NettyInbound
 import reactor.netty.NettyOutbound
 import reactor.netty.tcp.TcpServer
 import java.time.Duration
+import java.lang.Exception
+import java.security.cert.X509Certificate
+import javax.net.ssl.SSLSession
+
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -72,17 +81,21 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
     private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
         val clientContext = ClientContext(nettyOutbound.alloc())
         nettyInbound.withConnection {
-            clientContext.clientAddress = it.address()
+            populateClientContext(clientContext, it)
+            it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
+                sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
+            }
+
         }
 
-        logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" }
+        logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
         return collectorProvider(clientContext).fold(
                 {
-                    logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+                    logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
                     Mono.empty()
                 },
                 {
-                    logger.info(clientContext::asMap) { "Handling new connection" }
+                    logger.info(clientContext::fullMdc) { "Handling new connection" }
                     nettyInbound.withConnection { conn ->
                         conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
                                 .logConnectionClosed(clientContext)
@@ -92,6 +105,29 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
         )
     }
 
+    private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
+        clientContext.clientAddress = try {
+            Option.fromNullable(connection.address().address)
+        } catch (ex: Exception) {
+            None
+        }
+        clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
+    }
+
+    private fun getSslSession(connection: Connection) = Option.fromNullable(
+            connection
+                    .channel()
+                    .pipeline()
+                    .get(SslHandler::class.java)
+                    ?.engine()
+                    ?.session)
+
+    private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
+            sslSession
+                    .peerCertificates
+                    .firstOption()
+                    .flatMap { Option.fromNullable(it as? X509Certificate) }
+
     private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
             .receive()
             .retain()
@@ -108,7 +144,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
 
     private fun Connection.disconnectClient(ctx: ClientContext) {
         channel().close().addListener {
-            logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." }
+            logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
             if (it.isSuccess)
                 logger.debug(ctx) { "Channel closed successfully." }
             else
@@ -119,7 +155,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
     private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
         onTerminate().subscribe {
             // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
-            logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" }
+            logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
         }
         return this
     }
index 305e4cb..7b082e6 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.model
 
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.getOrElse
 import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.net.InetSocketAddress
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.net.InetAddress
+import java.security.cert.X509Certificate
 import java.util.*
 
 /**
@@ -31,13 +34,28 @@ import java.util.*
  */
 data class ClientContext(
         val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT,
-        val clientId: String = UUID.randomUUID().toString(),
-        var clientAddress: InetSocketAddress? = null) {
-    fun asMap(): Map<String, String> {
-        val result = mutableMapOf("clientId" to clientId)
-        if (clientAddress != null) {
-            result["clientAddress"] = clientAddress.toString()
-        }
-        return result
+        var clientAddress: Option<InetAddress> = None,
+        var clientCert: Option<X509Certificate> = None,
+        val requestId: String = UUID.randomUUID().toString(), // Should be somehow propagated to DMAAP
+        val invocationId: String = UUID.randomUUID().toString()) {
+
+    val mdc: Map<String, String>
+        get() = mapOf(
+                OnapMdc.REQUEST_ID to requestId,
+                OnapMdc.INVOCATION_ID to invocationId,
+                OnapMdc.STATUS_CODE to DEFAULT_STATUS_CODE,
+                OnapMdc.CLIENT_NAME to clientDn().getOrElse { DEFAULT_VALUE },
+                OnapMdc.CLIENT_IP to clientIp().getOrElse { DEFAULT_VALUE }
+        )
+
+    val fullMdc: Map<String, String>
+        get() = mdc + ServiceContext.mdc
+
+    private fun clientDn(): Option<String> = clientCert.map { it.subjectX500Principal.toString() }
+    private fun clientIp(): Option<String> = clientAddress.map(InetAddress::getHostAddress)
+
+    companion object {
+        const val DEFAULT_STATUS_CODE = "INPROGRESS"
+        const val DEFAULT_VALUE = ""
     }
 }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
new file mode 100644 (file)
index 0000000..2407ece
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.net.InetAddress
+import java.net.UnknownHostException
+import java.util.*
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+object ServiceContext {
+    val instanceId = UUID.randomUUID().toString()
+    val serverFqdn = getHost().hostName
+
+    val mdc = mapOf(
+            OnapMdc.INSTANCE_ID to instanceId,
+            OnapMdc.SERVER_FQDN to serverFqdn
+    )
+
+    private fun getHost() = try {
+        InetAddress.getLocalHost()
+    } catch (ex: UnknownHostException) {
+        InetAddress.getLoopbackAddress()
+    }
+}
index cdee92c..605e7a6 100644 (file)
@@ -19,9 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
-import arrow.core.Option
 import arrow.core.Try
-import arrow.core.success
 import com.google.protobuf.ByteString
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
@@ -32,7 +30,6 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import reactor.test.test
 import java.nio.charset.Charset
 import kotlin.test.assertTrue
 import kotlin.test.fail
index c6364f7..9ce0c3d 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.impl.adapters
 
+import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.eq
 import com.nhaarman.mockitokotlin2.mock
 import com.nhaarman.mockitokotlin2.whenever
@@ -57,7 +58,7 @@ internal object ConsulConfigurationProviderTest : Spek({
             val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
 
             on("call to consul") {
-                whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
+                whenever(httpAdapterMock.get(eq(validUrl), any(), Mockito.anyMap()))
                         .thenReturn(Mono.just(constructConsulResponse()))
 
                 it("should use received configuration") {
@@ -97,7 +98,7 @@ internal object ConsulConfigurationProviderTest : Spek({
             )
 
             on("call to consul") {
-                whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
+                whenever(httpAdapterMock.get(eq(invalidUrl), any(), Mockito.anyMap()))
                         .thenReturn(Mono.error(RuntimeException("Test exception")))
 
                 it("should interrupt the flux") {
index 91457fa..f55b1fd 100644 (file)
@@ -23,11 +23,13 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.impl.adapters.HttpAdapter.Companion.INVOCATION_ID_HEADER
 import reactor.core.publisher.Mono
 import reactor.netty.http.client.HttpClient
 import reactor.netty.http.server.HttpServer
 import reactor.test.StepVerifier
 import reactor.test.test
+import java.util.*
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -42,6 +44,9 @@ internal object HttpAdapterTest : Spek({
                     routes.get("/url") { req, resp ->
                         resp.sendString(Mono.just(req.uri()))
                     }
+                    routes.get("/inv-id") { req, resp ->
+                        resp.sendString(Mono.just(req.requestHeaders()[INVOCATION_ID_HEADER]))
+                    }
                 }
                 .bindNow()
         val baseUrl = "http://${httpServer.host()}:${httpServer.port()}"
@@ -53,31 +58,46 @@ internal object HttpAdapterTest : Spek({
 
         given("url without query params") {
             val url = "/url"
+            val invocationId = UUID.randomUUID()
 
             it("should not append query string") {
-                httpAdapter.get(url).test()
+                httpAdapter.get(url, invocationId).test()
                         .expectNext(url)
                         .verifyComplete()
             }
+
+            it("should pass invocation id") {
+                httpAdapter.get("/inv-id", invocationId).test()
+                        .expectNext(invocationId.toString())
+                        .verifyComplete()
+            }
         }
 
         given("url with query params") {
             val queryParams = mapOf(Pair("p", "the-value"))
             val url = "/url"
+            val invocationId = UUID.randomUUID()
 
             it("should add them as query string to the url") {
-                httpAdapter.get(url, queryParams).test()
+                httpAdapter.get(url, invocationId, queryParams).test()
                         .expectNext("/url?p=the-value")
                         .verifyComplete()
             }
+
+            it("should pass invocation id") {
+                httpAdapter.get("/inv-id", invocationId, queryParams).test()
+                        .expectNext(invocationId.toString())
+                        .verifyComplete()
+            }
         }
 
         given("invalid url") {
             val invalidUrl = "/wtf"
+            val invocationId = UUID.randomUUID()
 
             it("should interrupt the flux") {
                 StepVerifier
-                        .create(httpAdapter.get(invalidUrl))
+                        .create(httpAdapter.get(invalidUrl, invocationId))
                         .verifyError()
             }
         }
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt
new file mode 100644 (file)
index 0000000..a49428a
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import arrow.core.Some
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.net.Inet4Address
+import java.net.InetAddress
+import java.net.InetSocketAddress
+import java.security.cert.X509Certificate
+import java.util.*
+import javax.security.auth.x500.X500Principal
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object ClientContextTest : Spek({
+    describe("ClientContext") {
+        given("default instance") {
+            val cut = ClientContext()
+
+            on("mapped diagnostic context") {
+                val mdc = cut.mdc
+
+                it("should contain ${OnapMdc.REQUEST_ID}") {
+                    assertThat(mdc[OnapMdc.REQUEST_ID]).isEqualTo(cut.requestId)
+                }
+
+                it("should contain ${OnapMdc.INVOCATION_ID}") {
+                    assertThat(mdc[OnapMdc.INVOCATION_ID]).isEqualTo(cut.invocationId)
+                }
+
+                it("should contain ${OnapMdc.STATUS_CODE}") {
+                    assertThat(mdc[OnapMdc.STATUS_CODE]).isEqualTo("INPROGRESS")
+                }
+
+                it("should contain ${OnapMdc.CLIENT_NAME}") {
+                    assertThat(mdc[OnapMdc.CLIENT_NAME]).isBlank()
+                }
+
+                it("should contain ${OnapMdc.CLIENT_IP}") {
+                    assertThat(mdc[OnapMdc.CLIENT_IP]).isBlank()
+                }
+            }
+        }
+
+        given("instance with client data") {
+            val clientDn = "C=PL, O=Nokia, CN=NokiaBTS"
+            val clientIp = "192.168.52.34"
+            val cert: X509Certificate = mock()
+            val principal: X500Principal = mock()
+            val cut = ClientContext(
+                    clientAddress = Some(InetAddress.getByName(clientIp)),
+                    clientCert = Some(cert))
+
+            whenever(cert.subjectX500Principal).thenReturn(principal)
+            whenever(principal.toString()).thenReturn(clientDn)
+
+            on("mapped diagnostic context") {
+                val mdc = cut.mdc
+
+                it("should contain ${OnapMdc.CLIENT_NAME}") {
+                    assertThat(mdc[OnapMdc.CLIENT_NAME]).isEqualTo(clientDn)
+                }
+
+                it("should contain ${OnapMdc.CLIENT_IP}") {
+                    assertThat(mdc[OnapMdc.CLIENT_IP]).isEqualTo(clientIp)
+                }
+            }
+        }
+    }
+})
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt
new file mode 100644 (file)
index 0000000..5b6e452
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.util.*
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object ServiceContextTest : Spek({
+    describe("ServiceContext") {
+        given("singleton instance") {
+            val cut = ServiceContext
+
+            on("instanceId") {
+                val instanceId = cut.instanceId
+                it("should be valid UUID") {
+                    UUID.fromString(instanceId) // should not throw
+                }
+            }
+
+            on("serverFqdn") {
+                val serverFqdn = cut.serverFqdn
+                it("should be non empty") {
+                    assertThat(serverFqdn).isNotBlank()
+                }
+            }
+
+            on("mapped diagnostic context") {
+                val mdc = cut.mdc
+
+                it("should contain ${OnapMdc.INSTANCE_ID}") {
+                    assertThat(mdc[OnapMdc.INSTANCE_ID]).isEqualTo(cut.instanceId)
+                }
+
+                it("should contain ${OnapMdc.SERVER_FQDN}") {
+                    assertThat(mdc[OnapMdc.SERVER_FQDN]).isEqualTo(cut.serverFqdn)
+                }
+            }
+        }
+    }
+})
index 1e5c9c5..938ba79 100644 (file)
@@ -127,6 +127,7 @@ abstract class AtLevelLogger {
     abstract fun log(message: String)
     abstract fun log(message: String, t: Throwable)
     abstract fun log(marker: Marker, message: String)
+
     open val enabled: Boolean
         get() = true
 
@@ -140,6 +141,19 @@ abstract class AtLevelLogger {
             }
         }
     }
+
+    protected fun withAdditionalMdc(mdc: Map<String, String>, block: () -> Unit) {
+        if (mdc.isEmpty()) {
+            block()
+        } else {
+            try {
+                mdc.forEach(MDC::put)
+                block()
+            } finally {
+                mdc.keys.forEach(MDC::remove)
+            }
+        }
+    }
 }
 
 object OffLevelLogger : AtLevelLogger() {
@@ -168,9 +182,10 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
         logger.error(message, t)
     }
 
-    override fun log(marker: Marker, message: String) {
-        logger.error(marker(), message)
-    }
+    override fun log(marker: Marker, message: String) =
+            withAdditionalMdc(marker.mdc) {
+                logger.error(marker.slf4jMarker, message)
+            }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -183,9 +198,10 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
         logger.warn(message, t)
     }
 
-    override fun log(marker: Marker, message: String) {
-        logger.warn(marker(), message)
-    }
+    override fun log(marker: Marker, message: String) =
+            withAdditionalMdc(marker.mdc) {
+                logger.warn(marker.slf4jMarker, message)
+            }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -198,9 +214,10 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
         logger.info(message, t)
     }
 
-    override fun log(marker: Marker, message: String) {
-        logger.info(marker(), message)
-    }
+    override fun log(marker: Marker, message: String) =
+            withAdditionalMdc(marker.mdc) {
+                logger.info(marker.slf4jMarker, message)
+            }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -213,9 +230,10 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
         logger.debug(message, t)
     }
 
-    override fun log(marker: Marker, message: String) {
-        logger.debug(marker(), message)
-    }
+    override fun log(marker: Marker, message: String) =
+            withAdditionalMdc(marker.mdc) {
+                logger.debug(marker.slf4jMarker, message)
+            }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -228,7 +246,8 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
         logger.trace(message, t)
     }
 
-    override fun log(marker: Marker, message: String) {
-        logger.trace(marker(), message)
-    }
+    override fun log(marker: Marker, message: String) =
+            withAdditionalMdc(marker.mdc) {
+                logger.trace(marker.slf4jMarker, message)
+            }
 }
index 83fb9a5..9023528 100644 (file)
 package org.onap.dcae.collectors.veshv.utils.logging
 
 import org.slf4j.MarkerFactory
+import java.time.Instant
+import java.util.*
 
-enum class Marker(private val marker: org.slf4j.Marker) {
-    ENTRY(MarkerFactory.getMarker("ENTRY")),
-    EXIT(MarkerFactory.getMarker("EXIT")),
-    INVOKE(MarkerFactory.getMarker("INVOKE"));
+sealed class Marker(internal val slf4jMarker: org.slf4j.Marker, val mdc: Map<String, String> = emptyMap()) {
 
-    operator fun invoke() = marker
+    object Entry : Marker(ENTRY)
+    object Exit : Marker(EXIT)
+
+    class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : Marker(INVOKE, mdc(id, timestamp)) {
+        companion object {
+            private fun mdc(id: UUID, timestamp: Instant) = mapOf(
+                    OnapMdc.INVOCATION_ID to id.toString(),
+                    OnapMdc.INVOCATION_TIMESTAMP to timestamp.toString()
+            )
+        }
+    }
+
+    companion object {
+        private val ENTRY = MarkerFactory.getMarker("ENTRY")
+        private val EXIT = MarkerFactory.getMarker("EXIT")
+        private val INVOKE = MarkerFactory.getMarker("INVOKE")
+    }
 }
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt
new file mode 100644 (file)
index 0000000..8658416
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+object OnapMdc {
+    const val REQUEST_ID = "RequestID"
+    const val CLIENT_NAME = "PartnerName"
+    const val CLIENT_IP = "ClientIPAddress"
+    const val INVOCATION_ID = "InvocationID"
+    const val INVOCATION_TIMESTAMP = "InvokeTimestamp"
+    const val STATUS_CODE = "StatusCode"
+    const val INSTANCE_ID = "InstanceID"
+    const val SERVER_FQDN = "ServerFQDN"
+}