Handle non-existing Collector instance 59/64659/3
authorFilip Krzywka <filip.krzywka@nokia.com>
Wed, 5 Sep 2018 08:37:51 +0000 (10:37 +0200)
committerFilip Krzywka <filip.krzywka@nokia.com>
Thu, 6 Sep 2018 07:59:25 +0000 (07:59 +0000)
Change-Id: I0b6cd5023b2bca0f0bee6958c107fc560fc95b52
Issue-ID: DCAEGEN2-751
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt

index 6c256b7..3c85a9b 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.boundary
 
+import arrow.core.Option
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
@@ -30,7 +31,7 @@ interface Collector {
     fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
 }
 
-typealias CollectorProvider = () -> Collector
+typealias CollectorProvider = () -> Option<Collector>
 
 interface Server {
     fun start(): IO<ServerHandle>
index a400ff3..d807a9e 100644 (file)
@@ -32,6 +32,7 @@ import org.onap.dcae.collectors.veshv.impl.VesDecoder
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.getOption
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import java.util.concurrent.atomic.AtomicReference
 
@@ -57,7 +58,7 @@ class CollectorFactory(val configuration: ConfigurationProvider,
                     healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
                 }
                 .subscribe(collector::set)
-        return collector::get
+        return collector::getOption
     }
 
     private fun createVesHvCollector(config: CollectorConfiguration): Collector {
index f858d95..a34be7c 100644 (file)
@@ -70,23 +70,34 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
                 .receive()
                 .retain()
 
-        return collectorProvider()
-                .handleConnection(nettyInbound.context().channel().alloc(), dataStream)
+        return collectorProvider().fold(
+                {
+                    logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+                    Mono.empty()
+                },
+                { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) })
+
     }
 
     private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
         onReadIdle(timeout.toMillis()) {
-            logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
-            context().channel().close().addListener {
-                if (it.isSuccess)
-                    logger.debug { "Client disconnected because of idle timeout" }
-                else
-                    logger.warn("Channel close failed", it.cause())
+            logger.info {
+                "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..."
             }
+            disconnectClient()
         }
         return this
     }
 
+    private fun NettyInbound.disconnectClient() {
+        context().channel().close().addListener {
+            if (it.isSuccess)
+                logger.debug { "Channel (${remoteAddress()}) closed successfully." }
+            else
+                logger.warn("Channel close failed", it.cause())
+        }
+    }
+
     private fun NettyInbound.logConnectionClosed(): NettyInbound {
         context().onClose {
             logger.info("Connection from ${remoteAddress()} has been closed")
index e9b7057..942e6ed 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.tests.component
 
+import arrow.core.getOrElse
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import io.netty.buffer.UnpooledByteBufAllocator
@@ -48,7 +49,7 @@ class Sut(sink: Sink = StoringSink()) {
     private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
 
     val collector: Collector
-        get() = collectorProvider()
+        get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
 }
 
 fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {