import org.apache.kafka.clients.producer.ProducerConfig
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
* @since June 2018
*/
internal class KafkaSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink {
- return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
+ override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+ return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx)
}
private fun constructSenderOptions(config: CollectorConfiguration) =