Harmonize logging and add new logs
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-utils / src / main / kotlin / org / onap / dcae / collectors / veshv / utils / logging / reactive_logging.kt
index 714702d..e8ec254 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.utils.logging
 
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 
 fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
-    logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+    logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})")
     logger.debug("Detailed stack trace", ex)
     return returnFlux
 }
+
+
+fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+                                   acceptedMsg: (T) -> String,
+                                   rejectedMsg: (Throwable) -> String): Flux<T> =
+        fold({
+            logger.warn(rejectedMsg(it))
+            Flux.empty<T>()
+        }, {
+            logger.trace { acceptedMsg(it) }
+            Flux.just(it)
+        })
+
+fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+                                     acceptedMsg: (T) -> String,
+                                     rejectedMsg: () -> String): Flux<T> =
+        fold({
+            logger.warn(rejectedMsg)
+            Flux.empty<T>()
+        }, {
+            logger.trace { acceptedMsg(it) }
+            Flux.just(it)
+        })
+
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) =
+        flatMap { t ->
+            predicate(t).fold({
+                logger.warn(it)
+                Mono.empty<T>()
+            }, {
+                logger.trace(it)
+                Mono.just<T>(t)
+            })
+        }