import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
+ .onErrorResume { logger.handleReactiveStreamError(it) }
.doFinally { releaseBuffersMemory(wireDecoder) }
- .onErrorResume(::handleErrors)
.then()
}
private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
- private fun handleErrors(ex: Throwable): Flux<RoutedMessage> {
- logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
- logger.debug("Detailed stack trace", ex)
- return Flux.empty()
- }
-
companion object {
private val logger = Logger(VesHvCollector::class)
}
},
{
logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
- it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound))
+ val allocator = nettyInbound.context().channel().alloc()
+ it.handleConnection(allocator, createDataStream(nettyInbound))
}
)
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
Flux.empty()
} else {
streamBuffer.addComponent(true, byteBuf)
- generateFrames().doOnTerminate { streamBuffer.discardReadComponents() }
+ generateFrames()
+ .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+ .doFinally { streamBuffer.discardReadComponents() }
}
}
private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain(
getOptionValue(cmdLineOpt.option.opt),
- { System.getenv(cmdLineOpt.environmentVariableName()) })
\ No newline at end of file
+ { System.getenv(cmdLineOpt.environmentVariableName()) })
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+import reactor.core.publisher.Flux
+
+fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+ logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+ logger.debug("Detailed stack trace", ex)
+ return returnFlux
+}