import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+typealias MessageEither = Either<() -> String, () -> String>
+
fun <T> Logger.handleReactiveStreamError(
context: MappedDiagnosticContext,
ex: Throwable,
fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
context: MappedDiagnosticContext,
- predicate: (T) -> Either<() -> String, () -> String>) =
+ predicate: (T) -> MessageEither): Flux<T> =
flatMap { t ->
predicate(t).fold({
logger.warn(context, it)
Mono.just<T>(t)
})
}
+
+
+fun <T> Mono<T>.onErrorLog(logger: Logger,
+ mdc: () -> Map<String, String>,
+ msg: () -> String) =
+ doOnError { logException(logger, mdc, msg, it) }
+
+fun <T> Flux<T>.onErrorLog(logger: Logger,
+ mdc: () -> Map<String, String>,
+ msg: () -> String) =
+ doOnError { logException(logger, mdc, msg, it) }
+
+private fun logException(logger: Logger, mdc: () -> Map<String, String>, msg: () -> String, it: Throwable) {
+ logger.error(mdc) { "${msg()}: ${it.message}" }
+ logger.debug(mdc) { "Detailed stack trace: ${it}" }
+}