Use DataStream API from CBS client
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / component / MetricsSpecification.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import com.google.protobuf.ByteString
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.given
27 import org.jetbrains.spek.api.dsl.it
28 import org.jetbrains.spek.api.dsl.on
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
32 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
33 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
34 import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
35 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
36 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
37 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
38 import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
40 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
41 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
42 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
43 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
44 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
45 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
46 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
47 import java.time.Duration
48
49 object MetricsSpecification : Spek({
50     debugRx(false)
51
52     describe("Bytes received metrics") {
53         it("should sum up all bytes received") {
54             val sut = vesHvWithAlwaysSuccessfulSink()
55             val vesWireFrameMessage = vesWireFrameMessage()
56             val invalidWireFrame = messageWithInvalidWireFrameHeader()
57
58             val bytesSent = invalidWireFrame.readableBytes() +
59                     vesWireFrameMessage.readableBytes()
60             sut.handleConnection(
61                     vesWireFrameMessage,
62                     invalidWireFrame
63             )
64
65             val metrics = sut.metrics
66             assertThat(metrics.bytesReceived)
67                     .describedAs("bytesReceived metric")
68                     .isEqualTo(bytesSent)
69         }
70     }
71
72     describe("Messages received metrics") {
73         it("should sum up all received messages bytes") {
74             val sut = vesHvWithAlwaysSuccessfulSink()
75             val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10)))
76             val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40)))
77             val firstVesMessage = vesWireFrameMessage(firstVesEvent)
78             val secondVesMessage = vesWireFrameMessage(secondVesEvent)
79
80             val serializedMessagesSize = firstVesEvent.serializedSize + secondVesEvent.serializedSize
81             sut.handleConnection(
82                     firstVesMessage,
83                     secondVesMessage
84             )
85
86             val metrics = sut.metrics
87             assertThat(metrics.messageBytesReceived)
88                     .describedAs("messageBytesReceived metric")
89                     .isEqualTo(serializedMessagesSize)
90         }
91     }
92
93     describe("Messages sent metrics") {
94         it("should gather info for each topic separately") {
95             val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting)
96
97             sut.handleConnection(
98                     vesWireFrameMessage(PERF3GPP),
99                     vesWireFrameMessage(PERF3GPP),
100                     vesWireFrameMessage(VesEventDomain.MEASUREMENT)
101             )
102
103             val metrics = sut.metrics
104             assertThat(metrics.messagesSentCount)
105                     .describedAs("messagesSentCount metric")
106                     .isEqualTo(3)
107             assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC))
108                     .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
109                     .isEqualTo(2)
110             assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC))
111                     .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric")
112                     .isEqualTo(1)
113         }
114     }
115
116     describe("Processing time") {
117         it("should gather processing time metric") {
118             val delay = Duration.ofMillis(10)
119             val sut = vesHvWithDelayingSink(delay)
120
121             sut.handleConnection(vesWireFrameMessage(PERF3GPP))
122
123
124             val metrics = sut.metrics
125             assertThat(metrics.lastProcessingTimeMicros)
126                     .describedAs("processingTime metric")
127                     .isGreaterThanOrEqualTo(delay.toNanos().toDouble() / 1000.0)
128         }
129     }
130
131     describe("Messages dropped metrics") {
132         it("should gather metrics for invalid messages") {
133             val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
134
135             sut.handleConnection(
136                     messageWithInvalidWireFrameHeader(),
137                     wireFrameMessageWithInvalidPayload(),
138                     vesWireFrameMessage(domain = PERF3GPP),
139                     messageWithInvalidListenerVersion()
140             )
141
142             val metrics = sut.metrics
143             assertThat(metrics.messagesDropped(INVALID_MESSAGE))
144                     .describedAs("messagesDroppedCause $INVALID_MESSAGE metric")
145                     .isEqualTo(3)
146         }
147
148         it("should gather metrics for route not found") {
149             val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
150
151             sut.handleConnection(
152                     vesWireFrameMessage(domain = PERF3GPP),
153                     vesWireFrameMessage(domain = HEARTBEAT)
154             )
155
156             val metrics = sut.metrics
157             assertThat(metrics.messagesDropped(ROUTE_NOT_FOUND))
158                     .describedAs("messagesDroppedCause $ROUTE_NOT_FOUND metric")
159                     .isEqualTo(1)
160         }
161
162         it("should gather metrics for sing errors") {
163             val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting)
164
165             sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
166
167             val metrics = sut.metrics
168             assertThat(metrics.messagesDropped(KAFKA_FAILURE))
169                     .describedAs("messagesDroppedCause $KAFKA_FAILURE metric")
170                     .isEqualTo(1)
171         }
172
173         it("should gather summed metrics for dropped messages") {
174             val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
175
176             sut.handleConnection(
177                     vesWireFrameMessage(domain = PERF3GPP),
178                     vesWireFrameMessage(domain = HEARTBEAT),
179                     wireFrameMessageWithInvalidPayload()
180             )
181
182             val metrics = sut.metrics
183             assertThat(metrics.messagesDroppedCount)
184                     .describedAs("messagesDroppedCount metric")
185                     .isEqualTo(2)
186         }
187     }
188
189     describe("clients rejected metrics") {
190         given("rejection causes") {
191             mapOf(
192                     ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to
193                             messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1),
194                     ClientRejectionCause.INVALID_WIRE_FRAME_MARKER to garbageFrame()
195             ).forEach { cause, vesMessage ->
196                 on("cause $cause") {
197                     it("should notify correct metrics") {
198                         val sut = vesHvWithAlwaysSuccessfulSink()
199
200                         sut.handleConnection(vesMessage)
201
202                         val metrics = sut.metrics
203                         assertThat(metrics.clientRejectionCause.size)
204                                 .describedAs("metrics were notified with only one rejection cause")
205                                 .isOne()
206                         assertThat(metrics.clientRejectionCause[cause])
207                                 .describedAs("metrics were notified only once with correct client rejection cause")
208                                 .isOne()
209                     }
210                 }
211             }
212         }
213     }
214 })