X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=sources%2Fhv-collector-utils%2Fsrc%2Fmain%2Fkotlin%2Forg%2Fonap%2Fdcae%2Fcollectors%2Fveshv%2Futils%2Flogging%2Freactive_logging.kt;fp=sources%2Fhv-collector-utils%2Fsrc%2Fmain%2Fkotlin%2Forg%2Fonap%2Fdcae%2Fcollectors%2Fveshv%2Futils%2Flogging%2Freactive_logging.kt;h=e8ec2549ba149de5c3455b97378a93b7c89cbadd;hb=d632aef8303701a1802f817c3d6fdcd4064c32b2;hp=714702d370ee2f6f6cb90119f07c77f69d27e0c4;hpb=dde383a2aa75f94c26d7949665b79cc95486a223;p=dcaegen2%2Fcollectors%2Fhv-ves.git diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index 714702d3..e8ec2549 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -19,10 +19,48 @@ */ package org.onap.dcae.collectors.veshv.utils.logging +import arrow.core.Either +import arrow.core.Option +import arrow.core.Try import reactor.core.publisher.Flux +import reactor.core.publisher.Mono fun Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux = Flux.empty()): Flux { - logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") + logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})") logger.debug("Detailed stack trace", ex) return returnFlux } + + +fun Try.filterFailedWithLog(logger: Logger, + acceptedMsg: (T) -> String, + rejectedMsg: (Throwable) -> String): Flux = + fold({ + logger.warn(rejectedMsg(it)) + Flux.empty() + }, { + logger.trace { acceptedMsg(it) } + Flux.just(it) + }) + +fun Option.filterEmptyWithLog(logger: Logger, + acceptedMsg: (T) -> String, + rejectedMsg: () -> String): Flux = + fold({ + logger.warn(rejectedMsg) + Flux.empty() + }, { + logger.trace { acceptedMsg(it) } + Flux.just(it) + }) + +fun Flux.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) = + flatMap { t -> + predicate(t).fold({ + logger.warn(it) + Mono.empty() + }, { + logger.trace(it) + Mono.just(t) + }) + }