There should be one KafkaSender per configuration
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / boundary / adapters.kt
index ac55e55..e4a7394 100644 (file)
@@ -23,12 +23,13 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import reactor.core.publisher.Flux
 
 interface Sink {
-    fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
+    fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
 }
 
 interface Metrics {
@@ -41,14 +42,14 @@ interface Metrics {
     fun notifyClientRejected(cause: ClientRejectionCause)
 }
 
-@FunctionalInterface
 interface SinkProvider {
-    operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink
+    operator fun invoke(ctx: ClientContext): Sink
 
     companion object {
         fun just(sink: Sink): SinkProvider =
                 object : SinkProvider {
-                    override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink
+                    override fun invoke(
+                            ctx: ClientContext): Sink = sink
                 }
     }
 }