2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2021 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl
22 import arrow.core.None
23 import com.nhaarman.mockitokotlin2.mock
24 import com.nhaarman.mockitokotlin2.verify
25 import com.nhaarman.mockitokotlin2.whenever
26 import org.assertj.core.api.Assertions.assertThat
27 import org.jetbrains.spek.api.Spek
28 import org.jetbrains.spek.api.dsl.describe
29 import org.jetbrains.spek.api.dsl.given
30 import org.jetbrains.spek.api.dsl.it
31 import org.jetbrains.spek.api.dsl.on
32 import org.onap.dcae.collectors.veshv.config.api.model.Route
33 import org.onap.dcae.collectors.veshv.config.api.model.Routing
34 import org.onap.dcae.collectors.veshv.boundary.Sink
35 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
36 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
37 import org.onap.dcae.collectors.veshv.domain.VesEventStndDefinedNamespace
38 import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
39 import org.onap.dcae.collectors.veshv.domain.VesMessage
40 import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
41 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
42 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
43 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
44 import reactor.core.publisher.Flux
45 import reactor.test.StepVerifier
49 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification for Router: routes VES messages for the domains/namespaces
// configured in defaultRouting and checks what reaches the shared sink mock.
object RouterTest : Spek({
    // Each configured KafkaSink mock reports the topic its route publishes to.
    whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
    whenever(ves3gppHeartbeatSinkMock.topicName()).thenReturn(ves3gppHeartbeatTopic)
    whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)

    // Every topic is backed by the same Sink mock, so send() calls can be
    // stubbed and verified in one place.
    val messageSinkMap = mapOf(
            perf3gppTopic to lazyOf(messageSinkMock),
            ves3gppHeartbeatTopic to lazyOf(messageSinkMock),
            syslogTopic to lazyOf(messageSinkMock)
    )

    given("sample routing specification") {
        val cut = router(defaultRouting, messageSinkMap)

        on("message with existing route (rtpm)") {
            whenever(messageSinkMock.send(routedPerf3GppMessage))
                    .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage))

            it("should be properly routed") {
                val result = cut.route(perf3gppMessage)

                assertThat(result).isNotNull
                // A StepVerifier subscribes only when a verify*() method is called;
                // without the terminal call none of the expectations are checked.
                StepVerifier.create(result)
                        .expectNext(successfullyConsumedPerf3gppMessage)
                        .verifyComplete()

                verify(perf3gppSinkMock).topicName()
                verify(messageSinkMock).send(routedPerf3GppMessage)
            }
        }

        on("message with existing route (syslog)") {
            whenever(messageSinkMock.send(routedSyslogMessage))
                    .thenReturn(Flux.just(successfullyConsumedSyslogMessage))
            val result = cut.route(syslogMessage)

            it("should be properly routed") {
                StepVerifier.create(result)
                        .expectNext(successfullyConsumedSyslogMessage)
                        .verifyComplete()

                verify(syslogSinkMock).topicName()
                verify(messageSinkMock).send(routedSyslogMessage)
            }
        }

        on("message with existing stndDefined route (ves3gppHeartbeat)") {
            whenever(messageSinkMock.send(routedVes3gppHeartbeatMessage))
                    .thenReturn(Flux.just(successfullyConsumedVes3gppHeartbeatMessage))
            val result = cut.route(ves3gppHeartbeatMessage)

            it("should be properly routed") {
                StepVerifier.create(result)
                        .expectNext(successfullyConsumedVes3gppHeartbeatMessage)
                        .verifyComplete()

                verify(ves3gppHeartbeatSinkMock).topicName()
                verify(messageSinkMock).send(routedVes3gppHeartbeatMessage)
            }
        }

        on("message with unknown route") {
            // HEARTBEAT has no entry in defaultRouting.
            val message = VesMessage(commonHeader(VesEventDomain.HEARTBEAT), emptyWireProtocolFrame())
            val result = cut.route(message)

            it("should not have route available") {
                // Completing without emitting anything means nothing was routed.
                StepVerifier.create(result).verifyComplete()
            }
        }
    }
})
// Builds the Router under test from the given routing and topic-to-sink map,
// with a fresh ClientContext and a mock for the remaining collaborator.
private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>): Router {
    val clientContext = ClientContext()
    return Router(routing, kafkaPublisherMap, clientContext, mock())
}
// PERF3GPP domain: topic name, KafkaSink mock and the route tying them together.
private const val perf3gppTopic = "PERF_PERF"
private val perf3gppSinkMock: KafkaSink = mock()
private val default3gppRoute = Route(
        VesEventDomain.PERF3GPP.domainName,
        perf3gppSinkMock
)

// stndDefined VES_3GPP_HEARTBEAT namespace: this route is keyed by the
// namespace rather than by a domain name.
private const val ves3gppHeartbeatTopic = "SEC_3GPP_HEARTBEAT_OUTPUT"
private val ves3gppHeartbeatSinkMock: KafkaSink = mock()
private val defaultVes3gppHeartbeatRoute = Route(
        VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT.stndDefinedNamespace,
        ves3gppHeartbeatSinkMock
)

// SYSLOG domain: topic name, KafkaSink mock and route.
private const val syslogTopic = "SYS_LOG"
private val syslogSinkMock: KafkaSink = mock()
private val defaultSyslogRoute = Route(
        VesEventDomain.SYSLOG.domainName,
        syslogSinkMock
)

// Routing handed to the Router under test: the three routes declared above.
private val defaultRouting = listOf(
        default3gppRoute,
        defaultVes3gppHeartbeatRoute,
        defaultSyslogRoute
)
// Sink mock used to stub and verify send() calls in the spec.
private val messageSinkMock = mock<Sink>()
// arrow's None, passed as the partition argument of every expected RoutedMessage.
private val defaultPartition = None

// PERF3GPP: incoming message, the RoutedMessage expected at the sink, and the
// result the stubbed sink emits for it.
private val perf3gppMessage = VesMessage(commonHeader(VesEventDomain.PERF3GPP), emptyWireProtocolFrame())
private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, defaultPartition)
private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)

// stndDefined heartbeat: the header carries the STND_DEFINED domain plus the
// VES_3GPP_HEARTBEAT namespace.
private val ves3gppHeartbeatMessage = VesMessage(
        commonHeader(
                domain = VesEventDomain.STND_DEFINED,
                stndDefinedNamespace = VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT
        ),
        emptyWireProtocolFrame()
)
private val routedVes3gppHeartbeatMessage =
        RoutedMessage(ves3gppHeartbeatMessage, ves3gppHeartbeatTopic, defaultPartition)
private val successfullyConsumedVes3gppHeartbeatMessage =
        SuccessfullyConsumedMessage(routedVes3gppHeartbeatMessage)

// SYSLOG: incoming message, expected RoutedMessage and stubbed sink result.
private val syslogMessage = VesMessage(commonHeader(VesEventDomain.SYSLOG), emptyWireProtocolFrame())
private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, defaultPartition)
private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)