Upgrade hv-ves, reactor, protobuf and sdk versions
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / kafka / KafkaPublisher.kt
index 91e6fde..663fbb1 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2020 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -46,7 +46,7 @@ internal class KafkaPublisher(private val sender: KafkaSender<CommonEventHeader,
 
     override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
             messages.map(::vesToKafkaRecord)
-                    .compose { sender.send(it) }
+                    .`as` { sender.send(it) }
                     .map {
                         val msg = it.correlationMetadata()
                         if (it.exception() == null) {