import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})")
- logger.debug("Detailed stack trace", ex)
+typealias MessageEither = Either<() -> String, () -> String>
+
+fun <T> Logger.handleReactiveStreamError(
+ context: MappedDiagnosticContext,
+ ex: Throwable,
+ returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+ warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" }
+ withDebug(context) { log("Detailed stack trace", ex) }
return returnFlux
}
-
fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+ context: MappedDiagnosticContext,
acceptedMsg: (T) -> String,
rejectedMsg: (Throwable) -> String): Flux<T> =
- fold({
- logger.warn(rejectedMsg(it))
+ fold({ ex ->
+ logger.withWarn(context) { log(rejectedMsg(ex)) }
Flux.empty<T>()
- }, {
- logger.trace { acceptedMsg(it) }
- Flux.just(it)
+ }, { obj ->
+ logger.trace(context) { acceptedMsg(obj) }
+ Flux.just(obj)
})
fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+ context: MappedDiagnosticContext,
acceptedMsg: (T) -> String,
rejectedMsg: () -> String): Flux<T> =
fold({
- logger.warn(rejectedMsg)
+ logger.warn(context, rejectedMsg)
Flux.empty<T>()
}, {
- logger.trace { acceptedMsg(it) }
+ logger.trace(context) { acceptedMsg(it) }
Flux.just(it)
})
-fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) =
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
+ context: MappedDiagnosticContext,
+ predicate: (T) -> MessageEither): Flux<T> =
flatMap { t ->
predicate(t).fold({
- logger.warn(it)
+ logger.warn(context, it)
Mono.empty<T>()
}, {
- logger.trace(it)
+ logger.trace(context, it)
Mono.just<T>(t)
})
}