6b9c68037dfc6119209ccf7d9a540105d4e79ba0
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / RouterTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl
21
22 import arrow.core.None
23 import com.nhaarman.mockitokotlin2.mock
24 import com.nhaarman.mockitokotlin2.verify
25 import com.nhaarman.mockitokotlin2.whenever
26 import org.assertj.core.api.Assertions.assertThat
27 import org.jetbrains.spek.api.Spek
28 import org.jetbrains.spek.api.dsl.describe
29 import org.jetbrains.spek.api.dsl.given
30 import org.jetbrains.spek.api.dsl.it
31 import org.jetbrains.spek.api.dsl.on
32 import org.onap.dcae.collectors.veshv.config.api.model.Route
33 import org.onap.dcae.collectors.veshv.config.api.model.Routing
34 import org.onap.dcae.collectors.veshv.boundary.Sink
35 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
36 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
37 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
38 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
39 import org.onap.dcae.collectors.veshv.model.ClientContext
40 import org.onap.dcae.collectors.veshv.domain.VesMessage
41 import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
42 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
43 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
44 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
45 import reactor.core.publisher.Flux
46 import reactor.test.StepVerifier
47
48
/**
 * Spek specification for [Router].
 *
 * Checks that a message whose domain has a configured route is passed to the sink as a
 * [RoutedMessage] carrying that route's topic, and that a message of a domain without
 * a route yields an empty, completed result.
 *
 * Every scenario follows the same layout: stubbing and the `route` call live in the
 * `on` (action) scope, while the `it` scope only holds assertions and verifications.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
object RouterTest : Spek({

    describe("Router") {

        // Each KafkaSink mock reports the topic that its route is expected to publish to.
        whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
        whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)

        // Both topics resolve to the same Sink mock; individual calls are told apart
        // by the RoutedMessage argument they are stubbed/verified with.
        val messageSinkMap = mapOf(
                perf3gppTopic to lazyOf(messageSinkMock),
                syslogTopic to lazyOf(messageSinkMock)
        )

        given("sample routing specification") {
            val cut = router(defaultRouting, messageSinkMap)

            on("message with existing route (rtpm)") {
                whenever(messageSinkMock.send(routedPerf3GppMessage))
                        .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage))
                val result = cut.route(perf3gppMessage)

                it("should be properly routed") {
                    assertThat(result).isNotNull()
                    StepVerifier.create(result)
                            .expectNext(successfullyConsumedPerf3gppMessage)
                            .verifyComplete()

                    verify(perf3gppSinkMock).topicName()
                    verify(messageSinkMock).send(routedPerf3GppMessage)
                }
            }

            on("message with existing route (syslog)") {
                whenever(messageSinkMock.send(routedSyslogMessage))
                        .thenReturn(Flux.just(successfullyConsumedSyslogMessage))
                val result = cut.route(syslogMessage)

                it("should be properly routed") {
                    StepVerifier.create(result)
                            .expectNext(successfullyConsumedSyslogMessage)
                            .verifyComplete()

                    verify(syslogSinkMock).topicName()
                    verify(messageSinkMock).send(routedSyslogMessage)
                }
            }

            on("message with unknown route") {
                // HEARTBEAT is not part of defaultRouting, so no route matches this message.
                val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
                val result = cut.route(message)

                it("should not have route available") {
                    // Completes without emitting any element.
                    StepVerifier.create(result).verifyComplete()
                }
            }
        }
    }

})
112
/**
 * Creates the [Router] instance exercised by the specification.
 *
 * @param routing routes the router should know about
 * @param kafkaPublisherMap sinks keyed by a string (the spec passes topic names as keys)
 * @return a router built with a fresh [ClientContext] and a mock as its last
 *         constructor argument (the type of that argument is inferred from [Router])
 */
private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>): Router {
    val clientContext = ClientContext()
    return Router(routing, kafkaPublisherMap, clientContext, mock())
}
115
// ---------------------------------------------------------------------------
// Fixtures shared by the scenarios of the specification.
// ---------------------------------------------------------------------------

// PERF3GPP route: topic name, the KafkaSink mock backing the route, and the route itself.
private val perf3gppTopic = "PERF_PERF"
private val perf3gppSinkMock: KafkaSink = mock()
private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock)

// SYSLOG route, built the same way.
private val syslogTopic = "SYS_LOG"
private val syslogSinkMock: KafkaSink = mock()
private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock)

// Routing configuration containing exactly the two routes above.
private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute)

// Single Sink mock used for every topic.
private val messageSinkMock: Sink = mock()

// Third argument of every expected RoutedMessage (presumably "no explicit partition").
private val defaultPartition = None

// PERF3GPP message and the values expected at each stage of its handling.
private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, defaultPartition)
private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)

// SYSLOG message and the values expected at each stage of its handling.
private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, defaultPartition)
private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)