Add stndDefined domain to HV-VES
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / RouterTest.kt
index 90b850c..ad655f6 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2021 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.None
-import arrow.core.Some
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventStndDefinedNamespace
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import reactor.core.publisher.Flux
+import reactor.test.StepVerifier
 
 
 /**
@@ -42,72 +50,113 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
  * @since May 2018
  */
 object RouterTest : Spek({
-    given("sample configuration") {
-        val config = routing {
 
-            defineRoute {
-                fromDomain(PERF3GPP.domainName)
-                toTopic("ves_rtpm")
-                withFixedPartitioning(2)
-            }
+    describe("Router") {
 
-            defineRoute {
-                fromDomain(SYSLOG.domainName)
-                toTopic("ves_trace")
-                withFixedPartitioning()
-            }
-        }.build()
-        val cut = Router(config, ClientContext())
+        whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
+        whenever(ves3gppHeartbeatSinkMock.topicName()).thenReturn(ves3gppHeartbeatTopic)
+        whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)
 
-        on("message with existing route (rtpm)") {
-            val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
-            val result = cut.findDestination(message)
+        val messageSinkMap = mapOf(
+                Pair(perf3gppTopic, lazyOf(messageSinkMock)),
+                Pair(ves3gppHeartbeatTopic, lazyOf(messageSinkMock)),
+                Pair(syslogTopic, lazyOf(messageSinkMock))
+        )
 
-            it("should have route available") {
-                assertThat(result).isNotNull()
-            }
+        given("sample routing specification") {
+            val cut = router(defaultRouting, messageSinkMap)
 
-            it("should be routed to proper partition") {
-                assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
-            }
+            on("message with existing route (rtpm)") {
+                whenever(messageSinkMock.send(routedPerf3GppMessage))
+                        .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage))
 
-            it("should be routed to proper topic") {
-                assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
-            }
+                it("should be properly routed") {
+                    val result = cut.route(perf3gppMessage)
+
+                    assertThat(result).isNotNull
+                    StepVerifier.create(result)
+                            .expectNext(successfullyConsumedPerf3gppMessage)
+                            .verifyComplete()
 
-            it("should be routed with a given message") {
-                assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
+                    verify(perf3gppSinkMock).topicName()
+                    verify(messageSinkMock).send(routedPerf3GppMessage)
+                }
             }
-        }
 
-        on("message with existing route (trace)") {
-            val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
-            val result = cut.findDestination(message)
+            on("message with existing route (syslog)") {
+                whenever(messageSinkMock.send(routedSyslogMessage))
+                        .thenReturn(Flux.just(successfullyConsumedSyslogMessage))
+                val result = cut.route(syslogMessage)
 
-            it("should have route available") {
-                assertThat(result).isNotNull()
-            }
+                it("should be properly routed") {
+                    StepVerifier.create(result)
+                            .expectNext(successfullyConsumedSyslogMessage)
+                            .verifyComplete()
 
-            it("should be routed to proper partition") {
-                assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
+                    verify(syslogSinkMock).topicName()
+                    verify(messageSinkMock).send(routedSyslogMessage)
+                }
             }
 
-            it("should be routed to proper topic") {
-                assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
-            }
+            on("message with existing stndDefined route (ves3gppHeartbeat)") {
+                whenever(messageSinkMock.send(routedVes3gppHeartbeatMessage))
+                        .thenReturn(Flux.just(successfullyConsumedVes3gppHeartbeatMessage))
+                val result = cut.route(ves3gppHeartbeatMessage)
 
-            it("should be routed with a given message") {
-                assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
+                it("should be properly routed") {
+                    StepVerifier.create(result)
+                            .expectNext(successfullyConsumedVes3gppHeartbeatMessage)
+                            .verifyComplete()
+
+                    verify(ves3gppHeartbeatSinkMock).topicName()
+                    verify(messageSinkMock).send(routedVes3gppHeartbeatMessage)
+                }
             }
-        }
 
-        on("message with unknown route") {
-            val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
-            val result = cut.findDestination(message)
+            on("message with unknown route") {
+                val message = VesMessage(commonHeader(VesEventDomain.HEARTBEAT), emptyWireProtocolFrame())
+                val result = cut.route(message)
 
-            it("should not have route available") {
-                assertThat(result).isEqualTo(None)
+                it("should not have route available") {
+                    StepVerifier.create(result).verifyComplete()
+                }
             }
         }
     }
-})
\ No newline at end of file
+
+})
+
+private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) =
+        Router(routing, kafkaPublisherMap, ClientContext(), mock())
+
+private const val perf3gppTopic = "PERF_PERF"
+private val perf3gppSinkMock = mock<KafkaSink>()
+private val default3gppRoute = Route(VesEventDomain.PERF3GPP.domainName, perf3gppSinkMock)
+
+private const val ves3gppHeartbeatTopic = "SEC_3GPP_HEARTBEAT_OUTPUT"
+private val ves3gppHeartbeatSinkMock = mock<KafkaSink>()
+private val defaultVes3gppHeartbeatRoute =
+        Route(VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT.stndDefinedNamespace, ves3gppHeartbeatSinkMock)
+
+private const val syslogTopic = "SYS_LOG"
+private val syslogSinkMock = mock<KafkaSink>()
+private val defaultSyslogRoute = Route(VesEventDomain.SYSLOG.domainName, syslogSinkMock)
+
+private val defaultRouting = listOf(default3gppRoute, defaultVes3gppHeartbeatRoute, defaultSyslogRoute)
+
+
+private val messageSinkMock = mock<Sink>()
+private val default_partition = None
+
+private val perf3gppMessage = VesMessage(commonHeader(VesEventDomain.PERF3GPP), emptyWireProtocolFrame())
+private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition)
+private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)
+
+private val ves3gppHeartbeatMessage = VesMessage(commonHeader(domain = VesEventDomain.STND_DEFINED,
+        stndDefinedNamespace = VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT), emptyWireProtocolFrame())
+private val routedVes3gppHeartbeatMessage = RoutedMessage(ves3gppHeartbeatMessage, ves3gppHeartbeatTopic, default_partition)
+private val successfullyConsumedVes3gppHeartbeatMessage = SuccessfullyConsumedMessage(routedVes3gppHeartbeatMessage)
+
+private val syslogMessage = VesMessage(commonHeader(VesEventDomain.SYSLOG), emptyWireProtocolFrame())
+private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition)
+private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)
\ No newline at end of file