*/
package org.onap.dcae.collectors.veshv.utils.logging
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+ logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})")
logger.debug("Detailed stack trace", ex)
return returnFlux
}
+
+
+fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+ acceptedMsg: (T) -> String,
+ rejectedMsg: (Throwable) -> String): Flux<T> =
+ fold({
+ logger.warn(rejectedMsg(it))
+ Flux.empty<T>()
+ }, {
+ logger.trace { acceptedMsg(it) }
+ Flux.just(it)
+ })
+
+fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+ acceptedMsg: (T) -> String,
+ rejectedMsg: () -> String): Flux<T> =
+ fold({
+ logger.warn(rejectedMsg)
+ Flux.empty<T>()
+ }, {
+ logger.trace { acceptedMsg(it) }
+ Flux.just(it)
+ })
+
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) =
+ flatMap { t ->
+ predicate(t).fold({
+ logger.warn(it)
+ Mono.empty<T>()
+ }, {
+ logger.trace(it)
+ Mono.just<T>(t)
+ })
+ }