X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=sources%2Fhv-collector-core%2Fsrc%2Fmain%2Fkotlin%2Forg%2Fonap%2Fdcae%2Fcollectors%2Fveshv%2Fimpl%2Fadapters%2FLoggingSinkProvider.kt;h=7535fbee9c7d403097ce7f3122beb2c746f6c763;hb=4128aa2c9368ed20fab92e8c0df83f14d6233b86;hp=14966d9b1e7105a9c56956334897476e63829a95;hpb=4ab95420e42f6df59bd4851eee41be6579bdbbe1;p=dcaegen2%2Fcollectors%2Fhv-ves.git diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index 14966d9b..7535fbee 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -21,11 +21,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicLong @@ -36,14 +37,13 @@ import java.util.concurrent.atomic.AtomicLong */ internal class LoggingSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { + override fun invoke(ctx: ClientContext): Sink { return object : Sink { private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() - override fun send(messages: Flux): Flux = - messages - .doOnNext(this::logMessage) + override fun send(messages: Flux): Flux = + messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage) private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1)