}
logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
- return collectorProvider(clientContext).fold(
+ messageHandlingStream(clientContext, nettyInbound).subscribe()
+ return nettyOutbound.neverComplete()
+ }
+
+ private fun messageHandlingStream(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> =
+ collectorProvider(clientContext).fold(
{
logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
Mono.empty()
it.handleConnection(createDataStream(nettyInbound))
}
)
- }
private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
clientContext.clientAddress = try {
import arrow.core.Option
import arrow.core.getOrElse
import io.netty.handler.ssl.SslContext
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.ClientSslContextFactory
import org.onap.dcae.collectors.veshv.utils.arrow.asIo