[DCAE] INFO.yaml update
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-utils / src / main / kotlin / org / onap / dcae / collectors / veshv / utils / logging / reactive_logging.kt
index 95590d9..7d92dda 100644 (file)
@@ -25,6 +25,8 @@ import arrow.core.Try
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
+typealias MessageEither = Either<() -> String, () -> String>
+
 fun <T> Logger.handleReactiveStreamError(
         context: MappedDiagnosticContext,
         ex: Throwable,
@@ -60,7 +62,7 @@ fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
 
 fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
                                     context: MappedDiagnosticContext,
-                                    predicate: (T) -> Either<() -> String, () -> String>) =
+                                    predicate: (T) -> MessageEither): Flux<T> =
         flatMap { t ->
             predicate(t).fold({
                 logger.warn(context, it)
@@ -70,3 +72,19 @@ fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
                 Mono.just<T>(t)
             })
         }
+
+
+fun <T> Mono<T>.onErrorLog(logger: Logger,
+                           mdc: () -> Map<String, String>,
+                           msg: () -> String) =
+        doOnError { logException(logger, mdc, msg, it) }
+
+fun <T> Flux<T>.onErrorLog(logger: Logger,
+                           mdc: () -> Map<String, String>,
+                           msg: () -> String) =
+        doOnError { logException(logger, mdc, msg, it) }
+
+private fun logException(logger: Logger, mdc: () -> Map<String, String>, msg: () -> String, it: Throwable) {
+    logger.error(mdc) { "${msg()}: ${it.message}" }
+    logger.debug(mdc) { "Detailed stack trace: ${it}" }
+}