Closing KafkaSender should result in flushing any pending messages.
Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc
Issue-ID: DCAEGEN2-1065
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.utils.Closeable
import reactor.core.publisher.Flux
interface Sink {
fun notifyClientRejected(cause: ClientRejectionCause)
}
-interface SinkProvider {
+interface SinkProvider: Closeable {
operator fun invoke(ctx: ClientContext): Sink
-
- companion object {
- fun just(sink: Sink): SinkProvider =
- object : SinkProvider {
- override fun invoke(
- ctx: ClientContext): Sink = sink
- }
- }
}
interface ConfigurationProvider {
import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.Closeable
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import java.util.*
interface Collector {
fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
}
-typealias CollectorProvider = (ClientContext) -> Option<Collector>
+interface CollectorProvider : Closeable {
+ operator fun invoke(ctx: ClientContext): Option<Collector>
+}
interface Server {
fun start(): IO<ServerHandle>
*/
package org.onap.dcae.collectors.veshv.factory
+import arrow.core.Option
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
}
.subscribe(config::set)
- return { ctx: ClientContext ->
- config.getOption().map { createVesHvCollector(it, ctx) }
+ return object : CollectorProvider {
+ override fun invoke(ctx: ClientContext): Option<Collector> =
+ config.getOption().map { createVesHvCollector(it, ctx) }
+
+ override fun close() = sinkProvider.close()
}
}
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+import arrow.effects.IO
import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
+ override fun close() = IO {
+ kafkaSender.close()
+ logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" }
+ }
+
companion object {
+ private val logger = Logger(KafkaSinkProvider::class)
private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
private const val BUFFER_MEMORY_MULTIPLIER = 32
private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
.addressSupplier { serverConfig.serverListenAddress }
.configureSsl()
.handle(this::handleConnection)
+ .doOnUnbound {
+ logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
+ collectorProvider.close().unsafeRunSync()
+ }
.let { NettyServerHandle(it.bindNow()) }
}
*/
object ServiceContext {
val instanceId = UUID.randomUUID().toString()
- val serverFqdn = getHost().hostName
+ val serverFqdn = getHost().hostName!!
val mdc = mapOf(
OnapMdc.INSTANCE_ID to instanceId,
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import arrow.syntax.collections.tail
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass
+import reactor.kafka.sender.KafkaSender
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
}
}
}
+
+ given("dummy KafkaSender") {
+ val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock()
+ val cut = KafkaSinkProvider(kafkaSender)
+
+ on("close") {
+ cut.close().unsafeRunSync()
+
+ it("should close KafkaSender") {
+ verify(kafkaSender).close()
+ }
+ }
+ }
}
})
package org.onap.dcae.collectors.veshv.tests.component
import arrow.core.getOrElse
+import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.tests.fakes.*
import reactor.core.publisher.Flux
import java.time.Duration
+import java.util.concurrent.atomic.AtomicBoolean
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class Sut(sink: Sink = StoringSink()) {
+class Sut(sink: Sink = StoringSink()): AutoCloseable {
val configurationProvider = FakeConfigurationProvider()
val healthStateProvider = FakeHealthState()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
val metrics = FakeMetrics()
+ val sinkProvider = DummySinkProvider(sink)
private val collectorFactory = CollectorFactory(
configurationProvider,
- SinkProvider.just(sink),
+ sinkProvider,
metrics,
MAX_PAYLOAD_SIZE_BYTES,
healthStateProvider)
throw IllegalStateException("Collector not available.")
}
+ override fun close() {
+ collectorProvider.close().unsafeRunSync()
+ }
+
companion object {
const val MAX_PAYLOAD_SIZE_BYTES = 1024
}
}
+class DummySinkProvider(private val sink: Sink) : SinkProvider {
+ private val active = AtomicBoolean(true)
+
+ override fun invoke(ctx: ClientContext) = sink
+
+ override fun close() = IO {
+ active.set(false)
+ }
+
+ val closed get() = !active.get()
+
+}
+
private val timeout = Duration.ofSeconds(10)
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
.describedAs("should send all events")
.hasSize(2)
}
+
+ it("should close sink when closing collector provider") {
+ val (sut, _) = vesHvWithStoringSink()
+
+ sut.close()
+
+ assertThat(sut.sinkProvider.closed).isTrue()
+ }
}
describe("Memory management") {
*/
package org.onap.dcae.collectors.veshv.tests.fakes
-import arrow.core.identity
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.netty.DisposableServer
import reactor.netty.http.server.HttpServer
import reactor.netty.http.server.HttpServerRequest
import reactor.netty.http.server.HttpServerResponse
fun start(): IO<ServerHandle> = IO {
healthState().subscribe(healthDescription::set)
val ctx = HttpServer.create()
- .tcpConfiguration { it.addressSupplier { listenAddress } }
+ .tcpConfiguration {
+ it.addressSupplier { listenAddress }
+ .doOnUnbound { logClose() }
+ }
.route { routes ->
routes.get("/health/ready", ::readinessHandler)
routes.get("/health/alive", ::livenessHandler)
private fun monitoringHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
resp.sendString(monitoring.lastStatus())
+ private fun logClose() {
+ logger.info { "Health Check API closed" }
+ }
+
companion object {
private val logger = Logger(HealthCheckApiServer::class)
+
}
}
import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
import org.onap.dcae.collectors.veshv.main.servers.VesServer
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.utils.Closeable
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt"
fun main(args: Array<String>) =
- ArgVesHvConfiguration().parse(args)
- .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startAndAwaitServers)
- .unsafeRunEitherSync(
- { ex ->
- logger.withError { log("Failed to start a server", ex) }
- ExitFailure(1)
- },
- { logger.info { "Finished" } }
- )
+ ArgVesHvConfiguration().parse(args)
+ .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
+ .map(::startAndAwaitServers)
+ .unsafeRunEitherSync(
+ { ex ->
+ logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
+ ExitFailure(1)
+ },
+ { logger.debug(ServiceContext::mdc) { "Finished" } }
+ )
private fun startAndAwaitServers(config: ServerConfiguration) =
- IO.monad().binding {
- Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
- logger.info { "Using configuration: $config" }
- HealthCheckServer.start(config).bind()
- VesServer.start(config).bind().run {
- registerShutdownHook(shutdown()).bind()
- await().bind()
+ IO.monad().binding {
+ Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
+ logger.info { "Using configuration: $config" }
+ val healthCheckServerHandle = HealthCheckServer.start(config).bind()
+ VesServer.start(config).bind().let { handle ->
+ registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind()
+ handle.await().bind()
+ }
+ }.fix()
+
+private fun closeServers(vararg handles: ServerHandle): IO<Unit> =
+ Closeable.closeAll(handles.asIterable()).then {
+ logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
}
- }.fix()
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
/**
abstract class ServerStarter {
fun start(config: ServerConfiguration): IO<ServerHandle> =
startServer(config)
- .map { logger.info { serverStartedMessage(it) }; it }
+ .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } }
protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
protected abstract fun serverStartedMessage(handle: ServerHandle): String
</appender>
<logger name="reactor.netty" level="WARN"/>
- <logger name="io.netty" level="DEBUG"/>
+ <logger name="io.netty" level="INFO"/>
<logger name="io.netty.util" level="WARN"/>
- <logger name="org.apache.kafka" level="WARN"/>
+ <logger name="org.apache.kafka" level="INFO"/>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-effects</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects-instances</artifactId>
+ </dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-syntax</artifactId>
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils
+
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monadError.monadError
+import arrow.typeclasses.binding
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since January 2019
+ */
+interface Closeable {
+ fun close(): IO<Unit> = IO.unit
+
+ companion object {
+ fun closeAll(closeables: Iterable<Closeable>) =
+ IO.monadError().binding {
+ closeables.forEach { it.close().bind() }
+ }.fix()
+ }
+}
import arrow.core.Left
import arrow.core.Right
import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monadError.monadError
+import arrow.typeclasses.binding
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.system.exitProcess
data class ExitFailure(override val code: Int) : ExitCode()
-fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
+inline fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() })
fun IO<Any>.unit() = map { Unit }
{ Flux.just<T>(it) }
)
}
+
+inline fun <T> IO<T>.then(crossinline block: (T) -> Unit): IO<T> =
+ map {
+ block(it)
+ it
+ }
package org.onap.dcae.collectors.veshv.utils
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.netty.DisposableServer
import java.time.Duration
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-abstract class ServerHandle(val host: String, val port: Int) {
- abstract fun shutdown(): IO<Unit>
+abstract class ServerHandle(val host: String, val port: Int) : Closeable {
abstract fun await(): IO<Unit>
}
* @since August 2018
*/
class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
- override fun shutdown() = IO {
- logger.info { "Graceful shutdown" }
+ override fun close() = IO {
ctx.disposeNow(SHUTDOWN_TIMEOUT)
- logger.info { "Server disposed" }
}
override fun await() = IO<Unit> {
}
companion object {
- val logger = Logger(NettyServerHandle::class)
private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10)
}
}