Enhance releasing memory 49/68049/5
authorFilip Krzywka <filip.krzywka@nokia.com>
Thu, 20 Sep 2018 10:17:46 +0000 (12:17 +0200)
committerFilip Krzywka <filip.krzywka@nokia.com>
Thu, 20 Sep 2018 12:47:23 +0000 (14:47 +0200)
- Some buffers may be emitted as cancelled and thus they would not be
handled by doOnTerminate method
- Moved data stream creation for Netty inbound to time when
collector is fully functional

Change-Id: If2f2195fadeca957679f6be696802f48a616f48d
Issue-ID: DCAEGEN2-815
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
docker-compose.yml
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt

index f9f52b4..f37c823 100644 (file)
@@ -1,4 +1,4 @@
-version: "3.4"
+version: "3.5"
 services:
   zookeeper:
     image: wurstmeister/zookeeper
@@ -35,6 +35,8 @@ services:
     ports:
       - "6060:6060"
       - "6061:6061/tcp"
+    entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid",
+                 "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
     command: ["--listen-port", "6061",
               "--health-check-api-port", "6060",
               "--config-url", "http://consul:8500/v1/kv/veshv-config"]
index 5268916..f608a2b 100644 (file)
@@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.core.publisher.SynchronousSink
@@ -58,7 +57,7 @@ internal class VesHvCollector(
                         .transform(::decodePayload)
                         .filter(VesMessage::isValid)
                         .transform(::routeMessage)
-                        .doOnTerminate { releaseBuffersMemory(wireDecoder) }
+                        .doFinally { releaseBuffersMemory(wireDecoder) }
                         .onErrorResume(::handleErrors)
                         .then()
             }
index a34be7c..b4ad4b7 100644 (file)
@@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.reactivestreams.Publisher
 import reactor.core.publisher.Mono
+import reactor.ipc.netty.ByteBufFlux
 import reactor.ipc.netty.NettyInbound
 import reactor.ipc.netty.NettyOutbound
 import reactor.ipc.netty.options.ServerOptions
@@ -61,23 +62,24 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
         opts.port(serverConfig.listenPort)
     }
 
-    private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> {
-        logger.info("Handling connection from ${nettyInbound.remoteAddress()}")
+    private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> =
+            collectorProvider().fold(
+                    {
+                        logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+                        Mono.empty()
+                    },
+                    {
+                        logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
+                        it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound))
+                    }
+            )
 
-        val dataStream = nettyInbound
-                .configureIdleTimeout(serverConfig.idleTimeout)
-                .logConnectionClosed()
-                .receive()
-                .retain()
 
-        return collectorProvider().fold(
-                {
-                    logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
-                    Mono.empty()
-                },
-                { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) })
-
-    }
+    fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
+            .configureIdleTimeout(serverConfig.idleTimeout)
+            .logConnectionClosed()
+            .receive()
+            .retain()
 
     private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
         onReadIdle(timeout.toMillis()) {