Various improvements
[dcaegen2/collectors/hv-ves.git] / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / RouterTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl
21
22 import arrow.core.None
23 import arrow.core.Some
24 import org.assertj.core.api.Assertions.assertThat
25 import org.jetbrains.spek.api.Spek
26 import org.jetbrains.spek.api.dsl.given
27 import org.jetbrains.spek.api.dsl.it
28 import org.jetbrains.spek.api.dsl.on
29 import org.onap.dcae.collectors.veshv.domain.ByteData
30 import org.onap.dcae.collectors.veshv.model.RoutedMessage
31 import org.onap.dcae.collectors.veshv.model.VesMessage
32 import org.onap.dcae.collectors.veshv.model.routing
33 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
34 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
35
36 /**
37  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
38  * @since May 2018
39  */
40 object RouterTest : Spek({
41     given("sample configuration") {
42         val config = routing {
43
44             defineRoute {
45                 fromDomain(Domain.HVRANMEAS)
46                 toTopic("ves_rtpm")
47                 withFixedPartitioning(2)
48             }
49
50             defineRoute {
51                 fromDomain(Domain.SYSLOG)
52                 toTopic("ves_trace")
53                 withFixedPartitioning()
54             }
55         }.build()
56         val cut = Router(config)
57
58         on("message with existing route (rtpm)") {
59             val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), ByteData.EMPTY)
60             val result = cut.findDestination(message)
61
62             it("should have route available") {
63                 assertThat(result).isNotNull()
64             }
65
66             it("should be routed to proper partition") {
67                 assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
68             }
69
70             it("should be routed to proper topic") {
71                 assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
72             }
73
74             it("should be routed with a given message") {
75                 assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
76             }
77         }
78
79         on("message with existing route (trace)") {
80             val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), ByteData.EMPTY)
81             val result = cut.findDestination(message)
82
83             it("should have route available") {
84                 assertThat(result).isNotNull()
85             }
86
87             it("should be routed to proper partition") {
88                 assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
89             }
90
91             it("should be routed to proper topic") {
92                 assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
93             }
94
95             it("should be routed with a given message") {
96                 assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
97             }
98         }
99
100         on("message with unknown route") {
101             val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), ByteData.EMPTY)
102             val result = cut.findDestination(message)
103
104             it("should not have route available") {
105                 assertThat(result).isEqualTo(None)
106             }
107         }
108     }
109 })
110
111 private fun vesCommonHeaderWithDomain(domain: Domain) =
112         CommonEventHeader.getDefaultInstance().toBuilder()
113                 .setDomain(domain)
114                 .build()