Fix shutting down when new config received bug 77/83977/5
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Tue, 2 Apr 2019 13:40:46 +0000 (15:40 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 3 Apr 2019 06:51:03 +0000 (08:51 +0200)
When new configuration has been received and at least one client
connection has been active the collector used to shut down.

Also got rid of some more IO monad usage.

Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d
Issue-ID: DCAEGEN2-1382
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
21 files changed:
development/configuration/local.json [new file with mode: 0644]
sources/hv-collector-commandline/pom.xml
sources/hv-collector-core/pom.xml
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt [moved from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt with 98% similarity]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
sources/hv-collector-main/pom.xml
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
sources/hv-collector-main/src/main/resources/logback.xml
sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt

diff --git a/development/configuration/local.json b/development/configuration/local.json
new file mode 100644 (file)
index 0000000..a1a8b53
--- /dev/null
@@ -0,0 +1,20 @@
+{
+  "logLevel": "DEBUG",
+  "server": {
+    "listenPort": 8061,
+    "idleTimeoutSec": 60,
+    "maxPayloadSizeBytes": 1048576
+  },
+  "cbs": {
+    "firstRequestDelaySec": 10,
+    "requestIntervalSec": 5
+  },
+  "security": {
+    "keys": {
+      "keyStoreFile": "development/ssl/server.p12",
+      "keyStorePassword": "onaponap",
+      "trustStoreFile": "development/ssl/trust.p12",
+      "trustStorePassword": "onaponap"
+    }
+  }
+}
\ No newline at end of file
index 7f8643d..078a3cb 100644 (file)
       <groupId>org.jetbrains.kotlin</groupId>
       <artifactId>kotlin-reflect</artifactId>
     </dependency>
-    <dependency>
-      <groupId>io.arrow-kt</groupId>
-      <artifactId>arrow-effects</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
index e7134e1..e15592f 100644 (file)
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-reflect</artifactId>
         </dependency>
-        <dependency>
-            <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-effects</artifactId>
-        </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-core</artifactId>
index ba0a9ee..0039ef6 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.boundary
 
-import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.utils.Closeable
@@ -36,5 +35,5 @@ interface CollectorProvider : Closeable {
 }
 
 interface Server {
-    fun start(): IO<ServerHandle>
+    fun start(): Mono<ServerHandle>
 }
index 1c79abd..8fb4e80 100644 (file)
@@ -25,10 +25,9 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.impl.Router
 import org.onap.dcae.collectors.veshv.impl.VesDecoder
-import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.impl.HvVesCollector
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -53,7 +52,7 @@ class CollectorFactory(private val configuration: CollectorConfiguration,
     }
 
     private fun createVesHvCollector(ctx: ClientContext): Collector =
-            VesHvCollector(
+            HvVesCollector(
                     clientContext = ctx,
                     wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
                     protobufDecoder = VesDecoder(),
@@ -44,7 +44,7 @@ import reactor.core.publisher.Mono
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class VesHvCollector(
+internal class HvVesCollector(
         private val clientContext: ClientContext,
         private val wireChunkDecoder: WireChunkDecoder,
         private val protobufDecoder: VesDecoder,
@@ -116,6 +116,6 @@ internal class VesHvCollector(
             filterFailedWithLog(logger, clientContext::fullMdc, predicate)
 
     companion object {
-        private val logger = Logger(VesHvCollector::class)
+        private val logger = Logger(HvVesCollector::class)
     }
 }
index 6e2e20f..b03b89e 100644 (file)
@@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing,
 
     fun route(message: VesMessage): Flux<ConsumedMessage> =
             routeFor(message.header)
-                    .fold({
-                        metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
-                        logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
-                        logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" }
-                        Flux.empty<Route>()
-                    }, {
-                        logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" }
-                        Flux.just(it)
-                    })
+                    .fold({ routeNotFound(message) }, { routeFound(message, it) })
                     .flatMap {
                         val sinkTopic = it.sink.topicName()
                         messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
                     }
 
+    private fun routeNotFound(message: VesMessage): Flux<Route> {
+        metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
+        logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
+        logger.trace(ctx::fullMdc) { "Routing available for client: $routing" }
+        return Flux.empty<Route>()
+    }
+
+    private fun routeFound(message: VesMessage, route: Route): Flux<Route> {
+        logger.trace(ctx::fullMdc) {
+            "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION"
+        }
+        return Flux.just(route)
+    }
+
+
     private fun routeFor(header: CommonEventHeader) =
             routing.find { it.domain == header.domain }.toOption()
 
index 2ce0f42..7b726ab 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
-import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
index 7a49865..8698083 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.impl.createKafkaSender
@@ -28,6 +27,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Schedulers
 import reactor.kafka.sender.KafkaSender
 import java.util.Collections.synchronizedMap
 
@@ -46,10 +48,14 @@ internal class KafkaSinkProvider : SinkProvider {
         }
     }
 
-    override fun close() = IO {
-        messageSinks.values.forEach { it.close() }
-        logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
-    }
+    override fun close(): Mono<Void> =
+            Flux.fromIterable(messageSinks.values)
+                    .publishOn(Schedulers.elastic())
+                    .doOnNext(KafkaSender<CommonEventHeader, VesMessage>::close)
+                    .then()
+                    .doOnSuccess {
+                        logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
+                    }
 
     companion object {
         private val logger = Logger(KafkaSinkProvider::class)
index 3e19414..a208384 100644 (file)
@@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.impl.socket
 
 import arrow.core.Option
 import arrow.core.getOrElse
-import arrow.effects.IO
 import io.netty.handler.ssl.SslContext
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
@@ -55,17 +54,23 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
                               private val collectorProvider: CollectorProvider,
                               private val metrics: Metrics) : Server {
 
-    override fun start(): IO<ServerHandle> = IO {
-        TcpServer.create()
-                .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
-                .configureSsl()
-                .handle(this::handleConnection)
-                .doOnUnbound {
-                    logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
-                    collectorProvider.close().unsafeRunSync()
-                }
-                .let { NettyServerHandle(it.bindNow()) }
-    }
+    override fun start(): Mono<ServerHandle> =
+            Mono.defer {
+                TcpServer.create()
+                        .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
+                        .configureSsl()
+                        .handle(this::handleConnection)
+                        .bind()
+                        .map {
+                            NettyServerHandle(it, closeAction())
+                        }
+            }
+
+    private fun closeAction(): Mono<Void> =
+            collectorProvider.close().doOnSuccess {
+                logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
+            }
+
 
     private fun TcpServer.configureSsl() =
             sslContext
@@ -86,7 +91,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
     private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
             withNewClientContextFrom(nettyInbound, nettyOutbound)
             { clientContext ->
-                logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
+                logger.debug(clientContext::fullMdc) { "Client connection request received" }
 
                 clientContext.clientAddress
                         .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
@@ -112,20 +117,20 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
 
     private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
         metrics.notifyClientConnected()
-        logger.info(clientContext::fullMdc) { "Handling new client connection" }
+        logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
         val collector = collectorProvider(clientContext)
         return collector.handleClient(clientContext, nettyInbound)
     }
 
     private fun Collector.handleClient(clientContext: ClientContext,
-                             nettyInbound: NettyInbound) =
-        withConnectionFrom(nettyInbound) { connection ->
-            connection
-                    .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
-                    .logConnectionClosed(clientContext)
-        }.run {
-            handleConnection(nettyInbound.createDataStream())
-        }
+                                       nettyInbound: NettyInbound) =
+            withConnectionFrom(nettyInbound) { connection ->
+                connection
+                        .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
+                        .logConnectionClosed(clientContext)
+            }.run {
+                handleConnection(nettyInbound.createDataStream())
+            }
 
     private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
             onReadIdle(timeout.toMillis()) {
index b735138..ca9d28a 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.wire
 
-import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
 import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Flux.defer
 import reactor.core.publisher.SynchronousSink
@@ -63,26 +62,22 @@ internal class WireChunkDecoder(
     private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
         decoder.decodeFirst(streamBuffer)
                 .fold(onError(next), onSuccess(next))
-                .unsafeRunSync()
     }
 
-    private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+    private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> Unit = { err ->
         when (err) {
-            is InvalidWireFrame -> IO {
+            is InvalidWireFrame ->
                 next.error(WireFrameException(err))
-            }
-            is MissingWireFrameBytes -> IO {
+            is MissingWireFrameBytes -> {
                 logEndOfData()
                 next.complete()
             }
         }
     }
 
-    private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
-        IO {
-            logDecodedWireMessage(frame)
-            next.next(frame)
-        }
+    private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> Unit = { frame ->
+        logDecodedWireMessage(frame)
+        next.next(frame)
     }
 
     private fun logIncomingMessage(wire: ByteBuf) {
index f79c2e4..95b9159 100644 (file)
@@ -40,6 +40,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
 import org.onap.dcae.collectors.veshv.utils.Closeable
 import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import java.time.Duration
 import java.util.concurrent.atomic.AtomicBoolean
 
@@ -93,7 +94,7 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider {
             if (sinkInitialized.get()) {
                 sink.close()
             } else {
-                IO.unit
+                Mono.empty()
             }
 }
 
index 2430c74..d845f7c 100644 (file)
@@ -67,7 +67,7 @@ object VesHvSpecification : Spek({
 
             // just connecting should not create sink
             sut.handleConnection()
-            sut.close().unsafeRunSync()
+            sut.close().block()
 
             // then
             assertThat(sink.closed).isFalse()
@@ -80,7 +80,7 @@ object VesHvSpecification : Spek({
             sut.handleConnection(vesWireFrameMessage(PERF3GPP))
 
             // when
-            sut.close().unsafeRunSync()
+            sut.close().block()
 
             // then
             assertThat(sink.closed).isTrue()
index 160defd..f1b1ba2 100644 (file)
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
 import org.reactivestreams.Publisher
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import java.time.Duration
 import java.util.*
 import java.util.concurrent.ConcurrentLinkedDeque
@@ -50,12 +51,9 @@ class StoringSink : Sink {
         return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
     }
 
-    /*
-    * TOD0: if the code would look like:
-    * ```IO { active.set(false) }```
-    * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec)
-    */
-    override fun close() = active.set(false).run { IO.unit }
+    override fun close(): Mono<Void> = Mono.fromRunnable {
+        active.set(false)
+    }
 }
 
 /**
index edbdaa3..57f21a6 100644 (file)
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-core</artifactId>
         </dependency>
-        <dependency>
-            <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-effects-instances</artifactId>
-        </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-syntax</artifactId>
index dc207ef..8b0a38b 100644 (file)
@@ -30,7 +30,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
+import reactor.core.publisher.Mono
 import reactor.core.scheduler.Schedulers
+import java.time.Duration
 import java.util.concurrent.atomic.AtomicReference
 
 
@@ -39,6 +41,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main")
 
 private val hvVesServer = AtomicReference<ServerHandle>()
 private val configurationModule = ConfigurationModule()
+private val maxCloseTime = Duration.ofSeconds(10)
 
 fun main(args: Array<String>) {
     val configStateListener = object : ConfigurationStateListener {
@@ -60,30 +63,36 @@ fun main(args: Array<String>) {
                 logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) }
                 HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
             }
-            .doOnNext(::startServer)
+            .flatMap(::startServer)
             .doOnError(::logServerStartFailed)
             .then()
             .block()
 }
 
-private fun startServer(config: HvVesConfiguration) {
-    stopRunningServer()
+private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> =
+        stopRunningServer()
+                .timeout(maxCloseTime)
+                .then(deferredVesServer(config))
+                .doOnNext {
+                    registerShutdownHook { shutdownGracefully(it) }
+                    hvVesServer.set(it)
+                }
+
+private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer {
     Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
     logger.debug(ServiceContext::mdc) { "Configuration: $config" }
-
-    VesServer.start(config).let {
-        registerShutdownHook { shutdownGracefully(it) }
-        hvVesServer.set(it)
-    }
+    VesServer.start(config)
 }
 
-private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync()
+private fun stopRunningServer() = Mono.defer {
+    hvVesServer.get()?.close() ?: Mono.empty()
+}
 
 internal fun shutdownGracefully(serverHandle: ServerHandle,
                                 healthState: HealthState = HealthState.INSTANCE) {
     logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
     healthState.changeState(HealthDescription.SHUTTING_DOWN)
-    serverHandle.close().unsafeRunSync()
+    serverHandle.close().block(maxCloseTime)
     logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
 }
 
index c079cc5..fc4d866 100644 (file)
@@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.arrow.then
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -38,11 +39,10 @@ object VesServer {
 
     private val logger = Logger(VesServer::class)
 
-    fun start(config: HvVesConfiguration): ServerHandle =
+    fun start(config: HvVesConfiguration): Mono<ServerHandle> =
             createVesServer(config)
                     .start()
-                    .then(::logServerStarted)
-                    .unsafeRunSync()
+                    .doOnNext(::logServerStarted)
 
     private fun createVesServer(config: HvVesConfiguration): Server =
             initializeCollectorFactory(config)
index 21c1fa3..539f7c2 100644 (file)
@@ -91,6 +91,7 @@
     </appender>
 
     <logger name="reactor.netty" level="WARN"/>
+    <logger name="reactor.netty.tcp.TcpServer" level="OFF"/>
     <logger name="io.netty" level="INFO"/>
     <logger name="io.netty.util" level="WARN"/>
     <logger name="org.apache.kafka" level="INFO"/>
index d8de9f2..a967fba 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.main
 
-import arrow.effects.IO
 import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.inOrder
 import com.nhaarman.mockitokotlin2.mock
@@ -34,6 +33,7 @@ import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import reactor.core.publisher.Mono
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -42,12 +42,9 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
 internal object MainTest : Spek({
     describe("closeServer shutdown hook") {
         given("server handles and health state") {
-            val handle = mock<ServerHandle>()
+            val handle: ServerHandle = mock()
             var closed = false
-            val handleClose = IO {
-                closed = true
-            }
-            whenever(handle.close()).thenReturn(handleClose)
+            whenever(handle.close()).thenReturn(Mono.empty<Void>().doOnSuccess { closed = true })
             val healthState: HealthState = mock()
 
             on("shutdownGracefully") {
index 00b814c..ec654b3 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.utils
 
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.binding
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since January 2019
  */
 interface Closeable {
-    fun close(): IO<Unit> = IO.unit
+    fun close(): Mono<Void> = Mono.empty()
 
     companion object {
         fun closeAll(closeables: Iterable<Closeable>) =
-                IO.monadError().binding {
-                    closeables.forEach { it.close().bind() }
-                }.fix()
+                Flux.fromIterable(closeables).flatMap(Closeable::close).then()
     }
 }
index 5b582ed..670ab4a 100644 (file)
@@ -20,8 +20,9 @@
 package org.onap.dcae.collectors.veshv.utils
 
 import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
 import reactor.netty.DisposableServer
-import java.time.Duration
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,16 +36,33 @@ abstract class ServerHandle(val host: String, val port: Int) : Closeable {
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since August 2018
  */
-class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
-    override fun close() = IO {
-        ctx.disposeNow(SHUTDOWN_TIMEOUT)
-    }
+class NettyServerHandle(private val ctx: DisposableServer,
+                        private val closeAction: Mono<Void> = Mono.empty())
+    : ServerHandle(ctx.host(), ctx.port()) {
+
+    override fun close(): Mono<Void> =
+            Mono.just(ctx)
+                    .filter { !it.isDisposed }
+                    .flatMap {
+                        closeAction.thenReturn(it)
+                    }
+                    .then(dispose())
+
+    private fun dispose(): Mono<Void> =
+            Mono.create { callback ->
+                logger.debug { "About to dispose NettyServer" }
+                ctx.dispose()
+                ctx.onDispose {
+                    logger.debug { "Netty server disposed" }
+                    callback.success()
+                }
+            }
 
     override fun await() = IO<Unit> {
         ctx.channel().closeFuture().sync()
     }
 
     companion object {
-        private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10)
+        private val logger = Logger(NettyServerHandle::class)
     }
 }