Add stndDefined domain to HV-VES
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / RouterTest.kt
index 533581d..ad655f6 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2021 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -33,9 +33,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.Route
 import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventStndDefinedNamespace
 import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
 import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
@@ -55,10 +54,12 @@ object RouterTest : Spek({
     describe("Router") {
 
         whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
+        whenever(ves3gppHeartbeatSinkMock.topicName()).thenReturn(ves3gppHeartbeatTopic)
         whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)
 
         val messageSinkMap = mapOf(
                 Pair(perf3gppTopic, lazyOf(messageSinkMock)),
+                Pair(ves3gppHeartbeatTopic, lazyOf(messageSinkMock)),
                 Pair(syslogTopic, lazyOf(messageSinkMock))
         )
 
@@ -72,7 +73,7 @@ object RouterTest : Spek({
                 it("should be properly routed") {
                     val result = cut.route(perf3gppMessage)
 
-                    assertThat(result).isNotNull()
+                    assertThat(result).isNotNull
                     StepVerifier.create(result)
                             .expectNext(successfullyConsumedPerf3gppMessage)
                             .verifyComplete()
@@ -97,8 +98,23 @@ object RouterTest : Spek({
                 }
             }
 
+            on("message with existing stndDefined route (ves3gppHeartbeat)") {
+                whenever(messageSinkMock.send(routedVes3gppHeartbeatMessage))
+                        .thenReturn(Flux.just(successfullyConsumedVes3gppHeartbeatMessage))
+                val result = cut.route(ves3gppHeartbeatMessage)
+
+                it("should be properly routed") {
+                    StepVerifier.create(result)
+                            .expectNext(successfullyConsumedVes3gppHeartbeatMessage)
+                            .verifyComplete()
+
+                    verify(ves3gppHeartbeatSinkMock).topicName()
+                    verify(messageSinkMock).send(routedVes3gppHeartbeatMessage)
+                }
+            }
+
             on("message with unknown route") {
-                val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
+                val message = VesMessage(commonHeader(VesEventDomain.HEARTBEAT), emptyWireProtocolFrame())
                 val result = cut.route(message)
 
                 it("should not have route available") {
@@ -113,23 +129,34 @@ object RouterTest : Spek({
 private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) =
         Router(routing, kafkaPublisherMap, ClientContext(), mock())
 
-private val perf3gppTopic = "PERF_PERF"
+private const val perf3gppTopic = "PERF_PERF"
 private val perf3gppSinkMock = mock<KafkaSink>()
-private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock)
+private val default3gppRoute = Route(VesEventDomain.PERF3GPP.domainName, perf3gppSinkMock)
+
+private const val ves3gppHeartbeatTopic = "SEC_3GPP_HEARTBEAT_OUTPUT"
+private val ves3gppHeartbeatSinkMock = mock<KafkaSink>()
+private val defaultVes3gppHeartbeatRoute =
+        Route(VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT.stndDefinedNamespace, ves3gppHeartbeatSinkMock)
 
-private val syslogTopic = "SYS_LOG"
+private const val syslogTopic = "SYS_LOG"
 private val syslogSinkMock = mock<KafkaSink>()
-private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock)
+private val defaultSyslogRoute = Route(VesEventDomain.SYSLOG.domainName, syslogSinkMock)
+
+private val defaultRouting = listOf(default3gppRoute, defaultVes3gppHeartbeatRoute, defaultSyslogRoute)
 
-private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute)
 
 private val messageSinkMock = mock<Sink>()
 private val default_partition = None
 
-private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
+private val perf3gppMessage = VesMessage(commonHeader(VesEventDomain.PERF3GPP), emptyWireProtocolFrame())
 private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition)
 private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)
 
-private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
+private val ves3gppHeartbeatMessage = VesMessage(commonHeader(domain = VesEventDomain.STND_DEFINED,
+        stndDefinedNamespace = VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT), emptyWireProtocolFrame())
+private val routedVes3gppHeartbeatMessage = RoutedMessage(ves3gppHeartbeatMessage, ves3gppHeartbeatTopic, default_partition)
+private val successfullyConsumedVes3gppHeartbeatMessage = SuccessfullyConsumedMessage(routedVes3gppHeartbeatMessage)
+
+private val syslogMessage = VesMessage(commonHeader(VesEventDomain.SYSLOG), emptyWireProtocolFrame())
 private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition)
 private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)
\ No newline at end of file