2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import com.google.protobuf.ByteString
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.given
27 import org.jetbrains.spek.api.dsl.it
28 import org.jetbrains.spek.api.dsl.on
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
32 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
33 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
34 import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
35 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
36 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
37 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
38 import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
40 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
41 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
42 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
43 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
44 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
45 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
46 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
47 import java.time.Duration
49 object MetricsSpecification : Spek({
52 describe("Bytes received metrics") {
53 it("should sum up all bytes received") {
54 val sut = vesHvWithAlwaysSuccessfulSink()
55 val vesWireFrameMessage = vesWireFrameMessage()
56 val invalidWireFrame = messageWithInvalidWireFrameHeader()
58 val bytesSent = invalidWireFrame.readableBytes() +
59 vesWireFrameMessage.readableBytes()
65 val metrics = sut.metrics
66 assertThat(metrics.bytesReceived)
67 .describedAs("bytesReceived metric")
72 describe("Messages received metrics") {
73 it("should sum up all received messages bytes") {
74 val sut = vesHvWithAlwaysSuccessfulSink()
75 val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10)))
76 val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40)))
77 val firstVesMessage = vesWireFrameMessage(firstVesEvent)
78 val secondVesMessage = vesWireFrameMessage(secondVesEvent)
80 val serializedMessagesSize = firstVesEvent.serializedSize + secondVesEvent.serializedSize
86 val metrics = sut.metrics
87 assertThat(metrics.messageBytesReceived)
88 .describedAs("messageBytesReceived metric")
89 .isEqualTo(serializedMessagesSize)
93 describe("Messages sent metrics") {
94 it("should gather info for each topic separately") {
95 val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting)
98 vesWireFrameMessage(PERF3GPP),
99 vesWireFrameMessage(PERF3GPP),
100 vesWireFrameMessage(VesEventDomain.MEASUREMENT)
103 val metrics = sut.metrics
104 assertThat(metrics.messagesSentCount)
105 .describedAs("messagesSentCount metric")
107 assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC))
108 .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
110 assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC))
111 .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric")
116 describe("Processing time") {
117 it("should gather processing time metric") {
118 val delay = Duration.ofMillis(10)
119 val sut = vesHvWithDelayingSink(delay)
121 sut.handleConnection(vesWireFrameMessage(PERF3GPP))
124 val metrics = sut.metrics
125 assertThat(metrics.lastProcessingTimeMicros)
126 .describedAs("processingTime metric")
127 .isGreaterThanOrEqualTo(delay.toNanos().toDouble() / 1000.0)
131 describe("Messages dropped metrics") {
132 it("should gather metrics for invalid messages") {
133 val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
135 sut.handleConnection(
136 messageWithInvalidWireFrameHeader(),
137 wireFrameMessageWithInvalidPayload(),
138 vesWireFrameMessage(domain = PERF3GPP),
139 messageWithInvalidListenerVersion()
142 val metrics = sut.metrics
143 assertThat(metrics.messagesDropped(INVALID_MESSAGE))
144 .describedAs("messagesDroppedCause $INVALID_MESSAGE metric")
148 it("should gather metrics for route not found") {
149 val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
151 sut.handleConnection(
152 vesWireFrameMessage(domain = PERF3GPP),
153 vesWireFrameMessage(domain = HEARTBEAT)
156 val metrics = sut.metrics
157 assertThat(metrics.messagesDropped(ROUTE_NOT_FOUND))
158 .describedAs("messagesDroppedCause $ROUTE_NOT_FOUND metric")
162 it("should gather metrics for sing errors") {
163 val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting)
165 sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
167 val metrics = sut.metrics
168 assertThat(metrics.messagesDropped(KAFKA_FAILURE))
169 .describedAs("messagesDroppedCause $KAFKA_FAILURE metric")
173 it("should gather summed metrics for dropped messages") {
174 val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
176 sut.handleConnection(
177 vesWireFrameMessage(domain = PERF3GPP),
178 vesWireFrameMessage(domain = HEARTBEAT),
179 wireFrameMessageWithInvalidPayload()
182 val metrics = sut.metrics
183 assertThat(metrics.messagesDroppedCount)
184 .describedAs("messagesDroppedCount metric")
189 describe("clients rejected metrics") {
190 given("rejection causes") {
192 ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to
193 messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1),
194 ClientRejectionCause.INVALID_WIRE_FRAME_MARKER to garbageFrame()
195 ).forEach { cause, vesMessage ->
197 it("should notify correct metrics") {
198 val sut = vesHvWithAlwaysSuccessfulSink()
200 sut.handleConnection(vesMessage)
202 val metrics = sut.metrics
203 assertThat(metrics.clientRejectionCause.size)
204 .describedAs("metrics were notified with only one rejection cause")
206 assertThat(metrics.clientRejectionCause[cause])
207 .describedAs("metrics were notified only once with correct client rejection cause")