X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=sources%2Fhv-collector-utils%2Fsrc%2Fmain%2Fkotlin%2Forg%2Fonap%2Fdcae%2Fcollectors%2Fveshv%2Futils%2Fserver_handle.kt;h=9c0a6e1c1e7a50551bb1466b5654aada107323c3;hb=HEAD;hp=bdb63b68ee75aa7d01df2fbf0028200697414cc1;hpb=dde383a2aa75f94c26d7949665b79cc95486a223;p=dcaegen2%2Fcollectors%2Fhv-ves.git diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index bdb63b68..9c0a6e1c 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -19,28 +19,55 @@ */ package org.onap.dcae.collectors.veshv.utils -import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono import reactor.netty.DisposableServer +import java.time.Duration /** * @author Piotr Jaszczyk * @since August 2018 */ -abstract class ServerHandle(val host: String, val port: Int) { - abstract fun shutdown(): IO - abstract fun await(): IO +abstract class ServerHandle(val host: String, val port: Int) : Closeable { + abstract fun await(): Mono } /** * @author Piotr Jaszczyk * @since August 2018 */ -class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) { - override fun shutdown() = IO { - ctx.disposeNow() +class NettyServerHandle(private val ctx: DisposableServer, + private val closeAction: Mono = Mono.empty()) + : ServerHandle(ctx.host(), ctx.port()) { + + override fun close(): Mono = + Mono.just(ctx) + .filter { !it.isDisposed } + .flatMap { + closeAction.thenReturn(it) + } + .then(dispose()) + + private fun dispose(): Mono = + Mono.create { callback -> + logger.debug { "About to dispose NettyServer" } + ctx.dispose() + ctx.onDispose { + logger.debug { "Netty server disposed" } + callback.success(1) + } + } + .delayElement(boundPortReleaseLatency) + .then() + + override fun await(): Mono = Mono.create { callback -> + ctx.channel().closeFuture().addListener { + callback.success() + } } - override fun await() = IO { - ctx.channel().closeFuture().sync() + companion object { + private val logger = Logger(NettyServerHandle::class) + private val boundPortReleaseLatency = Duration.ofSeconds(1) } }