Move routing functionality inside Router 49/83349/6
authorFilip Krzywka <filip.krzywka@nokia.com>
Tue, 26 Mar 2019 12:57:11 +0000 (13:57 +0100)
committerFilip Krzywka <filip.krzywka@nokia.com>
Wed, 27 Mar 2019 11:44:17 +0000 (12:44 +0100)
- also removed Routing-DSL as it won't be needed anymore

Change-Id: Ifc7bc7641a60936b5257c0bff7a8c51dddc30687
Issue-ID: DCAEGEN2-1347
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt

index aab8eca..45180a8 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.config.api.model
 
-import arrow.core.Option
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
-data class Routing(val routes: List<Route>) {
+data class Routing(val routes: List<Route>)
 
-    fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
-            Option.fromNullable(routes.find { it.applies(commonHeader) })
-}
-
-data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) {
-
-    fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
-
-    operator fun invoke(message: VesMessage): RoutedMessage =
-            RoutedMessage(targetTopic, partitioning(message.header), message)
-}
-
-
-/*
-HvVesConfiguration DSL
-*/
-
-fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder = RoutingBuilder().apply(init)
-
-class RoutingBuilder {
-    private val routes: MutableList<RouteBuilder> = mutableListOf()
-
-    fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder = RouteBuilder()
-            .apply(init)
-            .also { routes.add(it) }
-
-    fun build() = Routing(routes.map { it.build() }.toList())
-}
-
-class RouteBuilder {
-
-    private lateinit var domain: String
-    private lateinit var targetTopic: String
-    private lateinit var partitioning: (CommonEventHeader) -> Int
-
-    fun fromDomain(domain: String): RouteBuilder = apply {
-        this.domain = domain
-    }
-
-    fun toTopic(targetTopic: String): RouteBuilder = apply {
-        this.targetTopic = targetTopic
-    }
-
-    fun withFixedPartitioning(num: Int = 0): RouteBuilder = apply {
-        partitioning = { num }
-    }
-
-    fun build() = Route(domain, targetTopic, partitioning)
-}
+data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 })
\ No newline at end of file
index d2c35cb..a95a44d 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.Option
+import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.config.api.model.Route
 import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.api.model.routing
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
 class Router(private val routing: Routing, private val ctx: ClientContext) {
 
     constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this(
-            routing {
-                kafkaSinks.forEach {
-                    defineRoute {
-                        fromDomain(it.name())
-                        toTopic(it.topicName())
-                        withFixedPartitioning()
-                    }
-                }
-            }.build(),
-            ctx
-    )
+            Routing(
+                    kafkaSinks.map { Route(it.name(), it.topicName()) }.toList()
+            ),
+            ctx)
 
     fun findDestination(message: VesMessage): Option<RoutedMessage> =
-            routing.routeFor(message.header).map { it(message) }.also {
-                if (it.isEmpty()) {
-                    logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" }
-                }
-            }
+            routeFor(message.header)
+                    .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } }
+                    .map { it.routeMessage(message) }
+
+    private fun Route.routeMessage(message: VesMessage) =
+            RoutedMessage(targetTopic, partitioning(message.header), message)
+
+    private fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
+            routing.routes.find { it.domain == commonHeader.domain }.toOption()
 
     companion object {
         private val logger = Logger(Routing::class)
index f9fd698..1f5df37 100644 (file)
@@ -27,14 +27,14 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
-import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
@@ -94,13 +94,14 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
     private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> =
             try {
                 DataStreams.namedSinks(configuration)
-                        .filter(streamOfType(KAFKA))
+                        .filter(StreamPredicates.streamOfType(StreamType.KAFKA))
                         .map(streamParser::unsafeParse)
                         .asSequence()
             } catch (e: NullPointerException) {
                 throw ParsingException("Failed to parse configuration", e)
             }
 
+
     companion object {
         private const val MAX_RETRIES = 5L
         private val logger = Logger(ConfigurationProviderImpl::class)
index 9629816..b8b5586 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,7 +26,8 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.routing
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
@@ -43,20 +44,10 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
  */
 object RouterTest : Spek({
     given("sample configuration") {
-        val config = routing {
-
-            defineRoute {
-                fromDomain(PERF3GPP.domainName)
-                toTopic("ves_rtpm")
-                withFixedPartitioning(2)
-            }
-
-            defineRoute {
-                fromDomain(SYSLOG.domainName)
-                toTopic("ves_trace")
-                withFixedPartitioning()
-            }
-        }.build()
+        val config = Routing(listOf(
+                Route(PERF3GPP.domainName, "ves_rtpm", { 2 }),
+                Route(SYSLOG.domainName, "ves_trace")
+        ))
         val cut = Router(config, ClientContext())
 
         on("message with existing route (rtpm)") {
index 8ea53fb..eb0a317 100644 (file)
@@ -29,7 +29,7 @@ import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.routing
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.ves.VesEventOuterClass
@@ -45,7 +45,8 @@ internal object KafkaSinkProviderTest : Spek({
             val config = CollectorConfiguration(
                     maxRequestSizeBytes = 1024 * 1024,
                     kafkaServers = "localhost:9090",
-                    routing = routing { }.build())
+                    routing = Routing(emptyList())
+            )
 
             val cut = KafkaSinkProvider(config)
 
index d965b78..109915a 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
index 6425601..a398967 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.