2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl
22 import arrow.core.None
23 import com.nhaarman.mockitokotlin2.mock
24 import com.nhaarman.mockitokotlin2.verify
25 import com.nhaarman.mockitokotlin2.whenever
26 import org.assertj.core.api.Assertions.assertThat
27 import org.jetbrains.spek.api.Spek
28 import org.jetbrains.spek.api.dsl.describe
29 import org.jetbrains.spek.api.dsl.given
30 import org.jetbrains.spek.api.dsl.it
31 import org.jetbrains.spek.api.dsl.on
32 import org.onap.dcae.collectors.veshv.config.api.model.Route
33 import org.onap.dcae.collectors.veshv.config.api.model.Routing
34 import org.onap.dcae.collectors.veshv.boundary.Sink
35 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
36 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
37 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
38 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
39 import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
40 import org.onap.dcae.collectors.veshv.domain.VesMessage
41 import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
42 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
43 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
44 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
45 import reactor.core.publisher.Flux
46 import reactor.test.StepVerifier
50 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
53 object RouterTest : Spek({
57 whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
58 whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)
60 val messageSinkMap = mapOf(
61 Pair(perf3gppTopic, lazyOf(messageSinkMock)),
62 Pair(syslogTopic, lazyOf(messageSinkMock))
65 given("sample routing specification") {
66 val cut = router(defaultRouting, messageSinkMap)
68 on("message with existing route (rtpm)") {
69 whenever(messageSinkMock.send(routedPerf3GppMessage))
70 .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage))
72 it("should be properly routed") {
73 val result = cut.route(perf3gppMessage)
75 assertThat(result).isNotNull()
76 StepVerifier.create(result)
77 .expectNext(successfullyConsumedPerf3gppMessage)
80 verify(perf3gppSinkMock).topicName()
81 verify(messageSinkMock).send(routedPerf3GppMessage)
85 on("message with existing route (syslog)") {
86 whenever(messageSinkMock.send(routedSyslogMessage))
87 .thenReturn(Flux.just(successfullyConsumedSyslogMessage))
88 val result = cut.route(syslogMessage)
90 it("should be properly routed") {
91 StepVerifier.create(result)
92 .expectNext(successfullyConsumedSyslogMessage)
95 verify(syslogSinkMock).topicName()
96 verify(messageSinkMock).send(routedSyslogMessage)
100 on("message with unknown route") {
101 val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
102 val result = cut.route(message)
104 it("should not have route available") {
105 StepVerifier.create(result).verifyComplete()
113 private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) =
114 Router(routing, kafkaPublisherMap, ClientContext(), mock())
116 private val perf3gppTopic = "PERF_PERF"
117 private val perf3gppSinkMock = mock<KafkaSink>()
118 private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock)
120 private val syslogTopic = "SYS_LOG"
121 private val syslogSinkMock = mock<KafkaSink>()
122 private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock)
124 private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute)
126 private val messageSinkMock = mock<Sink>()
127 private val default_partition = None
129 private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
130 private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition)
131 private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)
133 private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
134 private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition)
135 private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)