Simplify factory/provider logic 78/84078/1
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 3 Apr 2019 10:12:17 +0000 (12:12 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 3 Apr 2019 10:12:17 +0000 (12:12 +0200)
Change-Id: I59467c41e1de63ead7c190a7c8fd688e3216237a
Issue-ID: DCAEGEN2-1385
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt [moved from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt with 68% similarity]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt [moved from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt with 93% similarity]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt

index e3156a0..48f335a 100644 (file)
@@ -35,7 +35,7 @@ interface Sink : Closeable {
     fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
 }
 
-interface SinkProvider : Closeable {
+interface SinkFactory : Closeable {
     operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
 }
 
index 0039ef6..4c54d7d 100644 (file)
@@ -30,7 +30,7 @@ interface Collector {
     fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
 }
 
-interface CollectorProvider : Closeable {
+interface CollectorFactory : Closeable {
     operator fun invoke(ctx: ClientContext): Collector
 }
 
index 04e575a..70f61b6 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.factory
 
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkFactory
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
 object AdapterFactory {
-    fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
+    fun sinkCreatorFactory(): SinkFactory = KafkaSinkFactory()
 }
 package org.onap.dcae.collectors.veshv.factory
 
 import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
 import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.impl.HvVesCollector
 import org.onap.dcae.collectors.veshv.impl.Router
 import org.onap.dcae.collectors.veshv.impl.VesDecoder
-import org.onap.dcae.collectors.veshv.impl.HvVesCollector
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class CollectorFactory(private val configuration: CollectorConfiguration,
-                       private val sinkProvider: SinkProvider,
-                       private val metrics: Metrics,
-                       private val maxPayloadSizeBytes: Int) {
+class HvVesCollectorFactory(private val configuration: CollectorConfiguration,
+                            private val sinkFactory: SinkFactory,
+                            private val metrics: Metrics,
+                            private val maxPayloadSizeBytes: Int): CollectorFactory {
 
-    fun createVesHvCollectorProvider(): CollectorProvider {
+    override fun invoke(ctx: ClientContext): Collector =
+            createVesHvCollector(ctx)
 
-        return object : CollectorProvider {
-            override fun invoke(ctx: ClientContext): Collector =
-                    createVesHvCollector(ctx)
-
-            override fun close() = sinkProvider.close()
-        }
-    }
+    override fun close() = sinkFactory.close()
 
     private fun createVesHvCollector(ctx: ClientContext): Collector =
             HvVesCollector(
                     clientContext = ctx,
                     wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
                     protobufDecoder = VesDecoder(),
-                    router = Router(configuration.routing, sinkProvider, ctx, metrics),
+                    router = Router(configuration.routing, sinkFactory, ctx, metrics),
                     metrics = metrics)
-
-    companion object {
-        private val logger = Logger(CollectorFactory::class)
-    }
 }
index 6c4e467..e0f611b 100644 (file)
@@ -19,7 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.factory
 
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
@@ -37,12 +37,12 @@ object ServerFactory {
 
     fun createNettyTcpServer(serverConfig: ServerConfiguration,
                              securityConfig: SecurityConfiguration,
-                             collectorProvider: CollectorProvider,
+                             collectorFactory: CollectorFactory,
                              metrics: Metrics
     ): Server = NettyTcpServer(
             serverConfig,
             sslFactory.createServerContext(securityConfig),
-            collectorProvider,
+            collectorFactory,
             metrics
     )
 }
index b03b89e..fe34a9c 100644 (file)
@@ -23,7 +23,7 @@ import arrow.core.None
 import arrow.core.toOption
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
 import org.onap.dcae.collectors.veshv.config.api.model.Route
 import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -40,11 +40,11 @@ class Router internal constructor(private val routing: Routing,
                                   private val ctx: ClientContext,
                                   private val metrics: Metrics) {
     constructor(routing: Routing,
-                sinkProvider: SinkProvider,
+                sinkFactory: SinkFactory,
                 ctx: ClientContext,
                 metrics: Metrics) :
             this(routing,
-                    constructMessageSinks(routing, sinkProvider, ctx),
+                    constructMessageSinks(routing, sinkFactory, ctx),
                     ctx,
                     metrics) {
         logger.debug(ctx::mdc) { "Routing for client: $routing" }
@@ -87,11 +87,11 @@ class Router internal constructor(private val routing: Routing,
         private val NONE_PARTITION = None
 
         internal fun constructMessageSinks(routing: Routing,
-                                           sinkProvider: SinkProvider,
+                                           sinkFactory: SinkFactory,
                                            ctx: ClientContext) =
                 routing.map(Route::sink)
                         .distinctBy { it.topicName() }
-                        .associateBy({ it.topicName() }, { sinkProvider(it, ctx) })
+                        .associateBy({ it.topicName() }, { sinkFactory(it, ctx) })
     }
 
     private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message)
@@ -19,7 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
 import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.impl.createKafkaSender
 import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -37,7 +37,7 @@ import java.util.Collections.synchronizedMap
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-internal class KafkaSinkProvider : SinkProvider {
+internal class KafkaSinkFactory : SinkFactory {
     private val messageSinks = synchronizedMap(
             mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>()
     )
@@ -58,6 +58,6 @@ internal class KafkaSinkProvider : SinkProvider {
                     }
 
     companion object {
-        private val logger = Logger(KafkaSinkProvider::class)
+        private val logger = Logger(KafkaSinkFactory::class)
     }
 }
index a208384..7ce86f9 100644 (file)
@@ -23,7 +23,7 @@ import arrow.core.Option
 import arrow.core.getOrElse
 import io.netty.handler.ssl.SslContext
 import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
@@ -51,7 +51,7 @@ import java.time.Duration
  */
 internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration,
                               private val sslContext: Option<SslContext>,
-                              private val collectorProvider: CollectorProvider,
+                              private val collectorFactory: CollectorFactory,
                               private val metrics: Metrics) : Server {
 
     override fun start(): Mono<ServerHandle> =
@@ -67,7 +67,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
             }
 
     private fun closeAction(): Mono<Void> =
-            collectorProvider.close().doOnSuccess {
+            collectorFactory.close().doOnSuccess {
                 logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
             }
 
@@ -118,7 +118,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
     private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
         metrics.notifyClientConnected()
         logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
-        val collector = collectorProvider(clientContext)
+        val collector = collectorFactory(clientContext)
         return collector.handleClient(clientContext, nettyInbound)
     }
 
index 95b9159..8b2bc13 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.tests.component
 
-import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import io.netty.buffer.UnpooledByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
 import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
@@ -51,17 +50,15 @@ import java.util.concurrent.atomic.AtomicBoolean
 class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : Closeable {
     val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
     val metrics = FakeMetrics()
-    val sinkProvider = DummySinkProvider(sink)
+    val sinkProvider = DummySinkFactory(sink)
 
-    private val collectorFactory = CollectorFactory(
+    private val collectorProvider = HvVesCollectorFactory(
             configuration,
             sinkProvider,
             metrics,
             MAX_PAYLOAD_SIZE_BYTES
     )
 
-    private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
-
     val collector: Collector
         get() = collectorProvider(ClientContext(alloc))
 
@@ -82,7 +79,7 @@ class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : C
     }
 }
 
-class DummySinkProvider(private val sink: Sink) : SinkProvider {
+class DummySinkFactory(private val sink: Sink) : SinkFactory {
     private val sinkInitialized = AtomicBoolean(false)
 
     override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
index fc4d866..a34b711 100644 (file)
@@ -21,13 +21,12 @@ package org.onap.dcae.collectors.veshv.main.servers
 
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
-import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory
 import org.onap.dcae.collectors.veshv.factory.ServerFactory
 import org.onap.dcae.collectors.veshv.factory.AdapterFactory
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.arrow.then
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Mono
 
@@ -45,8 +44,7 @@ object VesServer {
                     .doOnNext(::logServerStarted)
 
     private fun createVesServer(config: HvVesConfiguration): Server =
-            initializeCollectorFactory(config)
-                    .createVesHvCollectorProvider()
+            createCollectorProvider(config)
                     .let { collectorProvider ->
                         ServerFactory.createNettyTcpServer(
                                 config.server,
@@ -56,8 +54,8 @@ object VesServer {
                         )
                     }
 
-    private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
-            CollectorFactory(
+    private fun createCollectorProvider(config: HvVesConfiguration): HvVesCollectorFactory =
+            HvVesCollectorFactory(
                     config.collector,
                     AdapterFactory.sinkCreatorFactory(),
                     MicrometerMetrics.INSTANCE,