Fix request interval 05/90005/2
authorFilip Krzywka <filip.krzywka@nokia.com>
Mon, 17 Jun 2019 11:52:06 +0000 (13:52 +0200)
committerFilip Krzywka <filip.krzywka@nokia.com>
Tue, 18 Jun 2019 05:57:24 +0000 (07:57 +0200)
In previous implementation DistinctUntilChangedSubscriber always
requested from upstream 256 events, which resulted in immediate
256 requests to CBS.

Request amount is not configurable in other way than hard-limiting
using `limitRequest`, which limits request amount for single subscriber.
(At least in our pipeline)

To avoid multiple manual subscribes, this commit changed CbsClientAdapter
to use Mono instead of Flux for CbsRequests and repeat this Mono
conditionally. Flux inside of repeatWhen is emitting event after each
onComplete received from upstream Mono and resubscribes to it if condition
is met. This seemed like good place to put our interval mechanism, which
is always-pass condition, but condition resolving blocks for variable
duration.

Change-Id: I04d1e657ec4d82185f6f07422c25c2d2ff23e60d
Issue-ID: DCAEGEN2-1557
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt

index 8b7ed67..905c737 100644 (file)
@@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.config.impl
 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
-import org.onap.dcae.collectors.veshv.utils.rx.delayElements
+import org.onap.dcae.collectors.veshv.utils.rx.nextWithVariableInterval
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest
@@ -63,15 +63,12 @@ internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
     }
 
     private fun toPeriodicalConfigurations(cbsClient: CbsClient) =
-            Mono.just(configurationRequest())
-                    .repeat()
-                    .map(CbsRequest::withNewInvocationId)
-                    .flatMap(cbsClient::get)
-                    .transform(delayElements(requestInterval::get))
-
-    private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
+            Mono.defer { cbsClient.get(configurationRequest.withNewInvocationId()) }
+                    .repeatWhen { it.nextWithVariableInterval(requestInterval::get) }
 
     companion object {
         private val logger = Logger(CbsClientAdapter::class)
+
+        private val configurationRequest: CbsRequest = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
     }
 }
index e188605..d68ca5c 100644 (file)
@@ -34,6 +34,6 @@ import java.time.Duration
 fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
         toMono().then(Mono.fromCallable(callback))
 
-fun <T> delayElements(intervalSupplier: () -> Duration): (Flux<T>) -> Flux<T> = { flux ->
-    flux.concatMap { Mono.just(it).delayElement(intervalSupplier()) }
-}
+fun <T> Flux<T>.nextWithVariableInterval(intervalSupplier: () -> Duration): Flux<T> =
+        concatMap { Mono.just(it).delayElement(intervalSupplier()) }
+