Add stndDefined domain to HV-VES
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / RouterTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2021 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl
21
22 import arrow.core.None
23 import com.nhaarman.mockitokotlin2.mock
24 import com.nhaarman.mockitokotlin2.verify
25 import com.nhaarman.mockitokotlin2.whenever
26 import org.assertj.core.api.Assertions.assertThat
27 import org.jetbrains.spek.api.Spek
28 import org.jetbrains.spek.api.dsl.describe
29 import org.jetbrains.spek.api.dsl.given
30 import org.jetbrains.spek.api.dsl.it
31 import org.jetbrains.spek.api.dsl.on
32 import org.onap.dcae.collectors.veshv.config.api.model.Route
33 import org.onap.dcae.collectors.veshv.config.api.model.Routing
34 import org.onap.dcae.collectors.veshv.boundary.Sink
35 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
36 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
37 import org.onap.dcae.collectors.veshv.domain.VesEventStndDefinedNamespace
38 import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
39 import org.onap.dcae.collectors.veshv.domain.VesMessage
40 import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
41 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
42 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
43 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
44 import reactor.core.publisher.Flux
45 import reactor.test.StepVerifier
46
47
/**
 * Spek specification for [Router]: messages whose domain (or stndDefined namespace) has a
 * configured route must be sent to the sink registered for that route's topic, while messages
 * without a matching route must produce an empty result.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
object RouterTest : Spek({

    describe("Router") {

        // Every Kafka sink mock reports the topic that its route is expected to publish to.
        whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
        whenever(ves3gppHeartbeatSinkMock.topicName()).thenReturn(ves3gppHeartbeatTopic)
        whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)

        // All topics are backed by the same Sink mock, so send() calls are verified on one object.
        val sinksByTopic = mapOf(
                perf3gppTopic to lazyOf(messageSinkMock),
                ves3gppHeartbeatTopic to lazyOf(messageSinkMock),
                syslogTopic to lazyOf(messageSinkMock)
        )

        given("sample routing specification") {
            val sut = router(defaultRouting, sinksByTopic)

            on("message with existing route (rtpm)") {
                whenever(messageSinkMock.send(routedPerf3GppMessage))
                        .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage))

                it("should be properly routed") {
                    // Unlike the other cases, routing happens inside the assertion block here.
                    val routed = sut.route(perf3gppMessage)

                    assertThat(routed).isNotNull
                    StepVerifier
                            .create(routed)
                            .expectNext(successfullyConsumedPerf3gppMessage)
                            .verifyComplete()

                    verify(perf3gppSinkMock).topicName()
                    verify(messageSinkMock).send(routedPerf3GppMessage)
                }
            }

            on("message with existing route (syslog)") {
                whenever(messageSinkMock.send(routedSyslogMessage))
                        .thenReturn(Flux.just(successfullyConsumedSyslogMessage))
                val routed = sut.route(syslogMessage)

                it("should be properly routed") {
                    StepVerifier
                            .create(routed)
                            .expectNext(successfullyConsumedSyslogMessage)
                            .verifyComplete()

                    verify(syslogSinkMock).topicName()
                    verify(messageSinkMock).send(routedSyslogMessage)
                }
            }

            on("message with existing stndDefined route (ves3gppHeartbeat)") {
                whenever(messageSinkMock.send(routedVes3gppHeartbeatMessage))
                        .thenReturn(Flux.just(successfullyConsumedVes3gppHeartbeatMessage))
                val routed = sut.route(ves3gppHeartbeatMessage)

                it("should be properly routed") {
                    StepVerifier
                            .create(routed)
                            .expectNext(successfullyConsumedVes3gppHeartbeatMessage)
                            .verifyComplete()

                    verify(ves3gppHeartbeatSinkMock).topicName()
                    verify(messageSinkMock).send(routedVes3gppHeartbeatMessage)
                }
            }

            on("message with unknown route") {
                // HEARTBEAT is not among the domains configured in defaultRouting.
                val unroutableMessage = VesMessage(commonHeader(VesEventDomain.HEARTBEAT), emptyWireProtocolFrame())
                val routed = sut.route(unroutableMessage)

                it("should not have route available") {
                    // The stream must complete without emitting any element.
                    StepVerifier.create(routed).verifyComplete()
                }
            }
        }
    }

})
128
/**
 * Creates the [Router] under test.
 *
 * @param routing routes the router should know about
 * @param kafkaPublisherMap lazily provided sinks keyed by topic name
 * @return a router built with a fresh [ClientContext] and a mock as its last constructor argument
 */
private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>): Router {
    val clientContext = ClientContext()
    return Router(routing, kafkaPublisherMap, clientContext, mock())
}
131
// Route for the PERF3GPP domain.
private const val perf3gppTopic = "PERF_PERF"
private val perf3gppSinkMock: KafkaSink = mock()
private val default3gppRoute = Route(VesEventDomain.PERF3GPP.domainName, perf3gppSinkMock)

// Route keyed by the stndDefined namespace of 3GPP heartbeat events rather than by a domain name.
private const val ves3gppHeartbeatTopic = "SEC_3GPP_HEARTBEAT_OUTPUT"
private val ves3gppHeartbeatSinkMock: KafkaSink = mock()
private val defaultVes3gppHeartbeatRoute = Route(
        VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT.stndDefinedNamespace,
        ves3gppHeartbeatSinkMock
)

// Route for the SYSLOG domain.
private const val syslogTopic = "SYS_LOG"
private val syslogSinkMock: KafkaSink = mock()
private val defaultSyslogRoute = Route(VesEventDomain.SYSLOG.domainName, syslogSinkMock)

// Routing handed to the router under test: one route per fixture defined above.
private val defaultRouting = listOf(
        default3gppRoute,
        defaultVes3gppHeartbeatRoute,
        defaultSyslogRoute
)
146
147
// Single Sink mock used for every topic in the spec.
private val messageSinkMock = mock<Sink>()

// Partition value placed in every expected RoutedMessage (arrow's None).
// Renamed from `default_partition` to follow Kotlin's lowerCamelCase property naming.
private val defaultPartition = None

// PERF3GPP fixture: incoming message, its expected routed form, and the sink's success result.
private val perf3gppMessage = VesMessage(
        commonHeader(VesEventDomain.PERF3GPP),
        emptyWireProtocolFrame()
)
private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, defaultPartition)
private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)

// stndDefined fixture: STND_DEFINED domain combined with the 3GPP heartbeat namespace.
private val ves3gppHeartbeatMessage = VesMessage(
        commonHeader(
                domain = VesEventDomain.STND_DEFINED,
                stndDefinedNamespace = VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT
        ),
        emptyWireProtocolFrame()
)
private val routedVes3gppHeartbeatMessage = RoutedMessage(
        ves3gppHeartbeatMessage,
        ves3gppHeartbeatTopic,
        defaultPartition
)
private val successfullyConsumedVes3gppHeartbeatMessage =
        SuccessfullyConsumedMessage(routedVes3gppHeartbeatMessage)

// SYSLOG fixture: incoming message, its expected routed form, and the sink's success result.
private val syslogMessage = VesMessage(
        commonHeader(VesEventDomain.SYSLOG),
        emptyWireProtocolFrame()
)
private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, defaultPartition)
private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)