* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2020 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
messages.map(::vesToKafkaRecord)
- .compose { sender.send(it) }
+ .`as` { sender.send(it) }
.map {
val msg = it.correlationMetadata()
if (it.exception() == null) {