Handle stream error early 40/72040/6
authorFilip Krzywka <filip.krzywka@nokia.com>
Wed, 7 Nov 2018 07:16:09 +0000 (08:16 +0100)
committerFilip Krzywka <filip.krzywka@nokia.com>
Tue, 13 Nov 2018 12:14:16 +0000 (13:14 +0100)
Should fix inconsistent logging due to Reactor Signal sometimes
propagating from WireChunkDecoder stream to VesHvCollector stream as
Signal.CANCEL instead of Signal.ERROR and thus not being handled
correctly.
As a drawback however we will log error twice in case it comes from
WireChunkDecoder as we want to terminate connection in such case and
so we need to propagate error.

In WireChunkDecoder `doOnTerminate` was changed to
`doFinally` as this method handles also cancellation
signals and not only terminal signals.

Also fixed minor checkstyle reported issues.

Change-Id: I6e91d96c5a1a3ecf30603db9a71e032c770d507f
Issue-ID: DCAEGEN2-955
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt [new file with mode: 0644]

index 8970e03..b700f13 100644 (file)
@@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
@@ -52,8 +53,8 @@ internal class VesHvCollector(
                         .transform(::decodePayload)
                         .filter(VesMessage::isValid)
                         .transform(::routeMessage)
+                        .onErrorResume { logger.handleReactiveStreamError(it) }
                         .doFinally { releaseBuffersMemory(wireDecoder) }
-                        .onErrorResume(::handleErrors)
                         .then()
             }
 
@@ -81,12 +82,6 @@ internal class VesHvCollector(
 
     private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
 
-    private fun handleErrors(ex: Throwable): Flux<RoutedMessage> {
-        logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
-        logger.debug("Detailed stack trace", ex)
-        return Flux.empty()
-    }
-
     companion object {
         private val logger = Logger(VesHvCollector::class)
     }
index ede5a66..7a47cfc 100644 (file)
@@ -71,7 +71,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
                     },
                     {
                         logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
-                        it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound))
+                        val allocator = nettyInbound.context().channel().alloc()
+                        it.handleConnection(allocator, createDataStream(nettyInbound))
                     }
             )
 
index 0775c65..4a2ef6b 100644 (file)
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
 import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
 import reactor.core.publisher.Flux
 import reactor.core.publisher.SynchronousSink
 
@@ -51,7 +52,9 @@ internal class WireChunkDecoder(
             Flux.empty()
         } else {
             streamBuffer.addComponent(true, byteBuf)
-            generateFrames().doOnTerminate { streamBuffer.discardReadComponents() }
+            generateFrames()
+                    .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+                    .doFinally { streamBuffer.discardReadComponents() }
         }
     }
 
index ba4c080..a841447 100644 (file)
@@ -63,4 +63,4 @@ fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean =
 
 private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain(
         getOptionValue(cmdLineOpt.option.opt),
-        { System.getenv(cmdLineOpt.environmentVariableName()) })
\ No newline at end of file
+        { System.getenv(cmdLineOpt.environmentVariableName()) })
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
new file mode 100644 (file)
index 0000000..714702d
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+import reactor.core.publisher.Flux
+
+fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+    logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+    logger.debug("Detailed stack trace", ex)
+    return returnFlux
+}