X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=sources%2Fhv-collector-utils%2Fsrc%2Fmain%2Fkotlin%2Forg%2Fonap%2Fdcae%2Fcollectors%2Fveshv%2Futils%2Flogging%2Freactive_logging.kt;fp=sources%2Fhv-collector-utils%2Fsrc%2Fmain%2Fkotlin%2Forg%2Fonap%2Fdcae%2Fcollectors%2Fveshv%2Futils%2Flogging%2Freactive_logging.kt;h=1e98f2fc07066d3f79387de0d4397e73addea26c;hb=8b8c37c296e55644063e0332fd455437168e78da;hp=e8ec2549ba149de5c3455b97378a93b7c89cbadd;hpb=73293332b2244b66083dc5d3910801c1b1058105;p=dcaegen2%2Fcollectors%2Fhv-ves.git diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index e8ec2549..1e98f2fc 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -25,42 +25,49 @@ import arrow.core.Try import reactor.core.publisher.Flux import reactor.core.publisher.Mono -fun Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux = Flux.empty()): Flux { - logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})") - logger.debug("Detailed stack trace", ex) +fun Logger.handleReactiveStreamError( + context: MappedDiagnosticContext, + ex: Throwable, + returnFlux: Flux = Flux.empty()): Flux { + warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" } + withDebug(context) { log("Detailed stack trace", ex) } return returnFlux } fun Try.filterFailedWithLog(logger: Logger, + context: MappedDiagnosticContext, acceptedMsg: (T) -> String, rejectedMsg: (Throwable) -> String): Flux = - fold({ - logger.warn(rejectedMsg(it)) + fold({ ex -> + logger.withWarn(context) { log(rejectedMsg(ex)) } Flux.empty() - }, { - logger.trace { acceptedMsg(it) } - Flux.just(it) + }, { obj -> + logger.trace(context) { acceptedMsg(obj) } + Flux.just(obj) }) fun Option.filterEmptyWithLog(logger: Logger, + context: MappedDiagnosticContext, acceptedMsg: (T) -> String, rejectedMsg: () -> String): Flux = fold({ - logger.warn(rejectedMsg) + logger.warn(context, rejectedMsg) Flux.empty() }, { - logger.trace { acceptedMsg(it) } + logger.trace(context) { acceptedMsg(it) } Flux.just(it) }) -fun Flux.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) = +fun Flux.filterFailedWithLog(logger: Logger, + context: MappedDiagnosticContext, + predicate: (T) -> Either<() -> String, () -> String>) = flatMap { t -> predicate(t).fold({ - logger.warn(it) + logger.warn(context, it) Mono.empty() }, { - logger.trace(it) + logger.trace(context, it) Mono.just(t) }) }