Support CBS request interval reconfiguration
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-utils / src / main / kotlin / org / onap / dcae / collectors / veshv / utils / rx / rx.kt
index ceccbcb..e188605 100644 (file)
 package org.onap.dcae.collectors.veshv.utils.rx
 
 import org.reactivestreams.Publisher
+import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.core.publisher.toMono
+import java.time.Duration
 
 fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
         toMono().then(Mono.fromCallable(callback))
+
+fun <T> delayElements(intervalSupplier: () -> Duration): (Flux<T>) -> Flux<T> = { flux ->
+    flux.concatMap { Mono.just(it).delayElement(intervalSupplier()) }
+}