Add log diagnostic context 92/74392/6
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 7 Dec 2018 13:41:39 +0000 (14:41 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Mon, 10 Dec 2018 13:46:23 +0000 (14:46 +0100)
As it's not trivial to use MDC directly from logging framework in
reactive application, we need to do some work manually. The approach
proposed is an explicit MDC handling, which means that context is
kept as an object created after establishing client connection. Next,
new instance of HvVesCollector (and its dependencies) is created. Every
object is propagated with ClientContext so it can use it when calling
logger methods.

In the future ClientContext might be used to support other use-cases,
ie. per-topic access control.

As a by-product I had to refactor our Logger wrapper, too. It already
had too many functions and after adding MDC number would be doubled.

Change-Id: I9c5d3f5e1d1be1db66d28d292eb0e1c38d8d0ffe
Issue-ID: DCAEGEN2-671
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
37 files changed:
docker-compose.yml
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt [new file with mode: 0644]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
sources/hv-collector-main/src/main/resources/logback.xml
sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt
sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt

index 4015b08..d4c3f1d 100644 (file)
@@ -1,45 +1,73 @@
 version: "3.5"
 services:
-  zookeeper:
+
+  #
+  # DMaaP Message Router
+  #
+
+  message-router-zookeeper:
     image: wurstmeister/zookeeper
     ports:
-      - "2181:2181"
+    - "2181:2181"
 
-  kafka:
+  message-router-kafka:
+#    image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1
     image: wurstmeister/kafka
     ports:
-      - "9092:9092"
+    - "9092:9092"
     environment:
-      KAFKA_ADVERTISED_HOST_NAME: "kafka"
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
-      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
+      KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181"
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT"
+      KAFKA_ADVERTISED_LISTENERS: "INTERNAL_PLAINTEXT://message-router-kafka:9092"
+      KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092"
+      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT"
     volumes:
-      - /var/run/docker.sock:/var/run/docker.sock
+    - /var/run/docker.sock:/var/run/docker.sock
+    depends_on:
+    - message-router-zookeeper
+
+
+  #
+  # Consul / CBS
+  #
+
+  consul-server:
+    image: docker.io/consul:1.0.6
+    ports:
+    - "8500:8500"
+    command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui"]
+
+  consul-config:
+    image: consul
     depends_on:
-      - zookeeper
+    - consul-server
+    restart: on-failure
+    command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{
+                                              "dmaap.kafkaBootstrapServers": "message-router-kafka:9092",
+                                              "collector.routing": [
+                                                {
+                                                  "fromDomain": "perf3gpp",
+                                                  "toTopic": "HV_VES_PERF3GPP"
+                                                }
+                                              ]
+                                            }']
+
 
-  consul:
-      image: progrium/consul
-      ports:
-        - "8500:8500"
-      environment:
-        - CONSUL_BIND_INTERFACE=eth0
-      command: ["-server", "-bootstrap", "-ui-dir", "/ui"]
+  #
+  # DCAE HV VES Collector
+  #
 
   ves-hv-collector:
     image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest
-#    build:
-#      context: hv-collector-main
-#      dockerfile: Dockerfile
     ports:
-      - "6060:6060"
-      - "6061:6061/tcp"
+    - "6060:6060"
+    - "6061:6061/tcp"
     entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid",
                  "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
     command: ["--listen-port", "6061",
               "--health-check-api-port", "6060",
-              "--config-url", "http://consul:8500/v1/kv/veshv-config?raw=true",
+              "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true",
               "--key-store-password", "onaponap",
               "--trust-store-password", "onaponap"]
     healthcheck:
@@ -49,37 +77,36 @@ services:
       retries: 3
       start_period: 20s
     depends_on:
-      - kafka
-      - consul
+    - message-router-kafka
+    - consul-server
     volumes:
-      - ./ssl/:/etc/ves-hv/
+    - ./ssl/:/etc/ves-hv/
+
+
+  #
+  # Simulators
+  #
 
   xnf-simulator:
     image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator
-#    build:
-#      context: hv-collector-xnf-simulator
-#      dockerfile: Dockerfile
     ports:
-      - "6062:6062/tcp"
+    - "6062:6062/tcp"
     command: ["--listen-port", "6062",
               "--ves-host", "ves-hv-collector",
               "--ves-port", "6061",
               "--key-store-password", "onaponap",
               "--trust-store-password", "onaponap"]
     depends_on:
-      - ves-hv-collector
+    - ves-hv-collector
     volumes:
       - ./ssl/:/etc/ves-hv/
 
   dcae-app-simulator:
     image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator
-#    build:
-#      context: hv-collector-dcae-app-simulator
-#      dockerfile: Dockerfile
     ports:
-      - "6063:6063/tcp"
+    - "6063:6063/tcp"
     command: ["--listen-port", "6063",
-              "--kafka-bootstrap-servers", "kafka:9092",
+              "--kafka-bootstrap-servers", "message-router-kafka:9092",
               "--kafka-topics", "HV_VES_PERF3GPP"]
     depends_on:
-      - kafka
+    - message-router-kafka
index dd0111b..b686b25 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.boundary
 
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import reactor.core.publisher.Flux
@@ -35,12 +36,12 @@ interface Metrics {
 
 @FunctionalInterface
 interface SinkProvider {
-    operator fun invoke(config: CollectorConfiguration): Sink
+    operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink
 
     companion object {
         fun just(sink: Sink): SinkProvider =
                 object : SinkProvider {
-                    override fun invoke(config: CollectorConfiguration): Sink = sink
+                    override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink
                 }
     }
 }
index 3c85a9b..5584d61 100644 (file)
@@ -23,15 +23,17 @@ import arrow.core.Option
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import java.util.*
 
 interface Collector {
-    fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
+    fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
 }
 
-typealias CollectorProvider = () -> Option<Collector>
+typealias CollectorProvider = (ClientContext) -> Option<Collector>
 
 interface Server {
     fun start(): IO<ServerHandle>
index 5c96e1c..2008fc3 100644 (file)
@@ -25,12 +25,13 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.impl.Router
 import org.onap.dcae.collectors.veshv.impl.VesDecoder
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.getOption
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -47,31 +48,29 @@ class CollectorFactory(val configuration: ConfigurationProvider,
                        private val healthState: HealthState = HealthState.INSTANCE) {
 
     fun createVesHvCollectorProvider(): CollectorProvider {
-        val collector: AtomicReference<Collector> = AtomicReference()
+        val config: AtomicReference<CollectorConfiguration> = AtomicReference()
         configuration()
-                .map(this::createVesHvCollector)
                 .doOnNext {
-                    logger.info("Using updated configuration for new connections")
+                    logger.info { "Using updated configuration for new connections" }
                     healthState.changeState(HealthDescription.HEALTHY)
                 }
                 .doOnError {
-                    logger.error("Failed to acquire configuration from consul")
+                    logger.error { "Failed to acquire configuration from consul" }
                     healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
                 }
-                .subscribe(collector::set)
-        return collector::getOption
+                .subscribe(config::set)
+        return { ctx: ClientContext ->
+            config.getOption().map { config -> createVesHvCollector(config, ctx) }
+        }
     }
 
-    private fun createVesHvCollector(config: CollectorConfiguration): Collector {
-        return VesHvCollector(
-                wireChunkDecoderSupplier = { alloc ->
-                    WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
-                },
-                protobufDecoder = VesDecoder(),
-                router = Router(config.routing),
-                sink = sinkProvider(config),
-                metrics = metrics)
-    }
+    private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector(
+            clientContext = ctx,
+            wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+            protobufDecoder = VesDecoder(),
+            router = Router(config.routing, ctx),
+            sink = sinkProvider(config, ctx),
+            metrics = metrics)
 
     companion object {
         private val logger = Logger(CollectorFactory::class)
index cee658b..0977595 100644 (file)
 package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.Option
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.Routing
 import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
-class Router(private val routing: Routing) {
+class Router(private val routing: Routing, private val ctx: ClientContext) {
     fun findDestination(message: VesMessage): Option<RoutedMessage> =
-            routing.routeFor(message.header).map { it(message) }
+            routing.routeFor(message.header).map { it(message) }.also {
+                if (it.isEmpty()) {
+                    logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" }
+                }
+            }
+
+    companion object {
+        private val logger = Logger(Routing::class)
+    }
 }
index 4176de9..0d07504 100644 (file)
@@ -21,18 +21,18 @@ package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.Either
 import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
+import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
@@ -42,28 +42,27 @@ import reactor.core.publisher.Mono
  * @since May 2018
  */
 internal class VesHvCollector(
-        private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
+        private val clientContext: ClientContext,
+        private val wireChunkDecoder: WireChunkDecoder,
         private val protobufDecoder: VesDecoder,
         private val router: Router,
         private val sink: Sink,
         private val metrics: Metrics) : Collector {
 
-    override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
-            wireChunkDecoderSupplier(alloc).let { wireDecoder ->
-                dataStream
-                        .transform { decodeWireFrame(it, wireDecoder) }
-                        .transform(::filterInvalidWireFrame)
-                        .transform(::decodeProtobufPayload)
-                        .transform(::filterInvalidProtobufMessages)
-                        .transform(::routeMessage)
-                        .onErrorResume { logger.handleReactiveStreamError(it) }
-                        .doFinally { releaseBuffersMemory(wireDecoder) }
-                        .then()
-            }
+    override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
+            dataStream
+                    .transform { decodeWireFrame(it) }
+                    .transform(::filterInvalidWireFrame)
+                    .transform(::decodeProtobufPayload)
+                    .transform(::filterInvalidProtobufMessages)
+                    .transform(::routeMessage)
+                    .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) }
+                    .doFinally { releaseBuffersMemory() }
+                    .then()
 
-    private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
+    private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux
             .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
-            .concatMap(decoder::decode)
+            .concatMap(wireChunkDecoder::decode)
             .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
 
     private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
@@ -75,7 +74,7 @@ internal class VesHvCollector(
 
     private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
             .decode(rawPayload)
-            .filterFailedWithLog(logger,
+            .filterFailedWithLog(logger, clientContext::asMap,
                     { "Ves event header decoded successfully" },
                     { "Failed to decode ves event header, reason: ${it.message}" })
 
@@ -89,15 +88,15 @@ internal class VesHvCollector(
 
     private fun findRoute(msg: VesMessage) = router
             .findDestination(msg)
-            .filterEmptyWithLog(logger,
+            .filterEmptyWithLog(logger, clientContext::asMap,
                     { "Found route for message: ${it.topic}, partition: ${it.partition}" },
                     { "Could not find route for message" })
 
-    private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
-            .also { logger.debug("Released buffer memory after handling message stream") }
+    private fun releaseBuffersMemory() = wireChunkDecoder.release()
+            .also { logger.debug { "Released buffer memory after handling message stream" } }
 
     fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
-            filterFailedWithLog(logger, predicate)
+            filterFailedWithLog(logger, clientContext::asMap, predicate)
 
     companion object {
         private val logger = Logger(VesHvCollector::class)
index cea8a7e..bbaa47c 100644 (file)
@@ -52,7 +52,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
     private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
     private val retry = retrySpec
             .doOnRetry {
-                logger.warn("Could not get fresh configuration", it.exception())
+                logger.withWarn { log("Could not get fresh configuration", it.exception()) }
                 healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
             }
 
index bdce6f7..3fefc6e 100644 (file)
@@ -20,6 +20,7 @@
 package org.onap.dcae.collectors.veshv.impl.adapters
 
 import io.netty.handler.codec.http.HttpStatusClass
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.slf4j.LoggerFactory
 import reactor.core.publisher.Mono
 import reactor.netty.http.client.HttpClient
@@ -30,8 +31,6 @@ import reactor.netty.http.client.HttpClient
  */
 open class HttpAdapter(private val httpClient: HttpClient) {
 
-    private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
-
     open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
             .get()
             .uri(url + createQueryString(queryParams))
@@ -44,8 +43,8 @@ open class HttpAdapter(private val httpClient: HttpClient) {
                 }
             }
             .doOnError {
-                logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
-                logger.debug("Nested exception:", it)
+                logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
+                logger.withDebug { log("Nested exception:", it) }
             }
 
     private fun createQueryString(params: Map<String, Any>): String {
@@ -65,4 +64,9 @@ open class HttpAdapter(private val httpClient: HttpClient) {
         return builder.removeSuffix("&").toString()
     }
 
+    companion object {
+
+
+        private val logger = Logger(HttpAdapter::class)
+    }
 }
index 5f4bf35..f6cb018 100644 (file)
@@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -33,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong
  */
 internal class LoggingSinkProvider : SinkProvider {
 
-    override fun invoke(config: CollectorConfiguration): Sink {
+    override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
         return object : Sink {
             private val totalMessages = AtomicLong()
             private val totalBytes = AtomicLong()
@@ -47,9 +50,9 @@ internal class LoggingSinkProvider : SinkProvider {
                 val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
                 val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
                 if (msgs % INFO_LOGGING_FREQ == 0L)
-                    logger.info(logMessageSupplier)
+                    logger.info(ctx, logMessageSupplier)
                 else
-                    logger.trace(logMessageSupplier)
+                    logger.trace(ctx, logMessageSupplier)
             }
 
         }
index c4d6c87..fd08ba3 100644 (file)
@@ -20,6 +20,9 @@
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -35,7 +38,7 @@ import java.util.concurrent.atomic.AtomicLong
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink {
     private val sentMessages = AtomicLong(0)
 
     override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
@@ -45,17 +48,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
                     if (it.isSuccessful()) {
                         Mono.just(it)
                     } else {
-                        logger.warn(it.exception()) { "Failed to send message to Kafka" }
+                        logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) }
                         Mono.empty<SenderResult<RoutedMessage>>()
                     }
                 }
                 .map { it.correlationMetadata() }
 
-        return if (logger.traceEnabled) {
-            result.doOnNext(::logSentMessage)
-        } else {
-            result
-        }
+        return result.doOnNext(::logSentMessage)
     }
 
     private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
@@ -69,7 +68,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
     }
 
     private fun logSentMessage(sentMsg: RoutedMessage) {
-        logger.trace {
+        logger.trace(ctx) {
             val msgNum = sentMessages.incrementAndGet()
             "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
         }
index 1819195..b4f470d 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
@@ -33,8 +34,8 @@ import reactor.kafka.sender.SenderOptions
  * @since June 2018
  */
 internal class KafkaSinkProvider : SinkProvider {
-    override fun invoke(config: CollectorConfiguration): Sink {
-        return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
+    override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+        return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx)
     }
 
     private fun constructSenderOptions(config: CollectorConfiguration) =
index 0b2997f..2d29fe9 100644 (file)
@@ -23,6 +23,10 @@ import arrow.core.getOrElse
 import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
@@ -57,57 +61,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
             sslContextFactory
                     .createSslContext(serverConfig.securityConfiguration)
                     .map { sslContext ->
-                        logger.info("Collector configured with SSL enabled")
+                        logger.info { "Collector configured with SSL enabled" }
                         this.secure { b -> b.sslContext(sslContext) }
                     }.getOrElse {
-                        logger.info("Collector configured with SSL disabled")
+                        logger.info { "Collector configured with SSL disabled" }
                         this
                     }
 
-    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
-            collectorProvider().fold(
-                    {
-                        nettyInbound.withConnection { conn ->
-                            logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
-                        }
-                        Mono.empty()
-                    },
-                    {
-                        nettyInbound.withConnection { conn ->
-                            logger.info { "Handling connection from ${conn.address()}" }
-                            conn.configureIdleTimeout(serverConfig.idleTimeout)
-                                    .logConnectionClosed()
-                        }
-                        it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
+    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+        val clientContext = ClientContext(nettyOutbound.alloc())
+        nettyInbound.withConnection {
+            clientContext.clientAddress = it.address()
+        }
+
+        return collectorProvider(clientContext).fold(
+                {
+                    logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+                    Mono.empty()
+                },
+                {
+                    logger.info { "Handling new connection" }
+                    nettyInbound.withConnection { conn ->
+                        conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+                                .logConnectionClosed(clientContext)
                     }
-            )
+                    it.handleConnection(createDataStream(nettyInbound))
+                }
+        )
+    }
 
     private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
             .receive()
             .retain()
 
-    private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
+    private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
         onReadIdle(timeout.toMillis()) {
-            logger.info {
+            logger.info(ctx) {
                 "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
             }
-            disconnectClient()
+            disconnectClient(ctx)
         }
         return this
     }
 
-    private fun Connection.disconnectClient() {
+    private fun Connection.disconnectClient(ctx: ClientContext) {
         channel().close().addListener {
             if (it.isSuccess)
-                logger.debug { "Channel (${address()}) closed successfully." }
+                logger.debug(ctx) { "Channel closed successfully." }
             else
-                logger.warn("Channel close failed", it.cause())
+                logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
         }
     }
 
-    private fun Connection.logConnectionClosed(): Connection {
+    private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
         onTerminate().subscribe {
-            logger.info("Connection from ${address()} has been closed")
+            logger.info(ctx) { "Connection has been closed" }
         }
         return this
     }
index 4a2ef6b..349b078 100644 (file)
@@ -21,12 +21,13 @@ package org.onap.dcae.collectors.veshv.impl.wire
 
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
 import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
 import reactor.core.publisher.Flux
@@ -38,8 +39,8 @@ import reactor.core.publisher.SynchronousSink
  */
 internal class WireChunkDecoder(
         private val decoder: WireFrameDecoder,
-        alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
-    private val streamBuffer = alloc.compositeBuffer()
+        private val ctx: ClientContext) {
+    private val streamBuffer = ctx.alloc.compositeBuffer()
 
     fun release() {
         streamBuffer.release()
@@ -53,7 +54,7 @@ internal class WireChunkDecoder(
         } else {
             streamBuffer.addComponent(true, byteBuf)
             generateFrames()
-                    .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+                    .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) }
                     .doFinally { streamBuffer.discardReadComponents() }
         }
     }
@@ -84,15 +85,15 @@ internal class WireChunkDecoder(
     }
 
     private fun logIncomingMessage(wire: ByteBuf) {
-        logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
+        logger.trace(ctx) { "Got message with total size of ${wire.readableBytes()} B" }
     }
 
     private fun logDecodedWireMessage(wire: WireFrameMessage) {
-        logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+        logger.trace(ctx) { "Wire payload size: ${wire.payloadSize} B" }
     }
 
     private fun logEndOfData() {
-        logger.trace { "End of data in current TCP buffer" }
+        logger.trace(ctx) { "End of data in current TCP buffer" }
     }
 
     companion object {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
new file mode 100644 (file)
index 0000000..f14a7f6
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.slf4j.MDC
+import java.net.InetSocketAddress
+import java.util.*
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+data class ClientContext(
+        val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT,
+        val clientId: String = UUID.randomUUID().toString(),
+        var clientAddress: InetSocketAddress? = null) {
+    fun asMap(): Map<String, String> {
+        val result = mutableMapOf("clientId" to clientId)
+        if (clientAddress != null) {
+            result["clientAddress"] = clientAddress.toString()
+        }
+        return result
+    }
+}
+
+object ClientContextLogging {
+    fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block)
+    fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block)
+    fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block)
+    fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block)
+    fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block)
+
+    fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message)
+    fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message)
+    fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message)
+    fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message)
+    fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message)
+}
index 437614a..ad97a3f 100644 (file)
@@ -26,15 +26,7 @@ import org.onap.ves.VesEventOuterClass.CommonEventHeader
 data class Routing(val routes: List<Route>) {
 
     fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
-            Option.fromNullable(routes.find { it.applies(commonHeader) }).also {
-                if (it.isEmpty()) {
-                    logger.debug { "No route is defined for domain: ${commonHeader.domain}" }
-                }
-            }
-
-    companion object {
-        private val logger = Logger(Routing::class)
-    }
+            Option.fromNullable(routes.find { it.applies(commonHeader) })
 }
 
 data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
index e8a3123..e419016 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.None
 import arrow.core.Some
+import io.netty.buffer.ByteBufAllocator
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
@@ -30,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.model.routing
@@ -56,7 +58,7 @@ object RouterTest : Spek({
                 withFixedPartitioning()
             }
         }.build()
-        val cut = Router(config)
+        val cut = Router(config, ClientContext())
 
         on("message with existing route (rtpm)") {
             val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY)
index f06a0dc..e0092cf 100644 (file)
@@ -30,6 +30,7 @@ import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import reactor.test.test
 
 /**
@@ -45,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({
 
     fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
 
-    fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
+    fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), ClientContext(alloc))
 
     fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
         for (bb in byteBuffers) {
index 0897e91..ef4ce96 100644 (file)
@@ -68,7 +68,7 @@ object PerformanceSpecification : Spek({
             )
 
             val fluxes = (1.rangeTo(runs)).map {
-                sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
+                sut.collector.handleConnection(generateDataStream(sut.alloc, params))
             }
             val durationMs = measureTimeMillis {
                 Flux.merge(fluxes).then().block(timeout)
@@ -76,8 +76,8 @@ object PerformanceSpecification : Spek({
 
             val durationSec = durationMs / 1000.0
             val throughput = sink.count / durationSec
-            logger.info("Processed $runs connections each containing $numMessages msgs.")
-            logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
+            logger.info { "Processed $runs connections each containing $numMessages msgs." }
+            logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" }
             assertThat(sink.count)
                     .describedAs("should send all events")
                     .isEqualTo(runs * numMessages)
@@ -99,11 +99,11 @@ object PerformanceSpecification : Spek({
 
             val dataStream = generateDataStream(sut.alloc, params)
                     .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
-            sut.collector.handleConnection(sut.alloc, dataStream)
+            sut.collector.handleConnection(dataStream)
                     .timeout(timeout)
                     .block()
 
-            logger.info("Forwarded ${sink.count} msgs")
+            logger.info { "Forwarded ${sink.count} msgs" }
             assertThat(sink.count)
                     .describedAs("should send up to number of events")
                     .isLessThan(numMessages)
index 0495ced..ce242e0 100644 (file)
@@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
@@ -54,7 +55,7 @@ class Sut(sink: Sink = StoringSink()) {
     private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
 
     val collector: Collector
-        get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
+        get() = collectorProvider(ClientContext(alloc)).getOrElse{ throw IllegalStateException("Collector not available.") }
 
     companion object {
         const val MAX_PAYLOAD_SIZE_BYTES = 1024
@@ -63,6 +64,6 @@ class Sut(sink: Sink = StoringSink()) {
 }
 
 fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
-    collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+    collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10))
     return sink.sentMessages
 }
index 2d81c67..ab59cc2 100644 (file)
@@ -287,7 +287,7 @@ object VesHvSpecification : Spek({
                         .map { vesWireFrameMessage(PERF3GPP) }
 
 
-                sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+                sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
 
                 val messages = sink.sentMessages
                 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
index 417183f..f7d94de 100644 (file)
@@ -46,7 +46,7 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
             throw IllegalArgumentException(message)
         }
 
-        logger.info("Received new configuration. Creating consumer for topics: $topics")
+        logger.info { "Received new configuration. Creating consumer for topics: $topics" }
         consumerState.set(consumerFactory.createConsumerForTopics(topics).bind())
     }.fix()
 
index 20c0f59..36f30e6 100644 (file)
@@ -61,13 +61,13 @@ class MessageStreamValidation(
         return messageParams.fold(
                 {
                     logger.warn { "Error while parsing message parameters: ${it::class.qualifiedName} : ${it.message}" }
-                    logger.debug { "Detailed stack trace: ${it}" }
+                    logger.debug { "Detailed stack trace: $it" }
                     throw IllegalArgumentException("Parsing error: " + it.message)
                 },
                 {
                     if (it.isEmpty()) {
                         val message = "Message param list cannot be empty"
-                        logger.warn(message)
+                        logger.warn { message }
                         throw IllegalArgumentException(message)
                     }
                     it
index a6ee112..e54eb35 100644 (file)
@@ -71,15 +71,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
                 }
                 .delete("messages") { ctx ->
                     ctx.response.contentType(CONTENT_TEXT)
-                    logger.info("Resetting simulator state")
+                    logger.info { "Resetting simulator state" }
                     ctx.response.sendOrError(simulator.resetState())
                 }
                 .get("messages/all/count") { ctx ->
-                    logger.info("Processing request for count of received messages")
+                    logger.info { "Processing request for count of received messages" }
                     simulator.state().fold(
                             {
                                 ctx.response.status(HttpConstants.STATUS_NOT_FOUND)
-                                logger.warn("Error - number of messages could not be specified")
+                                logger.warn { "Error - number of messages could not be specified" }
                             },
                             {
                                 logger.info { "Returned number of received messages: ${it.messagesCount}" }
@@ -90,7 +90,7 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
                 }
                 .post("messages/all/validate") { ctx ->
                     ctx.request.body.then { body ->
-                        logger.info("Processing request for message validation")
+                        logger.info { "Processing request for message validation" }
                         val response = simulator.validate(body.inputStream)
                                 .map { isValid ->
                                     if (isValid) {
index 06ff4d5..5856f04 100644 (file)
@@ -43,17 +43,17 @@ fun main(args: Array<String>) =
                 .map(::startApp)
                 .unsafeRunEitherSync(
                         { ex ->
-                            logger.error("Failed to start a server", ex)
+                            logger.withError { log("Failed to start a server", ex) }
                             ExitFailure(1)
                         },
                         {
-                            logger.info("Started DCAE-APP Simulator API server")
+                            logger.info { "Started DCAE-APP Simulator API server" }
                         }
                 )
 
 
 private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
-    logger.info("Using configuration: $config")
+    logger.info { "Using configuration: $config" }
     val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
     val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
     return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
index 899f51f..5c9566c 100644 (file)
@@ -40,15 +40,15 @@ fun main(args: Array<String>) =
                 .map(::startAndAwaitServers)
                 .unsafeRunEitherSync(
                         { ex ->
-                            logger.error("Failed to start a server", ex)
+                            logger.withError { log("Failed to start a server", ex) }
                             ExitFailure(1)
                         },
-                        { logger.info("Gentle shutdown") }
+                        { logger.info { "Gentle shutdown" } }
                 )
 
 private fun startAndAwaitServers(config: ServerConfiguration) =
         IO.monad().binding {
-            logger.info("Using configuration: $config")
+            logger.info { "Using configuration: $config" }
             HealthCheckServer.start(config).bind()
             VesServer.start(config).bind()
                     .await().bind()
index 5c6f127..13b0bc7 100644 (file)
@@ -31,7 +31,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
 abstract class ServerStarter {
     fun start(config: ServerConfiguration): IO<ServerHandle> =
             startServer(config)
-                    .map { logger.info(serverStartedMessage(it)); it }
+                    .map { logger.info { serverStartedMessage(it) }; it }
 
     protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
     protected abstract fun serverStartedMessage(handle: ServerHandle): String
index bee0dae..674fb2c 100644 (file)
@@ -12,6 +12,7 @@
 %nopexception%50.50logger
 | %date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}
 | %highlight(%-5level)
+| %mdc{clientId} %mdc{clientAddress}
 | %msg
 | %rootException
 | %thread%n"/>
index d017b31..6ca28a5 100644 (file)
@@ -31,7 +31,7 @@ import java.time.Duration
 private val logger = Logger("org.onap.dcae.collectors.veshv.tests.utils")
 
 object Assertions : org.assertj.core.api.Assertions() {
-    fun <A,B> assertThat(actual: Either<A, B>) = EitherAssert(actual)
+    fun <A, B> assertThat(actual: Either<A, B>) = EitherAssert(actual)
 }
 
 
@@ -42,7 +42,7 @@ fun waitUntilSucceeds(retries: Int, sleepTime: Duration, action: () -> Unit) {
     while (tryNum <= retries) {
         tryNum++
         try {
-            logger.debug("Try number $tryNum")
+            logger.debug { "Try number $tryNum" }
             action()
             break
         } catch (ex: Throwable) {
index 5a733f2..a25b291 100644 (file)
@@ -51,7 +51,7 @@ fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Resp
 fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) {
     response.attempt().unsafeRunSync().fold(
             { err ->
-                logger.warn("Error occurred. Sending .", err)
+                logger.withWarn { log("Error occurred. Sending .", err) }
                 val message = err.message
                 send(errorResponse(message))
             },
index 033dd5e..2fb4880 100644 (file)
@@ -21,117 +21,171 @@ package org.onap.dcae.collectors.veshv.utils.logging
 
 import kotlin.reflect.KClass
 import org.slf4j.LoggerFactory
+import org.slf4j.MDC
+
+typealias MappedDiagnosticContext = () -> Map<String, String>
 
 @Suppress("TooManyFunctions", "SuboptimalLoggerUsage")
-class Logger(val logger: org.slf4j.Logger) {
+class Logger(logger: org.slf4j.Logger) {
     constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java))
     constructor(name: String) : this(LoggerFactory.getLogger(name))
 
-    //
-    // TRACE
-    //
+    private val errorLogger = if (logger.isErrorEnabled) ErrorLevelLogger(logger) else OffLevelLogger
+    private val warnLogger = if (logger.isWarnEnabled) WarnLevelLogger(logger) else OffLevelLogger
+    private val infoLogger = if (logger.isInfoEnabled) InfoLevelLogger(logger) else OffLevelLogger
+    private val debugLogger = if (logger.isDebugEnabled) DebugLevelLogger(logger) else OffLevelLogger
+    private val traceLogger = if (logger.isTraceEnabled) TraceLevelLogger(logger) else OffLevelLogger
 
-    val traceEnabled: Boolean
-        get() = logger.isTraceEnabled
+    // ERROR
 
-    fun trace(messageProvider: () -> String) {
-        if (logger.isTraceEnabled) {
-            logger.trace(messageProvider())
-        }
-    }
+    fun withError(block: AtLevelLogger.() -> Unit) = errorLogger.block()
 
-    //
-    // DEBUG
-    //
+    fun withError(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+            errorLogger.withMdc(mdc, block)
 
-    fun debug(message: String) {
-        logger.debug(message)
+    fun error(message: () -> String) = errorLogger.run {
+        log(message())
     }
 
-    fun debug(message: String, t: Throwable) {
-        logger.debug(message, t)
+    fun error(mdc: MappedDiagnosticContext, message: () -> String) =
+            errorLogger.withMdc(mdc) { log(message()) }
+
+    // WARN
+
+    fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block()
+
+    fun withWarn(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+            warnLogger.withMdc(mdc, block)
+
+    fun warn(message: () -> String) = warnLogger.run {
+        log(message())
     }
 
-    fun debug(messageProvider: () -> String) {
-        if (logger.isDebugEnabled) {
-            logger.debug(messageProvider())
-        }
+    fun warn(mdc: MappedDiagnosticContext, message: () -> String) =
+            warnLogger.withMdc(mdc) { log(message()) }
+
+
+    // INFO
+
+    fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block()
+
+    fun withInfo(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+            infoLogger.withMdc(mdc, block)
+
+    fun info(message: () -> String) = infoLogger.run {
+        log(message())
     }
 
-    fun debug(t: Throwable, messageProvider: () -> String) {
-        if (logger.isDebugEnabled) {
-            logger.debug(messageProvider(), t)
-        }
+    fun info(mdc: MappedDiagnosticContext, message: () -> String) =
+            infoLogger.withMdc(mdc) { log(message()) }
+
+    // DEBUG
+
+    fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block()
+
+    fun withDebug(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+            debugLogger.withMdc(mdc, block)
+
+    fun debug(message: () -> String) = debugLogger.run {
+        log(message())
     }
 
-    //
-    // INFO
-    //
-    fun info(message: String) {
-        logger.info(message)
+    fun debug(mdc: MappedDiagnosticContext, message: () -> String) =
+            debugLogger.withMdc(mdc) { log(message()) }
+
+
+    // TRACE
+
+    fun withTrace(block: AtLevelLogger.() -> Unit) = traceLogger.block()
+
+    fun withTrace(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+            traceLogger.withMdc(mdc, block)
+
+    fun trace(message: () -> String) = traceLogger.run {
+        log(message())
     }
 
-    fun info(messageProvider: () -> String) {
-        if (logger.isInfoEnabled) {
-            logger.info(messageProvider())
+    fun trace(mdc: MappedDiagnosticContext, message: () -> String) =
+            traceLogger.withMdc(mdc) { log(message()) }
+
+}
+
+abstract class AtLevelLogger {
+    abstract fun log(message: String)
+    abstract fun log(message: String, t: Throwable)
+    open val enabled: Boolean
+        get() = true
+
+    inline fun withMdc(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) {
+        if (enabled) {
+            try {
+                MDC.setContextMap(mdc())
+                block()
+            } finally {
+                MDC.clear()
+            }
         }
     }
+}
 
-    fun info(message: String, t: Throwable) {
-        logger.info(message, t)
+object OffLevelLogger : AtLevelLogger() {
+    override val enabled = false
+
+    override fun log(message: String) {
+        // do not log anything
     }
 
-    fun info(t: Throwable, messageProvider: () -> String) {
-        if (logger.isInfoEnabled) {
-            logger.info(messageProvider(), t)
-        }
+    override fun log(message: String, t: Throwable) {
+        // do not log anything
     }
+}
 
-    //
-    // WARN
-    //
+class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+    override fun log(message: String) {
+        logger.error(message)
+    }
+
+    override fun log(message: String, t: Throwable) {
+        logger.error(message, t)
+    }
+}
 
-    fun warn(message: String) {
+class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+    override fun log(message: String) {
         logger.warn(message)
     }
 
-    fun warn(message: String, t: Throwable) {
+    override fun log(message: String, t: Throwable) {
         logger.warn(message, t)
     }
+}
 
-    fun warn(messageProvider: () -> String) {
-        if (logger.isWarnEnabled) {
-            logger.warn(messageProvider())
-        }
+class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+    override fun log(message: String) {
+        logger.info(message)
     }
 
-    fun warn(t: Throwable, messageProvider: () -> String) {
-        if (logger.isWarnEnabled) {
-            logger.warn(messageProvider(), t)
-        }
+    override fun log(message: String, t: Throwable) {
+        logger.info(message, t)
     }
+}
 
-    //
-    // ERROR
-    //
-
-    fun error(message: String) {
-        logger.error(message)
+class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+    override fun log(message: String) {
+        logger.debug(message)
     }
 
-    fun error(message: String, t: Throwable) {
-        logger.error(message, t)
+    override fun log(message: String, t: Throwable) {
+        logger.debug(message, t)
     }
+}
 
-    fun error(messageProvider: () -> String) {
-        if (logger.isErrorEnabled) {
-            logger.error(messageProvider())
-        }
+class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+    override fun log(message: String) {
+        logger.trace(message)
     }
 
-    fun error(t: Throwable, messageProvider: () -> String) {
-        if (logger.isErrorEnabled) {
-            logger.error(messageProvider(), t)
-        }
+    override fun log(message: String, t: Throwable) {
+        logger.trace(message, t)
     }
 }
index e8ec254..1e98f2f 100644 (file)
@@ -25,42 +25,49 @@ import arrow.core.Try
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
-fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
-    logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})")
-    logger.debug("Detailed stack trace", ex)
+fun <T> Logger.handleReactiveStreamError(
+        context: MappedDiagnosticContext,
+        ex: Throwable,
+        returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+    warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" }
+    withDebug(context) { log("Detailed stack trace", ex) }
     return returnFlux
 }
 
 
 fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+                                   context: MappedDiagnosticContext,
                                    acceptedMsg: (T) -> String,
                                    rejectedMsg: (Throwable) -> String): Flux<T> =
-        fold({
-            logger.warn(rejectedMsg(it))
+        fold({ ex ->
+            logger.withWarn(context) { log(rejectedMsg(ex)) }
             Flux.empty<T>()
-        }, {
-            logger.trace { acceptedMsg(it) }
-            Flux.just(it)
+        }, { obj ->
+            logger.trace(context) { acceptedMsg(obj) }
+            Flux.just(obj)
         })
 
 fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+                                     context: MappedDiagnosticContext,
                                      acceptedMsg: (T) -> String,
                                      rejectedMsg: () -> String): Flux<T> =
         fold({
-            logger.warn(rejectedMsg)
+            logger.warn(context, rejectedMsg)
             Flux.empty<T>()
         }, {
-            logger.trace { acceptedMsg(it) }
+            logger.trace(context) { acceptedMsg(it) }
             Flux.just(it)
         })
 
-fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) =
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
+                                    context: MappedDiagnosticContext,
+                                    predicate: (T) -> Either<() -> String, () -> String>) =
         flatMap { t ->
             predicate(t).fold({
-                logger.warn(it)
+                logger.warn(context, it)
                 Mono.empty<T>()
             }, {
-                logger.trace(it)
+                logger.trace(context, it)
                 Mono.just<T>(t)
             })
         }
index c27fb8c..10fc8d8 100644 (file)
@@ -34,11 +34,16 @@ import org.jetbrains.spek.api.dsl.it
 object LoggerTest : Spek({
 
     lateinit var slf4jLogger: org.slf4j.Logger
-    lateinit var cut: Logger
+    fun cut() = Logger(slf4jLogger).also {
+        verify(slf4jLogger).isTraceEnabled
+        verify(slf4jLogger).isDebugEnabled
+        verify(slf4jLogger).isInfoEnabled
+        verify(slf4jLogger).isWarnEnabled
+        verify(slf4jLogger).isErrorEnabled
+    }
 
     beforeEachTest {
         slf4jLogger = mock()
-        cut = Logger(slf4jLogger)
     }
 
     afterEachTest {
@@ -50,28 +55,19 @@ object LoggerTest : Spek({
         val exception = Exception("fail")
 
         describe("debug levels") {
-            it("should log message") {
-                cut.debug(message)
-                verify(slf4jLogger).debug(message)
-            }
-
-            it("should log message with exception") {
-                cut.debug(message, exception)
-                verify(slf4jLogger).debug(message, exception)
-            }
 
             describe("lazy logging message") {
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isDebugEnabled).thenReturn(true)
-                    cut.debug { message }
+                    cut().debug { message }
                     verify(slf4jLogger).isDebugEnabled
                     verify(slf4jLogger).debug(message)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isDebugEnabled).thenReturn(false)
-                    cut.debug { message }
+                    cut().debug { message }
                     verify(slf4jLogger).isDebugEnabled
                 }
             }
@@ -80,42 +76,33 @@ object LoggerTest : Spek({
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isDebugEnabled).thenReturn(true)
-                    cut.debug(exception) { message }
+                    cut().withDebug { log(message, exception) }
                     verify(slf4jLogger).isDebugEnabled
                     verify(slf4jLogger).debug(message, exception)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isDebugEnabled).thenReturn(false)
-                    cut.debug(exception) { message }
+                    cut().withDebug { log(message, exception) }
                     verify(slf4jLogger).isDebugEnabled
                 }
             }
         }
 
         describe("info levels") {
-            it("should log message") {
-                cut.info(message)
-                verify(slf4jLogger).info(message)
-            }
-
-            it("should log message with exception") {
-                cut.info(message, exception)
-                verify(slf4jLogger).info(message, exception)
-            }
 
             describe("lazy logging message") {
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isInfoEnabled).thenReturn(true)
-                    cut.info { message }
+                    cut().info { message }
                     verify(slf4jLogger).isInfoEnabled
                     verify(slf4jLogger).info(message)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isInfoEnabled).thenReturn(false)
-                    cut.info { message }
+                    cut().info { message }
                     verify(slf4jLogger).isInfoEnabled
                 }
             }
@@ -124,42 +111,32 @@ object LoggerTest : Spek({
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isInfoEnabled).thenReturn(true)
-                    cut.info(exception) { message }
+                    cut().withInfo { log(message, exception) }
                     verify(slf4jLogger).isInfoEnabled
                     verify(slf4jLogger).info(message, exception)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isInfoEnabled).thenReturn(false)
-                    cut.info(exception) { message }
+                    cut().withInfo { log(message, exception) }
                     verify(slf4jLogger).isInfoEnabled
                 }
             }
         }
 
         describe("warning levels") {
-            it("should log message") {
-                cut.warn(message)
-                verify(slf4jLogger).warn(message)
-            }
-
-            it("should log message with exception") {
-                cut.warn(message, exception)
-                verify(slf4jLogger).warn(message, exception)
-            }
-
             describe("lazy logging message") {
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isWarnEnabled).thenReturn(true)
-                    cut.warn { message }
+                    cut().warn { message }
                     verify(slf4jLogger).isWarnEnabled
                     verify(slf4jLogger).warn(message)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isWarnEnabled).thenReturn(false)
-                    cut.warn { message }
+                    cut().warn { message }
                     verify(slf4jLogger).isWarnEnabled
                 }
             }
@@ -168,42 +145,33 @@ object LoggerTest : Spek({
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isWarnEnabled).thenReturn(true)
-                    cut.warn(exception) { message }
+                    cut().withWarn { log(message, exception) }
                     verify(slf4jLogger).isWarnEnabled
                     verify(slf4jLogger).warn(message, exception)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isWarnEnabled).thenReturn(false)
-                    cut.warn(exception) { message }
+                    cut().withWarn { log(message, exception) }
                     verify(slf4jLogger).isWarnEnabled
                 }
             }
         }
 
         describe("error levels") {
-            it("should log message") {
-                cut.error(message)
-                verify(slf4jLogger).error(message)
-            }
-
-            it("should log message with exception") {
-                cut.error(message, exception)
-                verify(slf4jLogger).error(message, exception)
-            }
 
             describe("lazy logging message") {
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isErrorEnabled).thenReturn(true)
-                    cut.error { message }
+                    cut().error { message }
                     verify(slf4jLogger).isErrorEnabled
                     verify(slf4jLogger).error(message)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isErrorEnabled).thenReturn(false)
-                    cut.error { message }
+                    cut().error { message }
                     verify(slf4jLogger).isErrorEnabled
                 }
             }
@@ -212,14 +180,14 @@ object LoggerTest : Spek({
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isErrorEnabled).thenReturn(true)
-                    cut.error(exception) { message }
+                    cut().withError { log(message, exception) }
                     verify(slf4jLogger).isErrorEnabled
                     verify(slf4jLogger).error(message, exception)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isErrorEnabled).thenReturn(false)
-                    cut.error(exception) { message }
+                    cut().withError { log(message, exception) }
                     verify(slf4jLogger).isErrorEnabled
                 }
             }
index 0f359df..da956be 100644 (file)
@@ -42,7 +42,7 @@ class ReactiveLoggingTest : Spek({
                 val cut = Try.just(event)
 
                 it("should not filter stream event and log accepted message") {
-                    cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
+                    cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
                             .test()
                             .expectNext(event)
                             .verifyComplete()
@@ -53,7 +53,7 @@ class ReactiveLoggingTest : Spek({
                 val e = Exception()
                 val cut = Failure(e)
                 it("should filter stream event and log rejected message") {
-                    cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
+                    cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
                             .test()
                             .verifyComplete()
                 }
@@ -65,7 +65,7 @@ class ReactiveLoggingTest : Spek({
                 val cut = Option.just(event)
 
                 it("should not filter stream event and log accepted message") {
-                    cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE)
+                    cut.filterEmptyWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE)
                             .test()
                             .expectNext(event)
                             .verifyComplete()
@@ -75,7 +75,7 @@ class ReactiveLoggingTest : Spek({
             given("empty Option") {
                 val cut = Option.empty<Int>()
                 it("should filter stream event and log rejected message") {
-                    cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE)
+                    cut.filterEmptyWithLog(logger,::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE)
                             .test()
                             .verifyComplete()
                 }
@@ -88,7 +88,7 @@ class ReactiveLoggingTest : Spek({
                 val cut = Flux.just(event)
 
                 it("should not filter stream event and log accepted message") {
-                    cut.filterFailedWithLog(logger, right())
+                    cut.filterFailedWithLog(logger,::emptyMap, right())
                             .test()
                             .expectNext(event)
                             .verifyComplete()
@@ -99,7 +99,7 @@ class ReactiveLoggingTest : Spek({
                 val cut = Flux.just(event)
 
                 it("should filter stream event and log rejected message") {
-                    cut.filterFailedWithLog(logger, left())
+                    cut.filterFailedWithLog(logger,::emptyMap, left())
                             .test()
                             .verifyComplete()
                 }
index 57aaf3d..ca6d169 100644 (file)
@@ -61,12 +61,14 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
                 .handle { _, output -> handler(complete, messages, output) }
                 .connect()
                 .doOnError {
-                    logger.info("Failed to connect to VesHvCollector on " +
-                            "${configuration.vesHost}:${configuration.vesPort}")
+                    logger.info {
+                        "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
+                    }
                 }
                 .subscribe {
-                    logger.info("Connected to VesHvCollector on " +
-                            "${configuration.vesHost}:${configuration.vesPort}")
+                    logger.info {
+                        "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
+                    }
                 }
         return complete.then()
     }
@@ -86,7 +88,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
                 .options { it.flushOnBoundary() }
                 .sendGroups(frames)
                 .then {
-                    logger.info("Messages have been sent")
+                    logger.info { "Messages have been sent" }
                     complete.onComplete()
                 }
                 .then()
index 1601938..cfd3a6e 100644 (file)
@@ -59,17 +59,17 @@ internal class XnfApiServer(
                 .post("simulator/async", ::startSimulationHandler)
                 .get("simulator/:id", ::simulatorStatusHandler)
                 .get("healthcheck") { ctx ->
-                    logger.info("Checking health")
+                    logger.info { "Checking health" }
                     ctx.response.status(HttpConstants.STATUS_OK).send()
                 }
     }
 
     private fun startSimulationHandler(ctx: Context) {
-        logger.info("Attempting to start asynchronous scenario")
+        logger.info { "Attempting to start asynchronous scenario" }
         ctx.request.body.then { body ->
             val id = startSimulation(body)
             when (id) {
-                is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}"}
+                is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}" }
                 is Either.Right -> logger.info { "Scenario started, details: ${id.b}" }
             }
             ctx.response.sendEitherErrorOrResponse(id)
@@ -83,7 +83,7 @@ internal class XnfApiServer(
     }
 
     private fun simulatorStatusHandler(ctx: Context) {
-        logger.debug("Checking task status")
+        logger.debug { "Checking task status" }
         val id = UUID.fromString(ctx.pathTokens["id"])
         logger.debug { "Checking status for id: $id" }
         val status = ongoingSimulations.status(id)
index 21748ae..d7d42d8 100644 (file)
@@ -43,11 +43,11 @@ class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
         simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
             result.fold(
                     { err ->
-                        logger.warn("Error", err)
+                        logger.withWarn { log("Error", err) }
                         simulations[id] = StatusFailure(err)
                     },
                     {
-                        logger.info("Finished sending messages")
+                        logger.info { "Finished sending messages" }
                         simulations[id] = StatusSuccess
                     }
             )
index 4512dfb..91070d3 100644 (file)
@@ -42,7 +42,7 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
 fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
         .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
         .map { config ->
-            logger.info("Using configuration: $config")
+            logger.info { "Using configuration: $config" }
             val xnfSimulator = XnfSimulator(
                     VesHvClient(config),
                     MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
@@ -52,10 +52,10 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
         }
         .unsafeRunEitherSync(
                 { ex ->
-                    logger.error("Failed to start a server", ex)
+                    logger.withError { log("Failed to start a server", ex) }
                     ExitFailure(1)
                 },
                 {
-                    logger.info("Started xNF Simulator API server")
+                    logger.info { "Started xNF Simulator API server" }
                 }
         )