Change-Id: I0b6cd5023b2bca0f0bee6958c107fc560fc95b52
Issue-ID: DCAEGEN2-751
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
*/
package org.onap.dcae.collectors.veshv.boundary
*/
package org.onap.dcae.collectors.veshv.boundary
+import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
}
fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
}
-typealias CollectorProvider = () -> Collector
+typealias CollectorProvider = () -> Option<Collector>
interface Server {
fun start(): IO<ServerHandle>
interface Server {
fun start(): IO<ServerHandle>
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.concurrent.atomic.AtomicReference
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.concurrent.atomic.AtomicReference
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
+ return collector::getOption
}
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
}
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
- return collectorProvider()
- .handleConnection(nettyInbound.context().channel().alloc(), dataStream)
+ return collectorProvider().fold(
+ {
+ logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ Mono.empty()
+ },
+ { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) })
+
}
private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
onReadIdle(timeout.toMillis()) {
}
private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
onReadIdle(timeout.toMillis()) {
- logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
- context().channel().close().addListener {
- if (it.isSuccess)
- logger.debug { "Client disconnected because of idle timeout" }
- else
- logger.warn("Channel close failed", it.cause())
+ logger.info {
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..."
+ private fun NettyInbound.disconnectClient() {
+ context().channel().close().addListener {
+ if (it.isSuccess)
+ logger.debug { "Channel (${remoteAddress()}) closed successfully." }
+ else
+ logger.warn("Channel close failed", it.cause())
+ }
+ }
+
private fun NettyInbound.logConnectionClosed(): NettyInbound {
context().onClose {
logger.info("Connection from ${remoteAddress()} has been closed")
private fun NettyInbound.logConnectionClosed(): NettyInbound {
context().onClose {
logger.info("Connection from ${remoteAddress()} has been closed")
*/
package org.onap.dcae.collectors.veshv.tests.component
*/
package org.onap.dcae.collectors.veshv.tests.component
+import arrow.core.getOrElse
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
- get() = collectorProvider()
+ get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
}
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
}
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {