2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.main
23 import io.micrometer.core.instrument.Counter
24 import io.micrometer.core.instrument.Gauge
25 import io.micrometer.core.instrument.Meter
26 import io.micrometer.core.instrument.Timer
27 import io.micrometer.core.instrument.search.RequiredSearch
28 import io.micrometer.prometheus.PrometheusConfig
29 import io.micrometer.prometheus.PrometheusMeterRegistry
30 import org.assertj.core.api.Assertions.assertThat
31 import org.assertj.core.data.Percentage
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.jetbrains.spek.api.dsl.on
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
37 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
38 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
40 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
41 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
42 import org.onap.dcae.collectors.veshv.model.RoutedMessage
43 import org.onap.dcae.collectors.veshv.model.VesMessage
44 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
45 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
46 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
47 import java.time.Instant
48 import java.time.temporal.Temporal
49 import java.util.concurrent.TimeUnit
50 import kotlin.reflect.KClass
53 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
56 object MicrometerMetricsTest : Spek({
// Relative tolerance (0.5 %) shared by every floating-point meter comparison in this spec.
val doublePrecision = Percentage.withPercentage(0.5)

// Fixtures: a Prometheus-backed registry and the metrics facade under test that is built on it.
lateinit var registry: PrometheusMeterRegistry
lateinit var cut: MicrometerMetrics

// NOTE(review): the line opening this hook is not visible in the reviewed excerpt;
// `beforeEachTest` is assumed because the checks below expect freshly zeroed meters - confirm.
beforeEachTest {
    PrometheusMeterRegistry(PrometheusConfig.DEFAULT).let { freshRegistry ->
        registry = freshRegistry
        cut = MicrometerMetrics(freshRegistry)
    }
}
// Starts a required-meter search by name on the current registry. Despite the parameter
// name, callers in this spec use it for gauges and timers as well as counters.
// `MeterRegistry.get(name)` is Micrometer's shorthand for `RequiredSearch.in(registry).name(name)`,
// which avoids the back-ticked call to the Java method named `in`.
fun registrySearch(counterName: String) = registry.get(counterName)
// Resolves a meter from [search] via [map] and passes it to [verifier].
// If the lookup itself throws, the throwable is handed to AssertJ, which fails the test
// and reports it; [verifier] is not invoked in that case.
// NOTE(review): the lines wrapping the lookup are not visible in the reviewed excerpt;
// confirm that treating any Throwable from the lookup as a test failure matches them.
fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T): Any? {
    val meter = try {
        map(search)
    } catch (ex: Throwable) {
        return assertThat(ex).doesNotThrowAnyException()
    }
    return verifier(meter)
}
// Finds the gauge registered under [name] and runs [verifier] against it.
fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
    verifyMeter(
        search = registrySearch(name),
        map = { found -> found.gauge() },
        verifier = verifier
    )
// Finds the timer registered under [name] and runs [verifier] against it.
fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
    verifyMeter(
        search = registrySearch(name),
        map = { found -> found.timer() },
        verifier = verifier
    )
// Resolves the counter matched by an already prepared [search] (for example one narrowed
// by a tag) and runs [verifier] against it.
fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
    verifyMeter(
        search = search,
        map = { found -> found.counter() },
        verifier = verifier
    )
// Convenience overload: looks the counter up by [name] only, then delegates to the
// search-based variant.
fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
    registrySearch(name).let { search -> verifyCounter(search, verifier) }
// Asserts that every counter and timer whose name starts with PREFIX still reads zero,
// except for the meters whose names are listed in [changedMeters].
// Counters are compared by their count; timers by the number of recorded events.
// NOTE(review): the line naming the scanned collection is not visible in the reviewed
// excerpt; `registry.meters` is assumed - confirm.
fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
    val expectedToChange = changedMeters.toSet()

    // Checks one meter family: [clazz] selects the family, [valueOf] extracts the reading
    // that must be (close to) zero, [changedCounters] names the meters to skip.
    fun <T : Meter> verifyAllMetersAreUnchangedBut(
        clazz: KClass<T>,
        changedCounters: Collection<String>,
        valueOf: (T) -> Double
    ) {
        registry.meters
            .filter { it.id.name.startsWith(PREFIX) }
            // Type-safe narrowing: keeps only instances of the family, no unchecked cast needed.
            .filterIsInstance(clazz.java)
            .filterNot { it.id.name in changedCounters }
            .forEach { meter ->
                assertThat(valueOf(meter))
                    .describedAs(meter.id.toString())
                    .isCloseTo(0.0, doublePrecision)
            }
    }

    verifyAllMetersAreUnchangedBut(Counter::class, expectedToChange) { it.count() }
    verifyAllMetersAreUnchangedBut(Timer::class, expectedToChange) { it.count().toDouble() }
}
// notifyBytesReceived must add the reported amount to the received-bytes counter only.
describe("notifyBytesReceived") {
    on("$PREFIX.data.received.bytes counter") {
        val receivedBytesCounter = "$PREFIX.data.received.bytes"

        it("should increment counter") {
            // NOTE(review): this declaration is not visible in the reviewed excerpt;
            // 128 is assumed from the literal used by the sibling check - confirm.
            val receivedBytes = 128
            cut.notifyBytesReceived(receivedBytes)

            verifyCounter(receivedBytesCounter) { counter ->
                assertThat(counter.count()).isCloseTo(receivedBytes.toDouble(), doublePrecision)
            }
        }

        it("should leave all other counters unchanged") {
            cut.notifyBytesReceived(128)
            verifyCountersAndTimersAreUnchangedBut(receivedBytesCounter)
        }
    }
}
// notifyMessageReceived must bump the message counter by one and the payload-bytes
// counter by the frame's payload size, leaving every other meter untouched.
describe("notifyMessageReceived") {
    on("$PREFIX.messages.received counter") {
        val receivedMessagesCounter = "$PREFIX.messages.received"

        it("should increment counter") {
            cut.notifyMessageReceived(emptyWireProtocolFrame())

            verifyCounter(receivedMessagesCounter) { counter ->
                assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
            }
        }
    }

    on("$PREFIX.messages.received.payload.bytes counter") {
        val payloadBytesCounter = "$PREFIX.messages.received.payload.bytes"

        it("should increment counter") {
            // NOTE(review): this declaration is not visible in the reviewed excerpt;
            // 128 is assumed from the literal used by the check below - confirm.
            val payloadBytes = 128
            val frame = emptyWireProtocolFrame().copy(payloadSize = payloadBytes)
            cut.notifyMessageReceived(frame)

            verifyCounter(payloadBytesCounter) { counter ->
                assertThat(counter.count()).isCloseTo(payloadBytes.toDouble(), doublePrecision)
            }
        }
    }

    it("should leave all other counters unchanged") {
        val frame = emptyWireProtocolFrame().copy(payloadSize = 128)
        cut.notifyMessageReceived(frame)

        verifyCountersAndTimersAreUnchangedBut(
            "$PREFIX.messages.received",
            "$PREFIX.messages.received.payload.bytes"
        )
    }
}
// notifyMessageSent touches four meters: the overall sent counter, the per-topic sent
// counter and the processing-time and latency timers. Each group below checks one of them.
// NOTE(review): the first argument of each verifyCountersAndTimersAreUnchangedBut call is
// not visible in the reviewed excerpt; the group's own meter name is assumed - confirm.
describe("notifyMessageSent") {
    val topicName1 = "PERF3GPP"
    val topicName2 = "CALLTRACE"

    on("$PREFIX.messages.sent counter") {
        val sentCounter = "$PREFIX.messages.sent"

        it("should increment counter") {
            cut.notifyMessageSent(routedMessage(topicName1))

            verifyCounter(sentCounter) { counter ->
                assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
            }
            verifyCountersAndTimersAreUnchangedBut(
                sentCounter,
                "$PREFIX.messages.sent.topic",
                "$PREFIX.messages.processing.time",
                "$PREFIX.messages.latency"
            )
        }
    }

    on("$PREFIX.messages.sent.topic counter") {
        val sentPerTopicCounter = "$PREFIX.messages.sent.topic"

        it("should handle counters for different topics") {
            // One message for the first topic, two for the second.
            cut.notifyMessageSent(routedMessage(topicName1))
            cut.notifyMessageSent(routedMessage(topicName2))
            cut.notifyMessageSent(routedMessage(topicName2))

            val firstTopicSearch = registrySearch(sentPerTopicCounter).tag("topic", topicName1)
            verifyCounter(firstTopicSearch) { counter ->
                assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
            }

            val secondTopicSearch = registrySearch(sentPerTopicCounter).tag("topic", topicName2)
            verifyCounter(secondTopicSearch) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
        }
    }

    on("$PREFIX.messages.processing.time") {
        val processingTimer = "$PREFIX.messages.processing.time"
        val processingTimeMs = 100L

        it("should update timer") {
            // Pretend the message was received processingTimeMs ago.
            val receivedAt = Instant.now().minusMillis(processingTimeMs)
            cut.notifyMessageSent(routedMessageReceivedAt(topicName1, receivedAt))

            verifyTimer(processingTimer) { timer ->
                assertThat(timer.mean(TimeUnit.MILLISECONDS))
                    .isGreaterThanOrEqualTo(processingTimeMs.toDouble())
            }
            verifyCountersAndTimersAreUnchangedBut(
                processingTimer,
                "$PREFIX.messages.sent.topic",
                "$PREFIX.messages.sent",
                "$PREFIX.messages.latency"
            )
        }
    }

    on("$PREFIX.messages.latency") {
        val latencyTimer = "$PREFIX.messages.latency"
        val latencyMs = 1666L

        it("should update timer") {
            // Pretend the event was stamped by its sender latencyMs ago.
            val sentAt = Instant.now().minusMillis(latencyMs)
            cut.notifyMessageSent(routedMessageSentAt(topicName1, sentAt))

            verifyTimer(latencyTimer) { timer ->
                // Upper bound leaves 10 s of slack on top of the simulated latency.
                assertThat(timer.mean(TimeUnit.MILLISECONDS))
                    .isGreaterThanOrEqualTo(latencyMs.toDouble())
                    .isLessThanOrEqualTo(latencyMs + 10000.0)
            }

            verifyCountersAndTimersAreUnchangedBut(
                latencyTimer,
                "$PREFIX.messages.sent.topic",
                "$PREFIX.messages.sent",
                "$PREFIX.messages.processing.time"
            )
        }
    }
}
// notifyMessageDropped must count every drop in the overall counter and, separately,
// in a counter tagged with the drop cause.
describe("notifyMessageDropped") {
    on("$PREFIX.messages.dropped counter") {
        val droppedCounter = "$PREFIX.messages.dropped"

        it("should increment counter") {
            cut.notifyMessageDropped(ROUTE_NOT_FOUND)
            cut.notifyMessageDropped(INVALID_MESSAGE)

            verifyCounter(droppedCounter) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
            verifyCountersAndTimersAreUnchangedBut(droppedCounter, "$PREFIX.messages.dropped.cause")
        }
    }

    on("$PREFIX.messages.dropped.cause counter") {
        val droppedByCauseCounter = "$PREFIX.messages.dropped.cause"

        it("should handle counters for different drop reasons") {
            // One drop for a missing route, two for invalid messages.
            cut.notifyMessageDropped(ROUTE_NOT_FOUND)
            cut.notifyMessageDropped(INVALID_MESSAGE)
            cut.notifyMessageDropped(INVALID_MESSAGE)

            val routeNotFoundSearch = registrySearch(droppedByCauseCounter).tag("cause", ROUTE_NOT_FOUND.tag)
            verifyCounter(routeNotFoundSearch) { counter ->
                assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
            }

            val invalidMessageSearch = registrySearch(droppedByCauseCounter).tag("cause", INVALID_MESSAGE.tag)
            verifyCounter(invalidMessageSearch) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
        }
    }
}
// Two notifyClientConnected calls must show up as two on the connections counter
// and nowhere else.
describe("notifyClientConnected") {
    on("$PREFIX.connections counter") {
        val connectionsCounter = "$PREFIX.connections"

        it("should increment counter") {
            repeat(2) { cut.notifyClientConnected() }

            verifyCounter(connectionsCounter) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
            verifyCountersAndTimersAreUnchangedBut(connectionsCounter)
        }
    }
}
// Two notifyClientDisconnected calls must show up as two on the disconnections counter
// and nowhere else.
describe("notifyClientDisconnected") {
    on("$PREFIX.disconnections counter") {
        val disconnectionsCounter = "$PREFIX.disconnections"

        it("should increment counter") {
            repeat(2) { cut.notifyClientDisconnected() }

            verifyCounter(disconnectionsCounter) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
            verifyCountersAndTimersAreUnchangedBut(disconnectionsCounter)
        }
    }
}
// notifyClientRejected must count every rejection in the overall counter and, separately,
// in a counter tagged with the rejection cause.
describe("notifyClientRejected") {

    on("$PREFIX.clients.rejected") {
        val rejectedCounter = "$PREFIX.clients.rejected"

        it("should increment counter for each possible reason") {
            cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
            cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

            verifyCounter(rejectedCounter) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
            verifyCountersAndTimersAreUnchangedBut(rejectedCounter, "$PREFIX.clients.rejected.cause")
        }
    }

    on("$PREFIX.clients.rejected.cause counter") {
        val rejectedByCauseCounter = "$PREFIX.clients.rejected.cause"

        it("should handle counters for different rejection reasons") {
            // One rejection for a bad frame marker, two for an oversized payload.
            cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
            cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
            cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

            val invalidMarkerSearch =
                registrySearch(rejectedByCauseCounter).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)
            verifyCounter(invalidMarkerSearch) { counter ->
                assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
            }

            val payloadTooLargeSearch =
                registrySearch(rejectedByCauseCounter).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)
            verifyCounter(payloadTooLargeSearch) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
        }
    }
}
// Checks the active-connections gauge after successive connect/disconnect notifications.
// NOTE(review): the expected values (2, then 0, then 0) only add up if the three checks
// run against the same metrics instance and the gauge never reports below zero -
// presumably both hold; confirm against MicrometerMetrics and the Spek `on` semantics.
describe("$PREFIX.connections.active gauge") {
    val activeConnectionsGauge = "$PREFIX.connections.active"

    on("connection traffic") {
        it("should calculate positive difference between connected and disconnected clients") {
            repeat(3) { cut.notifyClientConnected() }
            cut.notifyClientDisconnected()

            verifyGauge(activeConnectionsGauge) { gauge ->
                assertThat(gauge.value()).isCloseTo(2.0, doublePrecision)
            }
        }

        it("should calculate no difference between connected and disconnected clients") {
            repeat(2) { cut.notifyClientDisconnected() }

            verifyGauge(activeConnectionsGauge) { gauge ->
                assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
            }
        }

        it("should calculate negative difference between connected and disconnected clients") {
            cut.notifyClientDisconnected()

            verifyGauge(activeConnectionsGauge) { gauge ->
                assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
            }
        }
    }
}
/**
 * Builds a [RoutedMessage] for [topic] and [partition] that wraps a freshly generated
 * VES event together with its wire protocol frame.
 */
