Enable setting log level from command line
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-utils / src / main / kotlin / org / onap / dcae / collectors / veshv / utils / logging / reactive_logging.kt
index 714702d..e7aca55 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.utils.logging
 
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 
-fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
-    logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
-    logger.debug("Detailed stack trace", ex)
+typealias MessageEither = Either<() -> String, () -> String>
+
+fun <T> Logger.handleReactiveStreamError(
+        context: MappedDiagnosticContext,
+        ex: Throwable,
+        returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+    warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" }
+    withDebug(context) { log("Detailed stack trace", ex) }
     return returnFlux
 }
+
+fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+                                   context: MappedDiagnosticContext,
+                                   acceptedMsg: (T) -> String,
+                                   rejectedMsg: (Throwable) -> String): Flux<T> =
+        fold({ ex ->
+            logger.withWarn(context) { log(rejectedMsg(ex)) }
+            Flux.empty<T>()
+        }, { obj ->
+            logger.trace(context) { acceptedMsg(obj) }
+            Flux.just(obj)
+        })
+
+fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+                                     context: MappedDiagnosticContext,
+                                     acceptedMsg: (T) -> String,
+                                     rejectedMsg: () -> String): Flux<T> =
+        fold({
+            logger.warn(context, rejectedMsg)
+            Flux.empty<T>()
+        }, {
+            logger.trace(context) { acceptedMsg(it) }
+            Flux.just(it)
+        })
+
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
+                                    context: MappedDiagnosticContext,
+                                    predicate: (T) -> MessageEither): Flux<T> =
+        flatMap { t ->
+            predicate(t).fold({
+                logger.warn(context, it)
+                Mono.empty<T>()
+            }, {
+                logger.trace(context, it)
+                Mono.just<T>(t)
+            })
+        }
\ No newline at end of file