import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicLong
*/
internal class LoggingSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+ override fun invoke(ctx: ClientContext): Sink {
return object : Sink {
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
- messages
- .doOnNext(this::logMessage)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage)
private fun logMessage(msg: RoutedMessage) {
val msgs = totalMessages.addAndGet(1)