*/
package org.onap.dcae.collectors.veshv.utils.logging
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
-fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
- logger.debug("Detailed stack trace", ex)
+typealias MessageEither = Either<() -> String, () -> String>
+
+fun <T> Logger.handleReactiveStreamError(
+ context: MappedDiagnosticContext,
+ ex: Throwable,
+ returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+ warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" }
+ withDebug(context) { log("Detailed stack trace", ex) }
return returnFlux
}
+
+fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+ context: MappedDiagnosticContext,
+ acceptedMsg: (T) -> String,
+ rejectedMsg: (Throwable) -> String): Flux<T> =
+ fold({ ex ->
+ logger.withWarn(context) { log(rejectedMsg(ex)) }
+ Flux.empty<T>()
+ }, { obj ->
+ logger.trace(context) { acceptedMsg(obj) }
+ Flux.just(obj)
+ })
+
+fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+ context: MappedDiagnosticContext,
+ acceptedMsg: (T) -> String,
+ rejectedMsg: () -> String): Flux<T> =
+ fold({
+ logger.warn(context, rejectedMsg)
+ Flux.empty<T>()
+ }, {
+ logger.trace(context) { acceptedMsg(it) }
+ Flux.just(it)
+ })
+
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
+ context: MappedDiagnosticContext,
+ predicate: (T) -> MessageEither): Flux<T> =
+ flatMap { t ->
+ predicate(t).fold({
+ logger.warn(context, it)
+ Mono.empty<T>()
+ }, {
+ logger.trace(context, it)
+ Mono.just<T>(t)
+ })
+ }
\ No newline at end of file