Close KafkaSender when handling SIGINT 16/76116/5
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Tue, 22 Jan 2019 10:43:18 +0000 (11:43 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Tue, 22 Jan 2019 13:30:32 +0000 (14:30 +0100)
Closing KafkaSender should result in flushing any pending messages.

Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc
Issue-ID: DCAEGEN2-1065
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
18 files changed:
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
sources/hv-collector-main/src/main/resources/logback.xml
sources/hv-collector-utils/pom.xml
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt [new file with mode: 0644]
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt

index e4a7394..6a6e73f 100644 (file)
@@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.utils.Closeable
 import reactor.core.publisher.Flux
 
 interface Sink {
@@ -42,16 +43,8 @@ interface Metrics {
     fun notifyClientRejected(cause: ClientRejectionCause)
 }
 
-interface SinkProvider {
+interface SinkProvider: Closeable {
     operator fun invoke(ctx: ClientContext): Sink
-
-    companion object {
-        fun just(sink: Sink): SinkProvider =
-                object : SinkProvider {
-                    override fun invoke(
-                            ctx: ClientContext): Sink = sink
-                }
-    }
 }
 
 interface ConfigurationProvider {
index 5584d61..5c64c70 100644 (file)
@@ -22,18 +22,19 @@ package org.onap.dcae.collectors.veshv.boundary
 import arrow.core.Option
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.Closeable
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
-import java.util.*
 
 interface Collector {
     fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
 }
 
-typealias CollectorProvider = (ClientContext) -> Option<Collector>
+interface CollectorProvider : Closeable {
+    operator fun invoke(ctx: ClientContext): Option<Collector>
+}
 
 interface Server {
     fun start(): IO<ServerHandle>
index 861065c..535d1ba 100644 (file)
@@ -19,6 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.factory
 
+import arrow.core.Option
+import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
@@ -60,8 +62,11 @@ class CollectorFactory(val configuration: ConfigurationProvider,
                 }
                 .subscribe(config::set)
 
-        return { ctx: ClientContext ->
-            config.getOption().map { createVesHvCollector(it, ctx) }
+        return object : CollectorProvider {
+            override fun invoke(ctx: ClientContext): Option<Collector> =
+                config.getOption().map { createVesHvCollector(it, ctx) }
+
+            override fun close() = sinkProvider.close()
         }
     }
 
index 5e7d9f5..aa76ce3 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
+import arrow.effects.IO
 import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
 import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
 import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
@@ -30,7 +31,9 @@ import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.kafka.sender.KafkaSender
 import reactor.kafka.sender.SenderOptions
@@ -47,7 +50,13 @@ internal class KafkaSinkProvider internal constructor(
 
     override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
 
+    override fun close() = IO {
+        kafkaSender.close()
+        logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" }
+    }
+
     companion object {
+        private val logger = Logger(KafkaSinkProvider::class)
         private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
         private const val BUFFER_MEMORY_MULTIPLIER = 32
         private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
index 725622f..c76233f 100644 (file)
@@ -33,6 +33,7 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
@@ -63,6 +64,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
                 .addressSupplier { serverConfig.serverListenAddress }
                 .configureSsl()
                 .handle(this::handleConnection)
+                .doOnUnbound {
+                    logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
+                    collectorProvider.close().unsafeRunSync()
+                }
                 .let { NettyServerHandle(it.bindNow()) }
     }
 
index 2407ece..a72ec03 100644 (file)
@@ -30,7 +30,7 @@ import java.util.*
  */
 object ServiceContext {
     val instanceId = UUID.randomUUID().toString()
-    val serverFqdn = getHost().hostName
+    val serverFqdn = getHost().hostName!!
 
     val mdc = mapOf(
             OnapMdc.INSTANCE_ID to instanceId,
index f23154a..2db6a15 100644 (file)
@@ -20,6 +20,8 @@
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
 import arrow.syntax.collections.tail
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
@@ -28,6 +30,9 @@ import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass
+import reactor.kafka.sender.KafkaSender
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -61,5 +66,18 @@ internal object KafkaSinkProviderTest : Spek({
                 }
             }
         }
+
+        given("dummy KafkaSender") {
+            val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock()
+            val cut = KafkaSinkProvider(kafkaSender)
+
+            on("close") {
+                cut.close().unsafeRunSync()
+
+                it("should close KafkaSender") {
+                    verify(kafkaSender).close()
+                }
+            }
+        }
     }
 })
index c3e4a58..30661e8 100644 (file)
@@ -20,6 +20,7 @@
 package org.onap.dcae.collectors.veshv.tests.component
 
 import arrow.core.getOrElse
+import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import io.netty.buffer.UnpooledByteBufAllocator
@@ -33,20 +34,22 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.tests.fakes.*
 import reactor.core.publisher.Flux
 import java.time.Duration
+import java.util.concurrent.atomic.AtomicBoolean
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class Sut(sink: Sink = StoringSink()) {
+class Sut(sink: Sink = StoringSink()): AutoCloseable {
     val configurationProvider = FakeConfigurationProvider()
     val healthStateProvider = FakeHealthState()
     val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
     val metrics = FakeMetrics()
+    val sinkProvider = DummySinkProvider(sink)
 
     private val collectorFactory = CollectorFactory(
             configurationProvider,
-            SinkProvider.just(sink),
+            sinkProvider,
             metrics,
             MAX_PAYLOAD_SIZE_BYTES,
             healthStateProvider)
@@ -57,11 +60,28 @@ class Sut(sink: Sink = StoringSink()) {
             throw IllegalStateException("Collector not available.")
         }
 
+    override fun close() {
+        collectorProvider.close().unsafeRunSync()
+    }
+
     companion object {
         const val MAX_PAYLOAD_SIZE_BYTES = 1024
     }
 }
 
+class DummySinkProvider(private val sink: Sink) : SinkProvider {
+    private val active = AtomicBoolean(true)
+
+    override fun invoke(ctx: ClientContext) = sink
+
+    override fun close() = IO {
+        active.set(false)
+    }
+
+    val closed get() = !active.get()
+
+}
+
 private val timeout = Duration.ofSeconds(10)
 
 fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
index 75e7cf0..ed46b11 100644 (file)
@@ -61,6 +61,14 @@ object VesHvSpecification : Spek({
                     .describedAs("should send all events")
                     .hasSize(2)
         }
+
+        it("should close sink when closing collector provider") {
+            val (sut, _) = vesHvWithStoringSink()
+
+            sut.close()
+
+            assertThat(sut.sinkProvider.closed).isTrue()
+        }
     }
 
     describe("Memory management") {
index b4ce649..2e7065b 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.tests.fakes
 
-import arrow.core.identity
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
index 7aade34..3248600 100644 (file)
@@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import reactor.netty.DisposableServer
 import reactor.netty.http.server.HttpServer
 import reactor.netty.http.server.HttpServerRequest
 import reactor.netty.http.server.HttpServerResponse
@@ -48,7 +49,10 @@ class HealthCheckApiServer(private val healthState: HealthState,
     fun start(): IO<ServerHandle> = IO {
         healthState().subscribe(healthDescription::set)
         val ctx = HttpServer.create()
-                .tcpConfiguration { it.addressSupplier { listenAddress } }
+                .tcpConfiguration {
+                    it.addressSupplier { listenAddress }
+                            .doOnUnbound { logClose() }
+                }
                 .route { routes ->
                     routes.get("/health/ready", ::readinessHandler)
                     routes.get("/health/alive", ::livenessHandler)
@@ -70,8 +74,13 @@ class HealthCheckApiServer(private val healthState: HealthState,
     private fun monitoringHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
             resp.sendString(monitoring.lastStatus())
 
+    private fun logClose() {
+        logger.info { "Health Check API closed" }
+    }
+
     companion object {
         private val logger = Logger(HealthCheckApiServer::class)
+
     }
 
 }
index 16da372..d865bcf 100644 (file)
@@ -26,7 +26,11 @@ import arrow.typeclasses.binding
 import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
 import org.onap.dcae.collectors.veshv.main.servers.VesServer
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.utils.Closeable
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.then
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
 import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -37,24 +41,29 @@ private val logger = Logger("$VESHV_PACKAGE.main")
 private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt"
 
 fun main(args: Array<String>) =
-    ArgVesHvConfiguration().parse(args)
-        .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
-        .map(::startAndAwaitServers)
-        .unsafeRunEitherSync(
-            { ex ->
-                logger.withError { log("Failed to start a server", ex) }
-                ExitFailure(1)
-            },
-            { logger.info { "Finished" } }
-        )
+        ArgVesHvConfiguration().parse(args)
+                .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
+                .map(::startAndAwaitServers)
+                .unsafeRunEitherSync(
+                        { ex ->
+                            logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
+                            ExitFailure(1)
+                        },
+                        { logger.debug(ServiceContext::mdc) { "Finished" } }
+                )
 
 private fun startAndAwaitServers(config: ServerConfiguration) =
-    IO.monad().binding {
-        Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
-        logger.info { "Using configuration: $config" }
-        HealthCheckServer.start(config).bind()
-        VesServer.start(config).bind().run {
-            registerShutdownHook(shutdown()).bind()
-            await().bind()
+        IO.monad().binding {
+            Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
+            logger.info { "Using configuration: $config" }
+            val healthCheckServerHandle = HealthCheckServer.start(config).bind()
+            VesServer.start(config).bind().let { handle ->
+                registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind()
+                handle.await().bind()
+            }
+        }.fix()
+
+private fun closeServers(vararg handles: ServerHandle): IO<Unit> =
+        Closeable.closeAll(handles.asIterable()).then {
+            logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
         }
-    }.fix()
index 13b0bc7..3d1a2a2 100644 (file)
@@ -21,7 +21,9 @@ package org.onap.dcae.collectors.veshv.main.servers
 
 import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.arrow.then
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
 /**
@@ -31,7 +33,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
 abstract class ServerStarter {
     fun start(config: ServerConfiguration): IO<ServerHandle> =
             startServer(config)
-                    .map { logger.info { serverStartedMessage(it) }; it }
+                    .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } }
 
     protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
     protected abstract fun serverStartedMessage(handle: ServerHandle): String
index 5ce3480..40f3c8a 100644 (file)
@@ -90,9 +90,9 @@
     </appender>
 
     <logger name="reactor.netty" level="WARN"/>
-    <logger name="io.netty" level="DEBUG"/>
+    <logger name="io.netty" level="INFO"/>
     <logger name="io.netty.util" level="WARN"/>
-    <logger name="org.apache.kafka" level="WARN"/>
+    <logger name="org.apache.kafka" level="INFO"/>
 
     <root level="INFO">
         <appender-ref ref="CONSOLE"/>
index 5bd2472..6ce74bd 100644 (file)
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-effects</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-effects-instances</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-syntax</artifactId>
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
new file mode 100644 (file)
index 0000000..00b814c
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils
+
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monadError.monadError
+import arrow.typeclasses.binding
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since January 2019
+ */
+interface Closeable {
+    fun close(): IO<Unit> = IO.unit
+
+    companion object {
+        fun closeAll(closeables: Iterable<Closeable>) =
+                IO.monadError().binding {
+                    closeables.forEach { it.close().bind() }
+                }.fix()
+    }
+}
index 3c2c64a..290ef72 100644 (file)
@@ -23,6 +23,9 @@ import arrow.core.Either
 import arrow.core.Left
 import arrow.core.Right
 import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monadError.monadError
+import arrow.typeclasses.binding
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import kotlin.system.exitProcess
@@ -46,7 +49,7 @@ object ExitSuccess : ExitCode() {
 
 data class ExitFailure(override val code: Int) : ExitCode()
 
-fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
+inline fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
         flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() })
 
 fun IO<Any>.unit() = map { Unit }
@@ -66,3 +69,9 @@ fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> =
                     { Flux.just<T>(it) }
             )
         }
+
+inline fun <T> IO<T>.then(crossinline block: (T) -> Unit): IO<T> =
+        map {
+            block(it)
+            it
+        }
index b8784c6..5b582ed 100644 (file)
@@ -20,7 +20,6 @@
 package org.onap.dcae.collectors.veshv.utils
 
 import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.netty.DisposableServer
 import java.time.Duration
 
@@ -28,8 +27,7 @@ import java.time.Duration
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since August 2018
  */
-abstract class ServerHandle(val host: String, val port: Int) {
-    abstract fun shutdown(): IO<Unit>
+abstract class ServerHandle(val host: String, val port: Int) : Closeable {
     abstract fun await(): IO<Unit>
 }
 
@@ -38,10 +36,8 @@ abstract class ServerHandle(val host: String, val port: Int) {
  * @since August 2018
  */
 class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
-    override fun shutdown() = IO {
-        logger.info { "Graceful shutdown" }
+    override fun close() = IO {
         ctx.disposeNow(SHUTDOWN_TIMEOUT)
-        logger.info { "Server disposed" }
     }
 
     override fun await() = IO<Unit> {
@@ -49,7 +45,6 @@ class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.ho
     }
 
     companion object {
-        val logger = Logger(NettyServerHandle::class)
         private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10)
     }
 }