2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.main
22 import arrow.core.Option
23 import io.micrometer.core.instrument.Counter
24 import io.micrometer.core.instrument.Meter
25 import io.micrometer.core.instrument.Tags
26 import io.micrometer.core.instrument.Timer
27 import io.micrometer.prometheus.PrometheusConfig
28 import io.micrometer.prometheus.PrometheusMeterRegistry
29 import org.assertj.core.api.Assertions.assertThat
30 import org.assertj.core.data.Percentage
31 import org.jetbrains.spek.api.Spek
32 import org.jetbrains.spek.api.dsl.describe
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
37 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
38 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
39 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
40 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
41 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
42 import org.onap.dcae.collectors.veshv.domain.VesMessage
43 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
44 import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
45 import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
46 import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
47 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
48 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
49 import org.onap.ves.VesEventOuterClass
50 import java.time.Instant
51 import java.time.temporal.Temporal
52 import java.util.concurrent.TimeUnit
53 import kotlin.reflect.KClass
56 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification for MicrometerMetrics, exercised against a PrometheusMeterRegistry.
59 object MicrometerMetricsTest : Spek({
// Tolerance (0.5 percent of the expected value) used by the floating-point meter assertions.
60 val doublePrecision = Percentage.withPercentage(0.5)
// Shared fixtures: the meter registry and the class under test ("cut") that reports into it.
61 lateinit var registry: PrometheusMeterRegistry
62 lateinit var cut: MicrometerMetrics
// NOTE(review): the line that opens the block around the two assignments below is not visible
// in this extract (presumably a per-test setup hook) - confirm against the full file.
65 registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
66 cut = MicrometerMetrics(registry)
// Asserts that every Counter and Timer whose name starts with PREFIX, other than the ones named
// in changedMeters, still reads zero.
69 fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
// Generic worker: checks the meters that are instances of clazz, reading each through valueOf.
// NOTE(review): some lines of this helper (the clazz parameter declaration, the start of the
// call chain and the iteration over the result) are not visible in this extract.
70 fun <T : Meter> verifyAllMetersAreUnchangedBut(
72 changedCounters: Collection<String>,
73 valueOf: (T) -> Double) {
// Keep only meters published under PREFIX ...
75 .filter { it.id.name.startsWith(PREFIX) }
// ... that are of the requested meter type ...
76 .filter { clazz.isInstance(it) }
// ... and that the caller did not list as changed.
78 .filterNot { it.id.name in changedCounters }
// The meter id is used as the assertion description so a failure names the offending meter.
80 assertThat(valueOf(it))
81 .describedAs(it.id.toString())
82 .isCloseTo(0.0, doublePrecision)
// Counters are read through count(); timers through count() converted to Double.
86 setOf(*changedMeters).let { changedMetersCollection ->
87 verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
88 verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
// Specs for notifyBytesReceived, which is expected to feed the PREFIX.data.received.bytes counter.
93 describe("notifyBytesReceived") {
94 on("$PREFIX.data.received.bytes counter") {
95 val counterName = "$PREFIX.data.received.bytes"
97 it("should increment counter") {
// NOTE(review): `bytes` is declared on a line that is not visible in this extract.
99 cut.notifyBytesReceived(bytes)
// The counter is expected to equal the reported number of bytes.
101 registry.verifyCounter(counterName) {
102 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
106 it("should leave all other counters unchanged") {
107 cut.notifyBytesReceived(128)
// Only the bytes counter may differ from zero after the call above.
108 verifyCountersAndTimersAreUnchangedBut(counterName)
// Specs for notifyMessageReceived: the message counter and the payload-bytes counter.
113 describe("notifyMessageReceived") {
114 on("$PREFIX.messages.received counter") {
115 val counterName = "$PREFIX.messages.received"
117 it("should increment counter") {
// A single frame is reported, so the counter is expected to read 1.
118 cut.notifyMessageReceived(emptyWireProtocolFrame())
120 registry.verifyCounter(counterName) {
121 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
126 on("$PREFIX.messages.received.payload.bytes counter") {
127 val counterName = "$PREFIX.messages.received.payload.bytes"
129 it("should increment counter") {
// NOTE(review): `bytes` is declared on a line that is not visible in this extract.
// The frame is copied with payloadSize = bytes and the counter is expected to equal that size.
131 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
133 registry.verifyCounter(counterName) {
134 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
139 it("should leave all other counters unchanged") {
140 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
// Both meters named below are allowed to change when a message is received.
141 verifyCountersAndTimersAreUnchangedBut(
142 "$PREFIX.messages.received",
143 "$PREFIX.messages.received.payload.bytes"
// Specs for notifyMessageSent: total and per-topic counters plus the processing-time and
// latency timers.
148 describe("notifyMessageSent") {
// Two distinct topic names used to tell the per-topic counters apart.
149 val topicName1 = "PERF3GPP"
150 val topicName2 = "CALLTRACE"
152 on("$PREFIX.messages.sent counter") {
153 val counterName = "$PREFIX.messages.sent"
155 it("should increment counter") {
156 cut.notifyMessageSent(routedMessage(topicName1))
158 registry.verifyCounter(counterName) {
159 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
// NOTE(review): the first argument of this call sits on a line not visible in this extract.
161 verifyCountersAndTimersAreUnchangedBut(
163 "$PREFIX.messages.sent.topic",
164 "$PREFIX.messages.processing.time",
165 "$PREFIX.messages.latency")
169 on("$PREFIX.messages.sent.topic counter") {
170 val counterName = "$PREFIX.messages.sent.topic"
172 it("should handle counters for different topics") {
// One message goes to topicName1 and two to topicName2.
173 cut.notifyMessageSent(routedMessage(topicName1))
174 cut.notifyMessageSent(routedMessage(topicName2))
175 cut.notifyMessageSent(routedMessage(topicName2))
// The counter is looked up per "topic" tag value and must match the number sent to that topic.
177 registry.verifyCounter(counterName, Tags.of("topic", topicName1)) {
178 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
181 registry.verifyCounter(counterName, Tags.of("topic", topicName2)) {
182 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
187 on("$PREFIX.messages.processing.time") {
188 val counterName = "$PREFIX.messages.processing.time"
189 val processingTimeMs = 100L
191 it("should update timer") {
// The message is stamped as received processingTimeMs before now.
193 cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
// Only a lower bound is asserted on the timer mean.
195 registry.verifyTimer(counterName) { timer ->
196 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
// NOTE(review): the first argument of this call sits on a line not visible in this extract.
198 verifyCountersAndTimersAreUnchangedBut(
200 "$PREFIX.messages.sent.topic",
201 "$PREFIX.messages.sent",
202 "$PREFIX.messages.latency")
206 on("$PREFIX.messages.latency") {
207 val counterName = "$PREFIX.messages.latency"
208 val latencyMs = 1666L
210 it("should update timer") {
// The event is stamped as sent latencyMs before now.
212 cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
// The mean must be at least latencyMs; the upper bound allows 10000 ms on top of it.
214 registry.verifyTimer(counterName) { timer ->
215 assertThat(timer.mean(TimeUnit.MILLISECONDS))
216 .isGreaterThanOrEqualTo(latencyMs.toDouble())
217 .isLessThanOrEqualTo(latencyMs + 10000.0)
// NOTE(review): the first argument of this call sits on a line not visible in this extract.
220 verifyCountersAndTimersAreUnchangedBut(
222 "$PREFIX.messages.sent.topic",
223 "$PREFIX.messages.sent",
224 "$PREFIX.messages.processing.time")
// Specs for notifyMessageDropped: the overall drop counter and its per-cause breakdown.
describe("notifyMessageDropped") {
    on("$PREFIX.messages.dropped counter") {
        val droppedCounter = "$PREFIX.messages.dropped"

        it("should increment counter") {
            // One drop is reported per cause; the counter looked up by name only should see both.
            listOf(ROUTE_NOT_FOUND, INVALID_MESSAGE).forEach { cause ->
                cut.notifyMessageDropped(cause)
            }

            registry.verifyCounter(droppedCounter) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
            verifyCountersAndTimersAreUnchangedBut(droppedCounter, "$PREFIX.messages.dropped.cause")
        }
    }

    on("$PREFIX.messages.dropped.cause counter") {
        val droppedByCauseCounter = "$PREFIX.messages.dropped.cause"

        it("should handle counters for different drop reasons") {
            listOf(ROUTE_NOT_FOUND, INVALID_MESSAGE, INVALID_MESSAGE).forEach { cause ->
                cut.notifyMessageDropped(cause)
            }

            // Each cause is looked up under its own "cause" tag value.
            listOf(ROUTE_NOT_FOUND to 1.0, INVALID_MESSAGE to 2.0).forEach { (cause, expectedCount) ->
                registry.verifyCounter(droppedByCauseCounter, Tags.of("cause", cause.tag)) { counter ->
                    assertThat(counter.count()).isCloseTo(expectedCount, doublePrecision)
                }
            }
        }
    }
}
// Specs for notifyClientConnected: every call is expected to bump the connections counter.
describe("notifyClientConnected") {
    on("$PREFIX.connections counter") {
        val connectionsCounter = "$PREFIX.connections"

        it("should increment counter") {
            // Two connections are reported, so the counter is expected to read 2.
            repeat(2) { cut.notifyClientConnected() }

            registry.verifyCounter(connectionsCounter) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
            verifyCountersAndTimersAreUnchangedBut(connectionsCounter)
        }
    }
}
// Specs for notifyClientDisconnected: every call is expected to bump the disconnections counter.
describe("notifyClientDisconnected") {
    on("$PREFIX.disconnections counter") {
        val disconnectionsCounter = "$PREFIX.disconnections"

        it("should increment counter") {
            // Two disconnections are reported, so the counter is expected to read 2.
            repeat(2) { cut.notifyClientDisconnected() }

            registry.verifyCounter(disconnectionsCounter) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
            verifyCountersAndTimersAreUnchangedBut(disconnectionsCounter)
        }
    }
}
// Specs for notifyClientRejected: the overall rejection counter and its per-cause breakdown.
describe("notifyClientRejected") {

    on("$PREFIX.clients.rejected") {
        val rejectedCounter = "$PREFIX.clients.rejected"

        it("should increment counter for each possible reason") {
            // One rejection is reported per cause; the counter looked up by name only should see both.
            listOf(INVALID_WIRE_FRAME_MARKER, PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE).forEach { cause ->
                cut.notifyClientRejected(cause)
            }

            registry.verifyCounter(rejectedCounter) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
            verifyCountersAndTimersAreUnchangedBut(rejectedCounter, "$PREFIX.clients.rejected.cause")
        }
    }

    on("$PREFIX.clients.rejected.cause counter") {
        val rejectedByCauseCounter = "$PREFIX.clients.rejected.cause"

        it("should handle counters for different rejection reasons") {
            listOf(
                    INVALID_WIRE_FRAME_MARKER,
                    PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE,
                    PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
            ).forEach { cause -> cut.notifyClientRejected(cause) }

            // Each cause is looked up under its own "cause" tag value.
            listOf(
                    INVALID_WIRE_FRAME_MARKER to 1.0,
                    PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to 2.0
            ).forEach { (cause, expectedCount) ->
                registry.verifyCounter(rejectedByCauseCounter, Tags.of("cause", cause.tag)) { counter ->
                    assertThat(counter.count()).isCloseTo(expectedCount, doublePrecision)
                }
            }
        }
    }
}
// Specs for the active-connections gauge, driven by connect/disconnect notifications.
// NOTE(review): the expected values of the second and third test only add up if meter state
// carries over from the preceding tests of this `on` group - confirm against the fixture
// semantics of the Spek version in use.
describe("$PREFIX.connections.active gauge") {
    val activeConnectionsGauge = "$PREFIX.connections.active"

    on("connection traffic") {
        it("should calculate positive difference between connected and disconnected clients") {
            // Three connections followed by one disconnection.
            repeat(3) { cut.notifyClientConnected() }
            cut.notifyClientDisconnected()

            registry.verifyGauge(activeConnectionsGauge) { gauge ->
                assertThat(gauge.value()).isCloseTo(2.0, doublePrecision)
            }
        }

        it("should calculate no difference between connected and disconnected clients") {
            // Two further disconnections.
            repeat(2) { cut.notifyClientDisconnected() }

            registry.verifyGauge(activeConnectionsGauge) { gauge ->
                assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
            }
        }

        it("should calculate negative difference between connected and disconnected clients") {
            // One more disconnection; the gauge is still expected to read 0.0.
            cut.notifyClientDisconnected()

            registry.verifyGauge(activeConnectionsGauge) { gauge ->
                assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
            }
        }
    }
}
/**
 * Converts the event returned by `vesEvent()` into a routed message via `toRoutedMessage`.
 *
 * @param topic topic passed on to `toRoutedMessage`
 * @param partition partition passed on to `toRoutedMessage`; defaults to 0
 */
fun routedMessage(topic: String, partition: Int = 0) =
        vesEvent().toRoutedMessage(topic, partition)
/**
 * Converts the event returned by `vesEvent()` into a routed message with an explicit reception time.
 *
 * @param topic topic passed on to `toRoutedMessage`
 * @param receivedAt reception time passed on to `toRoutedMessage`
 * @param partition partition passed on to `toRoutedMessage`; defaults to 0
 */
fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
        vesEvent().toRoutedMessage(topic, partition, receivedAt)
// Builds a routed message whose common event header carries sentAt as lastEpochMicrosec.
// NOTE(review): the receiver on which toBuilder() is invoked is established on a line that is
// not visible in this extract - confirm against the full file.
371 fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
373 val builder = toBuilder()
// Instant -> microseconds since the epoch: whole seconds scaled to microseconds plus the
// nanosecond-of-second part truncated to microseconds.
374 builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
375 builder.build().toRoutedMessage(topic, partition)
// Wraps this event for the routed-message helpers above; receivedAt defaults to Instant.now().
// NOTE(review): one parameter line and parts of the constructed expression are not visible in
// this extract, so the exact shape of the returned value cannot be confirmed from here.
378 private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
380 receivedAt: Temporal = Instant.now()) =
// The wire frame built from this event is copied with the requested reception time.
382 VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)),
// The partition is always supplied as a present Option.
384 Option.just(partition)