fun routedMessage(topic: String, partition: Int = 0): RoutedMessage {
    val event = vesEvent()
    val message = VesMessage(event.commonEventHeader, wireProtocolFrame(event))
    return RoutedMessage(topic, partition, message)
}
/**
 * Builds a [RoutedMessage] like [routedMessage], but with the wire frame's `receivedAt`
 * overridden by [receivedAt].
 */
fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0): RoutedMessage {
    val event = vesEvent()
    val frame = wireProtocolFrame(event).copy(receivedAt = receivedAt)
    return RoutedMessage(topic, partition, VesMessage(event.commonEventHeader, frame))
}
/**
 * Builds a [RoutedMessage] whose event header carries [sentAt] as `lastEpochMicrosec`.
 *
 * The timestamp is written through the event's builder, and the message is created from
 * the event that builder produces, so both the header and the wire frame carry the stamp.
 */
fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0): RoutedMessage {
    // Whole seconds scaled to microseconds plus the sub-second part (nanoseconds / 1000).
    val sentAtMicros = sentAt.epochSecond * 1_000_000 + sentAt.nano / 1_000

    val stampedEvent = vesEvent().toBuilder()
        .apply { commonEventHeaderBuilder.lastEpochMicrosec = sentAtMicros }
        .build()

    return RoutedMessage(
        topic,
        partition,
        VesMessage(stampedEvent.commonEventHeader, wireProtocolFrame(stampedEvent))
    )
}