Merge configurations
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-utils / src / main / kotlin / org / onap / dcae / collectors / veshv / utils / arrow / core.kt
index d5b33b9..47b3d55 100644 (file)
@@ -34,6 +34,7 @@ import arrow.syntax.collections.firstOption
 import arrow.typeclasses.MonadContinuation
 import arrow.typeclasses.binding
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import java.util.concurrent.atomic.AtomicReference
 
 /**
@@ -57,8 +58,12 @@ fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity)
 
 fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity)
 
+fun <A : Exception, B> Flux<Either<A, B>>.throwOnLeft(): Flux<B> = map { it.rightOrThrow() }
+
 fun <A, B> Flux<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Flux<B> = map { it.rightOrThrow(f) }
 
+fun <A, B> Mono<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Mono<B> = map { it.rightOrThrow(f) }
+
 fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get())
 
 fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> =