Add log diagnostic context
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-utils / src / main / kotlin / org / onap / dcae / collectors / veshv / utils / logging / reactive_logging.kt
index e8ec254..1e98f2f 100644 (file)
@@ -25,42 +25,49 @@ import arrow.core.Try
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
-fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
-    logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})")
-    logger.debug("Detailed stack trace", ex)
+fun <T> Logger.handleReactiveStreamError(
+        context: MappedDiagnosticContext,
+        ex: Throwable,
+        returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+    warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" }
+    withDebug(context) { log("Detailed stack trace", ex) }
     return returnFlux
 }
 
 
 fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+                                   context: MappedDiagnosticContext,
                                    acceptedMsg: (T) -> String,
                                    rejectedMsg: (Throwable) -> String): Flux<T> =
-        fold({
-            logger.warn(rejectedMsg(it))
+        fold({ ex ->
+            logger.withWarn(context) { log(rejectedMsg(ex)) }
             Flux.empty<T>()
-        }, {
-            logger.trace { acceptedMsg(it) }
-            Flux.just(it)
+        }, { obj ->
+            logger.trace(context) { acceptedMsg(obj) }
+            Flux.just(obj)
         })
 
 fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+                                     context: MappedDiagnosticContext,
                                      acceptedMsg: (T) -> String,
                                      rejectedMsg: () -> String): Flux<T> =
         fold({
-            logger.warn(rejectedMsg)
+            logger.warn(context, rejectedMsg)
             Flux.empty<T>()
         }, {
-            logger.trace { acceptedMsg(it) }
+            logger.trace(context) { acceptedMsg(it) }
             Flux.just(it)
         })
 
-fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) =
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
+                                    context: MappedDiagnosticContext,
+                                    predicate: (T) -> Either<() -> String, () -> String>) =
         flatMap { t ->
             predicate(t).fold({
-                logger.warn(it)
+                logger.warn(context, it)
                 Mono.empty<T>()
             }, {
-                logger.trace(it)
+                logger.trace(context, it)
                 Mono.just<T>(t)
             })
         }