*/
package org.onap.dcae.collectors.veshv.utils
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
import reactor.netty.DisposableServer
+import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
abstract class ServerHandle(val host: String, val port: Int) : Closeable {
- abstract fun await(): IO<Unit>
+ abstract fun await(): Mono<Void>
}
/**
.then(dispose())
private fun dispose(): Mono<Void> =
- Mono.create { callback ->
+ Mono.create<Int> { callback ->
logger.debug { "About to dispose NettyServer" }
ctx.dispose()
ctx.onDispose {
logger.debug { "Netty server disposed" }
- callback.success()
+ callback.success(1)
}
}
+ .delayElement(boundPortReleaseLatency)
+ .then()
- override fun await() = IO<Unit> {
- ctx.channel().closeFuture().sync()
+ override fun await(): Mono<Void> = Mono.create { callback ->
+ ctx.channel().closeFuture().addListener {
+ callback.success()
+ }
}
companion object {
private val logger = Logger(NettyServerHandle::class)
+ private val boundPortReleaseLatency = Duration.ofSeconds(1)
}
}