There should be one KafkaSender per configuration
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / LoggingSinkProvider.kt
index 14966d9..7535fbe 100644 (file)
@@ -21,11 +21,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import java.util.concurrent.atomic.AtomicLong
@@ -36,14 +37,13 @@ import java.util.concurrent.atomic.AtomicLong
  */
 internal class LoggingSinkProvider : SinkProvider {
 
-    override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+    override fun invoke(ctx: ClientContext): Sink {
         return object : Sink {
             private val totalMessages = AtomicLong()
             private val totalBytes = AtomicLong()
 
-            override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
-                    messages
-                            .doOnNext(this::logMessage)
+            override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+                    messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage)
 
             private fun logMessage(msg: RoutedMessage) {
                 val msgs = totalMessages.addAndGet(1)