fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
}
-interface SinkProvider : Closeable {
+interface SinkFactory : Closeable {
operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
}
fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
}
-interface CollectorProvider : Closeable {
+interface CollectorFactory : Closeable {
operator fun invoke(ctx: ClientContext): Collector
}
*/
package org.onap.dcae.collectors.veshv.factory
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkFactory
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
object AdapterFactory {
- fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
+ fun sinkCreatorFactory(): SinkFactory = KafkaSinkFactory()
}
package org.onap.dcae.collectors.veshv.factory
import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.impl.HvVesCollector
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
-import org.onap.dcae.collectors.veshv.impl.HvVesCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class CollectorFactory(private val configuration: CollectorConfiguration,
- private val sinkProvider: SinkProvider,
- private val metrics: Metrics,
- private val maxPayloadSizeBytes: Int) {
+class HvVesCollectorFactory(private val configuration: CollectorConfiguration,
+ private val sinkFactory: SinkFactory,
+ private val metrics: Metrics,
+ private val maxPayloadSizeBytes: Int): CollectorFactory {
- fun createVesHvCollectorProvider(): CollectorProvider {
+ override fun invoke(ctx: ClientContext): Collector =
+ createVesHvCollector(ctx)
- return object : CollectorProvider {
- override fun invoke(ctx: ClientContext): Collector =
- createVesHvCollector(ctx)
-
- override fun close() = sinkProvider.close()
- }
- }
+ override fun close() = sinkFactory.close()
private fun createVesHvCollector(ctx: ClientContext): Collector =
HvVesCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(configuration.routing, sinkProvider, ctx, metrics),
+ router = Router(configuration.routing, sinkFactory, ctx, metrics),
metrics = metrics)
-
- companion object {
- private val logger = Logger(CollectorFactory::class)
- }
}
*/
package org.onap.dcae.collectors.veshv.factory
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
fun createNettyTcpServer(serverConfig: ServerConfiguration,
securityConfig: SecurityConfiguration,
- collectorProvider: CollectorProvider,
+ collectorFactory: CollectorFactory,
metrics: Metrics
): Server = NettyTcpServer(
serverConfig,
sslFactory.createServerContext(securityConfig),
- collectorProvider,
+ collectorFactory,
metrics
)
}
import arrow.core.toOption
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.model.ClientContext
private val ctx: ClientContext,
private val metrics: Metrics) {
constructor(routing: Routing,
- sinkProvider: SinkProvider,
+ sinkFactory: SinkFactory,
ctx: ClientContext,
metrics: Metrics) :
this(routing,
- constructMessageSinks(routing, sinkProvider, ctx),
+ constructMessageSinks(routing, sinkFactory, ctx),
ctx,
metrics) {
logger.debug(ctx::mdc) { "Routing for client: $routing" }
private val NONE_PARTITION = None
internal fun constructMessageSinks(routing: Routing,
- sinkProvider: SinkProvider,
+ sinkFactory: SinkFactory,
ctx: ClientContext) =
routing.map(Route::sink)
.distinctBy { it.topicName() }
- .associateBy({ it.topicName() }, { sinkProvider(it, ctx) })
+ .associateBy({ it.topicName() }, { sinkFactory(it, ctx) })
}
private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message)
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.createKafkaSender
import org.onap.dcae.collectors.veshv.model.ClientContext
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-internal class KafkaSinkProvider : SinkProvider {
+internal class KafkaSinkFactory : SinkFactory {
private val messageSinks = synchronizedMap(
mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>()
)
}
companion object {
- private val logger = Logger(KafkaSinkProvider::class)
+ private val logger = Logger(KafkaSinkFactory::class)
}
}
import arrow.core.getOrElse
import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
*/
internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration,
private val sslContext: Option<SslContext>,
- private val collectorProvider: CollectorProvider,
+ private val collectorFactory: CollectorFactory,
private val metrics: Metrics) : Server {
override fun start(): Mono<ServerHandle> =
}
private fun closeAction(): Mono<Void> =
- collectorProvider.close().doOnSuccess {
+ collectorFactory.close().doOnSuccess {
logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
}
private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
metrics.notifyClientConnected()
logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
- val collector = collectorProvider(clientContext)
+ val collector = collectorFactory(clientContext)
return collector.handleClient(clientContext, nettyInbound)
}
*/
package org.onap.dcae.collectors.veshv.tests.component
-import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : Closeable {
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
val metrics = FakeMetrics()
- val sinkProvider = DummySinkProvider(sink)
+ val sinkProvider = DummySinkFactory(sink)
- private val collectorFactory = CollectorFactory(
+ private val collectorProvider = HvVesCollectorFactory(
configuration,
sinkProvider,
metrics,
MAX_PAYLOAD_SIZE_BYTES
)
- private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
-
val collector: Collector
get() = collectorProvider(ClientContext(alloc))
}
}
-class DummySinkProvider(private val sink: Sink) : SinkProvider {
+class DummySinkFactory(private val sink: Sink) : SinkFactory {
private val sinkInitialized = AtomicBoolean(false)
override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
-import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
import org.onap.dcae.collectors.veshv.factory.AdapterFactory
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
.doOnNext(::logServerStarted)
private fun createVesServer(config: HvVesConfiguration): Server =
- initializeCollectorFactory(config)
- .createVesHvCollectorProvider()
+ createCollectorProvider(config)
.let { collectorProvider ->
ServerFactory.createNettyTcpServer(
config.server,
)
}
- private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
- CollectorFactory(
+ private fun createCollectorProvider(config: HvVesConfiguration): HvVesCollectorFactory =
+ HvVesCollectorFactory(
config.collector,
AdapterFactory.sinkCreatorFactory(),
MicrometerMetrics.INSTANCE,