Add metrics for active connections count
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / component / MetricsSpecification.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import com.google.protobuf.ByteString
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.given
27 import org.jetbrains.spek.api.dsl.it
28 import org.jetbrains.spek.api.dsl.on
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
32 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
33 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
34 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
35 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
36 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
37 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
38 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
39 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
40 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
41 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
42 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
43 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
44 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
45 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
46 import java.time.Duration
47
48 object MetricsSpecification : Spek({
49     debugRx(false)
50
51     describe("Bytes received metrics") {
52         it("should sum up all bytes received") {
53             val sut = vesHvWithNoOpSink()
54             val vesWireFrameMessage = vesWireFrameMessage()
55             val invalidWireFrame = messageWithInvalidWireFrameHeader()
56
57             val bytesSent = invalidWireFrame.readableBytes() +
58                     vesWireFrameMessage.readableBytes()
59             sut.handleConnection(
60                     vesWireFrameMessage,
61                     invalidWireFrame
62             )
63
64             val metrics = sut.metrics
65             assertThat(metrics.bytesReceived)
66                     .describedAs("bytesReceived metric")
67                     .isEqualTo(bytesSent)
68         }
69     }
70
71     describe("Messages received metrics") {
72         it("should sum up all received messages bytes") {
73             val sut = vesHvWithNoOpSink()
74             val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10)))
75             val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40)))
76             val firstVesMessage = vesWireFrameMessage(firstVesEvent)
77             val secondVesMessage = vesWireFrameMessage(secondVesEvent)
78
79             val serializedMessagesSize = firstVesEvent.serializedSize + secondVesEvent.serializedSize
80             sut.handleConnection(
81                     firstVesMessage,
82                     secondVesMessage
83             )
84
85             val metrics = sut.metrics
86             assertThat(metrics.messageBytesReceived)
87                     .describedAs("messageBytesReceived metric")
88                     .isEqualTo(serializedMessagesSize)
89         }
90     }
91
92     describe("Messages sent metrics") {
93         it("should gather info for each topic separately") {
94             val sut = vesHvWithNoOpSink(twoDomainsToOneTopicConfiguration)
95
96             sut.handleConnection(
97                     vesWireFrameMessage(PERF3GPP),
98                     vesWireFrameMessage(PERF3GPP),
99                     vesWireFrameMessage(VesEventDomain.MEASUREMENT)
100             )
101
102             val metrics = sut.metrics
103             assertThat(metrics.messagesSentCount)
104                     .describedAs("messagesSentCount metric")
105                     .isEqualTo(3)
106             assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC))
107                     .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
108                     .isEqualTo(2)
109             assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC))
110                     .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric")
111                     .isEqualTo(1)
112         }
113     }
114
115     describe("Processing time") {
116         it("should gather processing time metric") {
117             val delay = Duration.ofMillis(10)
118             val sut = vesHvWithDelayingSink(delay)
119
120             sut.handleConnection(vesWireFrameMessage(PERF3GPP))
121
122
123             val metrics = sut.metrics
124             assertThat(metrics.lastProcessingTimeMicros)
125                     .describedAs("processingTime metric")
126                     .isGreaterThanOrEqualTo(delay.toNanos().toDouble() / 1000.0)
127         }
128     }
129
130     describe("Messages dropped metrics") {
131         it("should gather metrics for invalid messages") {
132             val sut = vesHvWithNoOpSink(basicConfiguration)
133
134             sut.handleConnection(
135                     messageWithInvalidWireFrameHeader(),
136                     wireFrameMessageWithInvalidPayload(),
137                     vesWireFrameMessage(domain = PERF3GPP),
138                     messageWithInvalidListenerVersion()
139             )
140
141             val metrics = sut.metrics
142             assertThat(metrics.messagesDropped(INVALID_MESSAGE))
143                     .describedAs("messagesDroppedCause $INVALID_MESSAGE metric")
144                     .isEqualTo(3)
145         }
146
147         it("should gather metrics for route not found") {
148             val sut = vesHvWithNoOpSink(basicConfiguration)
149
150             sut.handleConnection(
151                     vesWireFrameMessage(domain = PERF3GPP),
152                     vesWireFrameMessage(domain = HEARTBEAT)
153             )
154
155             val metrics = sut.metrics
156             assertThat(metrics.messagesDropped(ROUTE_NOT_FOUND))
157                     .describedAs("messagesDroppedCause $ROUTE_NOT_FOUND metric")
158                     .isEqualTo(1)
159         }
160
161         it("should gather summed metrics for dropped messages") {
162             val sut = vesHvWithNoOpSink(basicConfiguration)
163
164             sut.handleConnection(
165                     vesWireFrameMessage(domain = PERF3GPP),
166                     vesWireFrameMessage(domain = HEARTBEAT),
167                     wireFrameMessageWithInvalidPayload()
168             )
169
170             val metrics = sut.metrics
171             assertThat(metrics.messagesDroppedCount)
172                     .describedAs("messagesDroppedCount metric")
173                     .isEqualTo(2)
174         }
175     }
176
177     describe("clients rejected metrics") {
178         given("rejection causes") {
179             mapOf(
180                     ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to
181                             messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1),
182                     ClientRejectionCause.INVALID_WIRE_FRAME_MARKER to garbageFrame()
183             ).forEach { cause, vesMessage ->
184                 on("cause $cause") {
185                     it("should notify correct metrics") {
186                         val sut = vesHvWithNoOpSink()
187
188                         sut.handleConnection(vesMessage)
189
190                         val metrics = sut.metrics
191                         assertThat(metrics.clientRejectionCause.size)
192                                 .describedAs("metrics were notified with only one rejection cause")
193                                 .isOne()
194                         assertThat(metrics.clientRejectionCause[cause])
195                                 .describedAs("metrics were notified only once with correct client rejection cause")
196                                 .isOne()
197                     }
198                 }
199             }
200         }
201     }
202 })