-version: "3.4"
+version: "3.5"
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "6060:6060"
- "6061:6061/tcp"
+ entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid",
+ "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
command: ["--listen-port", "6061",
"--health-check-api-port", "6060",
"--config-url", "http://consul:8500/v1/kv/veshv-config"]
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SynchronousSink
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
- .doOnTerminate { releaseBuffersMemory(wireDecoder) }
+ .doFinally { releaseBuffersMemory(wireDecoder) }
.onErrorResume(::handleErrors)
.then()
}
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
+import reactor.ipc.netty.ByteBufFlux
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.options.ServerOptions
opts.port(serverConfig.listenPort)
}
- private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> {
- logger.info("Handling connection from ${nettyInbound.remoteAddress()}")
+ private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> =
+ collectorProvider().fold(
+ {
+ logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ Mono.empty()
+ },
+ {
+ logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
+ it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound))
+ }
+ )
- val dataStream = nettyInbound
- .configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
- .receive()
- .retain()
- return collectorProvider().fold(
- {
- logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
- Mono.empty()
- },
- { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) })
-
- }
+ fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
+ .configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ .receive()
+ .retain()
private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
onReadIdle(timeout.toMillis()) {