Merge configurations
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / component / MetricsSpecification.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import com.google.protobuf.ByteString
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.given
27 import org.jetbrains.spek.api.dsl.it
28 import org.jetbrains.spek.api.dsl.on
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
32 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
33 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
34 import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
35 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
36 import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
37 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
38 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
39 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
40 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
41 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
42 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
43 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
44 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
45 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
46 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
47 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
48 import java.time.Duration
49
/**
 * Component-level specification of the metrics exposed by the system under test (SUT).
 *
 * Every case creates its own SUT, pushes one or more wire-frame messages through
 * `handleConnection` and then asserts on the values read from `sut.metrics`.
 */
object MetricsSpecification : Spek({
    debugRx(false)

    describe("Bytes received metrics") {
        it("should sum up all bytes received") {
            val sut = vesHvWithAlwaysSuccessfulSink()
            val vesWireFrameMessage = vesWireFrameMessage()
            val invalidWireFrame = messageWithInvalidWireFrameHeader()

            // Expected total is computed before the buffers are handed over.
            // NOTE(review): presumably handleConnection consumes the buffers - confirm.
            val bytesSent = invalidWireFrame.readableBytes() +
                    vesWireFrameMessage.readableBytes()
            sut.handleConnection(
                    vesWireFrameMessage,
                    invalidWireFrame
            )

            // The frame with the invalid header is expected to be counted as well.
            val metrics = sut.metrics
            assertThat(metrics.bytesReceived)
                    .describedAs("bytesReceived metric")
                    .isEqualTo(bytesSent)
        }
    }

    describe("Messages received metrics") {
        it("should sum up all received messages bytes") {
            val sut = vesHvWithAlwaysSuccessfulSink()
            val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10)))
            val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40)))
            val firstVesMessage = vesWireFrameMessage(firstVesEvent)
            val secondVesMessage = vesWireFrameMessage(secondVesEvent)

            // Only the serialized events are summed here, not the readable size of the wire frames.
            val serializedMessagesSize = firstVesEvent.serializedSize + secondVesEvent.serializedSize
            sut.handleConnection(
                    firstVesMessage,
                    secondVesMessage
            )

            val metrics = sut.metrics
            assertThat(metrics.messageBytesReceived)
                    .describedAs("messageBytesReceived metric")
                    .isEqualTo(serializedMessagesSize)
        }
    }

    describe("Messages sent metrics") {
        it("should gather info for each topic separately") {
            val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)

            sut.handleConnection(
                    vesWireFrameMessage(PERF3GPP),
                    vesWireFrameMessage(PERF3GPP),
                    vesWireFrameMessage(VesEventDomain.MEASUREMENT)
            )

            // Three messages in total: the two PERF3GPP ones are expected on PERF3GPP_TOPIC,
            // the MEASUREMENT one on ALTERNATE_PERF3GPP_TOPIC.
            val metrics = sut.metrics
            assertThat(metrics.messagesSentCount)
                    .describedAs("messagesSentCount metric")
                    .isEqualTo(3)
            assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC))
                    .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
                    .isEqualTo(2)
            assertThat(metrics.messagesOnTopic(ALTERNATE_PERF3GPP_TOPIC))
                    .describedAs("messagesSentToTopic $ALTERNATE_PERF3GPP_TOPIC metric")
                    .isEqualTo(1)
        }
    }

    describe("Processing time") {
        it("should gather processing time metric") {
            val delay = Duration.ofMillis(10)
            val sut = vesHvWithDelayingSink(delay)

            sut.handleConnection(vesWireFrameMessage(PERF3GPP))

            // The metric is in microseconds, so the delay is converted from nanoseconds;
            // the configured sink delay is used as the lower bound.
            val metrics = sut.metrics
            assertThat(metrics.lastProcessingTimeMicros)
                    .describedAs("processingTime metric")
                    .isGreaterThanOrEqualTo(delay.toNanos().toDouble() / 1000.0)
        }
    }

    describe("Messages dropped metrics") {
        it("should gather metrics for invalid messages") {
            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)

            sut.handleConnection(
                    messageWithInvalidWireFrameHeader(),
                    wireFrameMessageWithInvalidPayload(),
                    vesWireFrameMessage(domain = PERF3GPP),
                    messageWithInvalidListenerVersion()
            )

            // Three of the four messages are malformed; the PERF3GPP one is not expected to count.
            val metrics = sut.metrics
            assertThat(metrics.messagesDropped(INVALID_MESSAGE))
                    .describedAs("messagesDroppedCause $INVALID_MESSAGE metric")
                    .isEqualTo(3)
        }

        it("should gather metrics for route not found") {
            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)

            sut.handleConnection(
                    vesWireFrameMessage(domain = PERF3GPP),
                    vesWireFrameMessage(domain = HEARTBEAT)
            )

            // Exactly one of the two messages is expected to have no route.
            // NOTE(review): presumably basicRouting has no HEARTBEAT entry - confirm in fakes.
            val metrics = sut.metrics
            assertThat(metrics.messagesDropped(ROUTE_NOT_FOUND))
                    .describedAs("messagesDroppedCause $ROUTE_NOT_FOUND metric")
                    .isEqualTo(1)
        }

        it("should gather metrics for sink errors") {
            val sut = vesHvWithAlwaysFailingSink(basicRouting)

            sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))

            val metrics = sut.metrics
            assertThat(metrics.messagesDropped(KAFKA_FAILURE))
                    .describedAs("messagesDroppedCause $KAFKA_FAILURE metric")
                    .isEqualTo(1)
        }

        it("should gather summed metrics for dropped messages") {
            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)

            sut.handleConnection(
                    vesWireFrameMessage(domain = PERF3GPP),
                    vesWireFrameMessage(domain = HEARTBEAT),
                    wireFrameMessageWithInvalidPayload()
            )

            // The total is expected to aggregate drops regardless of their cause (two here).
            val metrics = sut.metrics
            assertThat(metrics.messagesDroppedCount)
                    .describedAs("messagesDroppedCount metric")
                    .isEqualTo(2)
        }
    }

    describe("clients rejected metrics") {
        given("rejection causes") {
            // Data-driven cases: each expected rejection cause is paired with a message
            // that should trigger it. Kotlin's own forEach with a destructured entry is
            // used instead of Java 8's Map.forEach(BiConsumer).
            mapOf(
                    ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to
                            messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1),
                    ClientRejectionCause.INVALID_WIRE_FRAME_MARKER to garbageFrame()
            ).forEach { (cause, vesMessage) ->
                on("cause $cause") {
                    it("should notify correct metrics") {
                        val sut = vesHvWithAlwaysSuccessfulSink()

                        sut.handleConnection(vesMessage)

                        // Exactly one cause must be recorded, and it must be recorded once.
                        val metrics = sut.metrics
                        assertThat(metrics.clientRejectionCause.size)
                                .describedAs("metrics were notified with only one rejection cause")
                                .isOne()
                        assertThat(metrics.clientRejectionCause[cause])
                                .describedAs("metrics were notified only once with correct client rejection cause")
                                .isOne()
                    }
                }
            }
        }
    }
})