2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import com.google.protobuf.ByteString
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.given
27 import org.jetbrains.spek.api.dsl.it
28 import org.jetbrains.spek.api.dsl.on
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
32 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
33 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
34 import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
35 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
36 import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
37 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
38 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
39 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
40 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
41 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
42 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
43 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
44 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
45 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
46 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
47 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
48 import java.time.Duration
/**
 * Spek specification that feeds wire-frame messages to a system under test (`sut`)
 * and then inspects the values exposed through `sut.metrics`.
 *
 * NOTE(review): in this view several statements and all closing braces are not visible,
 * so some assertions below have no visible expected value - confirm against the full file.
 */
50 object MetricsSpecification : Spek({
// Group: total bytes received.
53 describe("Bytes received metrics") {
// Builds one regular wire frame and one frame with an invalid wire-frame header, then
// computes the sum of their readable bytes as `bytesSent`.
// NOTE(review): the call that sends the frames and the final comparison are not visible here;
// presumably `metrics.bytesReceived` is compared with `bytesSent` - confirm.
54 it("should sum up all bytes received") {
55 val sut = vesHvWithAlwaysSuccessfulSink()
56 val vesWireFrameMessage = vesWireFrameMessage()
57 val invalidWireFrame = messageWithInvalidWireFrameHeader()
59 val bytesSent = invalidWireFrame.readableBytes() +
60 vesWireFrameMessage.readableBytes()
66 val metrics = sut.metrics
67 assertThat(metrics.bytesReceived)
68 .describedAs("bytesReceived metric")
// Group: bytes of received messages.
73 describe("Messages received metrics") {
// Creates two events whose `eventFields` are 10 and 40 bytes long and wraps each in a wire frame.
// The expected value is the sum of the events' `serializedSize` (the events, not the wire frames),
// which is compared for equality with `metrics.messageBytesReceived`.
// NOTE(review): the call that sends the two frames is not visible in this view.
74 it("should sum up all received messages bytes") {
75 val sut = vesHvWithAlwaysSuccessfulSink()
76 val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10)))
77 val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40)))
78 val firstVesMessage = vesWireFrameMessage(firstVesEvent)
79 val secondVesMessage = vesWireFrameMessage(secondVesEvent)
81 val serializedMessagesSize = firstVesEvent.serializedSize + secondVesEvent.serializedSize
87 val metrics = sut.metrics
88 assertThat(metrics.messageBytesReceived)
89 .describedAs("messageBytesReceived metric")
90 .isEqualTo(serializedMessagesSize)
// Group: messages sent, overall and per topic.
94 describe("Messages sent metrics") {
// Uses the `twoDomainsToOneTopicRouting` routing and three frames: two PERF3GPP and one MEASUREMENT.
// Inspects `messagesSentCount` plus `messagesOnTopic` for PERF3GPP_TOPIC and ALTERNATE_PERF3GPP_TOPIC.
// NOTE(review): the expected counts are not visible in this view.
95 it("should gather info for each topic separately") {
96 val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)
99 vesWireFrameMessage(PERF3GPP),
100 vesWireFrameMessage(PERF3GPP),
101 vesWireFrameMessage(VesEventDomain.MEASUREMENT)
104 val metrics = sut.metrics
105 assertThat(metrics.messagesSentCount)
106 .describedAs("messagesSentCount metric")
108 assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC))
109 .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
111 assertThat(metrics.messagesOnTopic(ALTERNATE_PERF3GPP_TOPIC))
112 .describedAs("messagesSentToTopic $ALTERNATE_PERF3GPP_TOPIC metric")
// Group: processing time.
117 describe("Processing time") {
// Creates the sut with `vesHvWithDelayingSink` and a 10 ms delay, sends one PERF3GPP frame,
// and requires `lastProcessingTimeMicros` to be at least that delay.
118 it("should gather processing time metric") {
119 val delay = Duration.ofMillis(10)
120 val sut = vesHvWithDelayingSink(delay)
122 sut.handleConnection(vesWireFrameMessage(PERF3GPP))
125 val metrics = sut.metrics
126 assertThat(metrics.lastProcessingTimeMicros)
127 .describedAs("processingTime metric")
// The delay is converted from nanoseconds to microseconds (divide by 1000) to match the metric's unit.
128 .isGreaterThanOrEqualTo(delay.toNanos().toDouble() / 1000.0)
// Group: dropped messages, per drop cause and in total.
132 describe("Messages dropped metrics") {
// Sends four frames: invalid wire-frame header, invalid payload, a PERF3GPP frame,
// and a frame with an invalid listener version; then inspects `messagesDropped(INVALID_MESSAGE)`.
// NOTE(review): the expected count is not visible in this view.
133 it("should gather metrics for invalid messages") {
134 val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
136 sut.handleConnection(
137 messageWithInvalidWireFrameHeader(),
138 wireFrameMessageWithInvalidPayload(),
139 vesWireFrameMessage(domain = PERF3GPP),
140 messageWithInvalidListenerVersion()
143 val metrics = sut.metrics
144 assertThat(metrics.messagesDropped(INVALID_MESSAGE))
145 .describedAs("messagesDroppedCause $INVALID_MESSAGE metric")
// Sends one PERF3GPP and one HEARTBEAT frame with `basicRouting`, then inspects
// `messagesDropped(ROUTE_NOT_FOUND)`.
// NOTE(review): presumably `basicRouting` has no route for HEARTBEAT - verify against the fakes.
149 it("should gather metrics for route not found") {
150 val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
152 sut.handleConnection(
153 vesWireFrameMessage(domain = PERF3GPP),
154 vesWireFrameMessage(domain = HEARTBEAT)
157 val metrics = sut.metrics
158 assertThat(metrics.messagesDropped(ROUTE_NOT_FOUND))
159 .describedAs("messagesDroppedCause $ROUTE_NOT_FOUND metric")
// Uses `vesHvWithAlwaysFailingSink`, sends one PERF3GPP frame, and inspects
// `messagesDropped(KAFKA_FAILURE)`.
163 it("should gather metrics for sink errors") {
164 val sut = vesHvWithAlwaysFailingSink(basicRouting)
166 sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
168 val metrics = sut.metrics
169 assertThat(metrics.messagesDropped(KAFKA_FAILURE))
170 .describedAs("messagesDroppedCause $KAFKA_FAILURE metric")
// Sends a PERF3GPP frame, a HEARTBEAT frame and a frame with an invalid payload,
// then inspects the aggregate `messagesDroppedCount`.
// NOTE(review): the expected total is not visible in this view.
174 it("should gather summed metrics for dropped messages") {
175 val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
177 sut.handleConnection(
178 vesWireFrameMessage(domain = PERF3GPP),
179 vesWireFrameMessage(domain = HEARTBEAT),
180 wireFrameMessageWithInvalidPayload()
183 val metrics = sut.metrics
184 assertThat(metrics.messagesDroppedCount)
185 .describedAs("messagesDroppedCount metric")
// Group: client rejections. Each rejection cause is paired with a message used to exercise it:
// PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE with a payload of MAX_PAYLOAD_SIZE_BYTES + 1 bytes,
// INVALID_WIRE_FRAME_MARKER with a garbage frame.
190 describe("clients rejected metrics") {
191 given("rejection causes") {
193 ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to
194 messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1),
195 ClientRejectionCause.INVALID_WIRE_FRAME_MARKER to garbageFrame()
196 ).forEach { cause, vesMessage ->
// For every (cause, message) pair a fresh sut handles the message; the test then inspects the
// number of recorded rejection causes and the entry recorded for `cause`.
// NOTE(review): expected values are not visible; the descriptions suggest one cause, recorded once.
198 it("should notify correct metrics") {
199 val sut = vesHvWithAlwaysSuccessfulSink()
201 sut.handleConnection(vesMessage)
203 val metrics = sut.metrics
204 assertThat(metrics.clientRejectionCause.size)
205 .describedAs("metrics were notified with only one rejection cause")
207 assertThat(metrics.clientRejectionCause[cause])
208 .describedAs("metrics were notified only once with correct client rejection cause")