2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.main
23 import io.micrometer.core.instrument.Counter
24 import io.micrometer.core.instrument.Gauge
25 import io.micrometer.core.instrument.Meter
26 import io.micrometer.core.instrument.Timer
27 import io.micrometer.core.instrument.search.RequiredSearch
28 import io.micrometer.prometheus.PrometheusConfig
29 import io.micrometer.prometheus.PrometheusMeterRegistry
30 import org.assertj.core.api.Assertions.assertThat
31 import org.assertj.core.data.Percentage
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.jetbrains.spek.api.dsl.on
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
37 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
38 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
40 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
41 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
42 import org.onap.dcae.collectors.veshv.model.RoutedMessage
43 import org.onap.dcae.collectors.veshv.model.VesMessage
44 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
45 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
46 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
47 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
48 import java.time.Instant
49 import java.time.temporal.Temporal
50 import java.util.concurrent.TimeUnit
51 import kotlin.reflect.KClass
54 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification for MicrometerMetrics, checked through a real PrometheusMeterRegistry.
57 object MicrometerMetricsTest : Spek({
// Tolerance shared by the isCloseTo assertions in this spec: 0.5 percent of the expected value.
58 val doublePrecision = Percentage.withPercentage(0.5)
// Names of the two timers (processing time and latency).
// NOTE(review): this set is not referenced anywhere in the visible lines - confirm it is still needed.
59 val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time")
// Registry the metrics are written to, and the class under test; both are assigned below.
60 lateinit var registry: PrometheusMeterRegistry
61 lateinit var cut: MicrometerMetrics
// Creates a registry with the default Prometheus configuration and binds the class under test to it.
// NOTE(review): the lines surrounding these assignments are not visible here; presumably they run
// in a per-test fixture so that every test starts from a fresh registry - confirm.
64 registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
65 cut = MicrometerMetrics(registry)
/** Starts a required-meter search in [registry] limited to meters named [counterName]. */
fun registrySearch(counterName: String) =
        registry.get(counterName)
// Generic verification helper: `map` resolves `search` to a concrete meter of type M and
// `verifier` receives that meter (see the typed wrappers that pass RequiredSearch::gauge,
// ::timer and ::counter as `map`).
// NOTE(review): most of the body is not visible here. The one visible handler takes a captured
// exception `ex` and turns it into an AssertJ failure - presumably for a failed meter lookup.
// Confirm against the full file.
70 fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
74 { ex -> assertThat(ex).doesNotThrowAnyException() },
/** Resolves the gauge registered under [name] and passes it to [verifier]. */
fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
        verifyMeter(
                search = registrySearch(name),
                map = { found -> found.gauge() },
                verifier = verifier)
/** Resolves the timer registered under [name] and passes it to [verifier]. */
fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
        verifyMeter(
                search = registrySearch(name),
                map = { found -> found.timer() },
                verifier = verifier)
/** Resolves the counter matched by an already narrowed [search] and passes it to [verifier]. */
fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
        verifyMeter(
                search = search,
                map = { found -> found.counter() },
                verifier = verifier)
/** Convenience overload: searches for the counter by [name] only, then delegates to the search-based variant. */
fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
        verifyCounter(search = registrySearch(name), verifier = verifier)
// Asserts that every counter and timer whose name starts with PREFIX is still at zero,
// except for the meters named in `changedMeters`.
90 fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
// Local helper doing the check for one meter type; `valueOf` extracts the number to compare.
// NOTE(review): the first parameter (`clazz`, used below) and the collection that the filter
// chain starts from are declared on lines not visible here.
91 fun <T : Meter> verifyAllMetersAreUnchangedBut(
93 changedCounters: Collection<String>,
94 valueOf: (T) -> Double) {
// Only meters whose name starts with PREFIX are considered.
96 .filter { it.id.name.startsWith(PREFIX) }
// Only meters of the requested type are considered.
97 .filter { clazz.isInstance(it) }
// Meters the caller expects to have changed are excluded from the check.
99 .filterNot { it.id.name in changedCounters }
// Every remaining meter must still read 0.0; the meter id is used as the assertion description.
101 assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
// The same exclusion set is applied to counters (compared by count) and timers (compared by
// the number of recorded events, converted to Double).
105 setOf(*changedMeters).let { changedMetersCollection ->
106 verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
107 verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
// Scenarios for notifyBytesReceived: the received-bytes counter grows by the reported amount
// and no other counter or timer is touched.
111 describe("notifyBytesReceived") {
113 on("$PREFIX.data.received.bytes counter") {
114 val counterName = "$PREFIX.data.received.bytes"
116 it("should increment counter") {
// NOTE(review): `bytes` is not declared in the visible lines - presumably a local value
// defined just above this call; confirm against the full file.
118 cut.notifyBytesReceived(bytes)
// The counter must equal the number of bytes reported.
120 verifyCounter(counterName) {
121 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
125 it("should leave all other counters unchanged") {
126 cut.notifyBytesReceived(128)
// Only the received-bytes counter is allowed to differ from zero.
127 verifyCountersAndTimersAreUnchangedBut(counterName)
// Scenarios for notifyMessageReceived: one call bumps the received-message counter by one and
// the received-bytes counter by the frame's payloadSize.
132 describe("notifyMessageReceived") {
133 on("$PREFIX.messages.received.count counter") {
134 val counterName = "$PREFIX.messages.received.count"
136 it("should increment counter") {
137 cut.notifyMessageReceived(emptyWireProtocolFrame())
// A single received frame must be counted exactly once.
139 verifyCounter(counterName) {
140 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
145 on("$PREFIX.messages.received.bytes counter") {
146 val counterName = "$PREFIX.messages.received.bytes"
148 it("should increment counter") {
// NOTE(review): `bytes` is not declared in the visible lines - presumably a local value
// defined just above this call; confirm against the full file.
// The frame is an empty frame copied with its payloadSize overridden.
150 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
// The bytes counter must equal the payloadSize of the frame.
152 verifyCounter(counterName) {
153 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
158 it("should leave all other counters unchanged") {
159 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
// Both received-message meters are expected to change; everything else must stay at zero.
160 verifyCountersAndTimersAreUnchangedBut(
161 "$PREFIX.messages.received.count",
162 "$PREFIX.messages.received.bytes"
// Scenarios for notifyMessageSent: total and per-topic sent counters plus the processing-time
// and latency timers.
167 describe("notifyMessageSent") {
// Two distinct topic names so that per-topic counters can be told apart.
168 val topicName1 = "PERF3GPP"
169 val topicName2 = "CALLTRACE"
171 on("$PREFIX.messages.sent.count counter") {
172 val counterName = "$PREFIX.messages.sent.count"
174 it("should increment counter") {
175 cut.notifyMessageSent(routedMessage(topicName1))
// One sent message must be counted exactly once.
177 verifyCounter(counterName) {
178 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
// The per-topic counter and both timers are also expected to change.
// NOTE(review): the first argument of this call sits on a line not visible here -
// presumably `counterName`; confirm.
180 verifyCountersAndTimersAreUnchangedBut(
182 "$PREFIX.messages.sent.topic.count",
183 "$PREFIX.messages.processing.time",
184 "$PREFIX.messages.latency.time")
188 on("$PREFIX.messages.sent.topic.count counter") {
189 val counterName = "$PREFIX.messages.sent.topic.count"
190 it("should handle counters for different topics") {
// One message for the first topic, two for the second.
191 cut.notifyMessageSent(routedMessage(topicName1))
192 cut.notifyMessageSent(routedMessage(topicName2))
193 cut.notifyMessageSent(routedMessage(topicName2))
// Counters are looked up by name and narrowed by the "topic" tag.
195 verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
196 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
199 verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
200 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
205 on("$PREFIX.messages.processing.time") {
206 val counterName = "$PREFIX.messages.processing.time"
207 val processingTimeMs = 100L
209 it("should update timer") {
// The message is marked as received processingTimeMs milliseconds before now.
211 cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
// Only a lower bound is asserted: the mean must be at least the simulated processing time.
213 verifyTimer(counterName) { timer ->
214 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
// NOTE(review): the first argument of this call sits on a line not visible here -
// presumably `counterName`; confirm.
216 verifyCountersAndTimersAreUnchangedBut(
218 "$PREFIX.messages.sent.topic.count",
219 "$PREFIX.messages.sent.count",
220 "$PREFIX.messages.latency.time")
224 on("$PREFIX.messages.latency.time") {
225 val counterName = "$PREFIX.messages.latency.time"
226 val latencyMs = 1666L
228 it("should update timer") {
// The message is stamped as sent latencyMs milliseconds before now.
230 cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
// The mean must be at least the simulated latency and at most 10 seconds above it.
232 verifyTimer(counterName) { timer ->
233 assertThat(timer.mean(TimeUnit.MILLISECONDS))
234 .isGreaterThanOrEqualTo(latencyMs.toDouble())
235 .isLessThanOrEqualTo(latencyMs + 10000.0)
// NOTE(review): the first argument of this call sits on a line not visible here -
// presumably `counterName`; confirm.
238 verifyCountersAndTimersAreUnchangedBut(
240 "$PREFIX.messages.sent.topic.count",
241 "$PREFIX.messages.sent.count",
242 "$PREFIX.messages.processing.time")
// Scenarios for notifyMessageDropped: a total counter plus one counter per drop cause,
// distinguished by the "cause" tag.
describe("notifyMessageDropped") {

    on("$PREFIX.messages.dropped.count counter") {
        val droppedTotalName = "$PREFIX.messages.dropped.count"
        it("should increment counter") {
            listOf(ROUTE_NOT_FOUND, INVALID_MESSAGE).forEach { cause ->
                cut.notifyMessageDropped(cause)
            }

            // Two drops, regardless of cause, must show up in the total.
            verifyCounter(droppedTotalName) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
            verifyCountersAndTimersAreUnchangedBut(droppedTotalName, "$PREFIX.messages.dropped.cause.count")
        }
    }

    on("$PREFIX.messages.dropped.cause.count counter") {
        val droppedByCauseName = "$PREFIX.messages.dropped.cause.count"
        it("should handle counters for different drop reasons") {
            cut.notifyMessageDropped(ROUTE_NOT_FOUND)
            repeat(2) { cut.notifyMessageDropped(INVALID_MESSAGE) }

            val routeNotFoundSearch = registrySearch(droppedByCauseName).tag("cause", ROUTE_NOT_FOUND.tag)
            verifyCounter(routeNotFoundSearch) { counter ->
                assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
            }

            val invalidMessageSearch = registrySearch(droppedByCauseName).tag("cause", INVALID_MESSAGE.tag)
            verifyCounter(invalidMessageSearch) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
        }
    }
}
// Scenarios for the gauge that reports how many received messages have not been sent yet.
//
// Each scenario is its own `on` group with an `it` inside, the same shape as every other
// scenario in this spec, so each one is registered as a separate test with its own
// description. The gauge is looked up under the "$PREFIX." name, consistent with all other
// meters verified here.
describe("processing gauge") {
    val gaugeName = "$PREFIX.messages.processing.count"

    on("positive difference") {
        it("should show difference between sent and received messages") {
            // Three messages received, one sent: two are still being processed.
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
            cut.notifyMessageSent(routedMessage("perf3gpp"))

            verifyGauge(gaugeName) { gauge ->
                assertThat(gauge.value()).isCloseTo(2.0, doublePrecision)
            }
        }
    }

    on("zero difference") {
        it("should show difference between sent and received messages") {
            // One received, one sent: nothing is left in flight.
            cut.notifyMessageReceived(emptyWireProtocolFrame())
            cut.notifyMessageSent(routedMessage("perf3gpp"))

            verifyGauge(gaugeName) { gauge ->
                assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
            }
        }
    }

    on("negative difference") {
        it("should show difference between sent and received messages") {
            // More messages sent than received: the expected reading is 0.0, not -1.0.
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
            cut.notifyMessageSent(routedMessage("fault"))
            cut.notifyMessageSent(routedMessage("perf3gpp"))

            verifyGauge(gaugeName) { gauge ->
                assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
            }
        }
    }
}
// Scenarios for notifyClientRejected: a total counter plus one counter per rejection cause,
// distinguished by the "cause" tag.
describe("notifyClientRejected") {

    on("$PREFIX.clients.rejected.count") {
        val rejectedTotalName = "$PREFIX.clients.rejected.count"
        it("should increment counter for each possible reason") {
            listOf(INVALID_WIRE_FRAME_MARKER, PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE).forEach { cause ->
                cut.notifyClientRejected(cause)
            }

            // Two rejections, regardless of cause, must show up in the total.
            verifyCounter(rejectedTotalName) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
            verifyCountersAndTimersAreUnchangedBut(rejectedTotalName, "$PREFIX.clients.rejected.cause.count")
        }
    }

    on("$PREFIX.clients.rejected.cause.count counter") {
        val rejectedByCauseName = "$PREFIX.clients.rejected.cause.count"
        it("should handle counters for different rejection reasons") {
            cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
            repeat(2) { cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) }

            val invalidMarkerSearch =
                    registrySearch(rejectedByCauseName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)
            verifyCounter(invalidMarkerSearch) { counter ->
                assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
            }

            val payloadExceededSearch =
                    registrySearch(rejectedByCauseName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)
            verifyCounter(payloadExceededSearch) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
        }
    }
}
/**
 * Builds a [RoutedMessage] for [topic] and [partition] around the event returned by `vesEvent()`.
 *
 * The wrapped [VesMessage] carries the event's common header and the wire frame
 * produced by `wireProtocolFrame` for that same event.
 */
fun routedMessage(topic: String, partition: Int = 0): RoutedMessage {
    val event = vesEvent()
    val message = VesMessage(event.commonEventHeader, wireProtocolFrame(event))
    return RoutedMessage(topic, partition, message)
}
/**
 * Same as a plain routed message, except that the wire frame is copied with its
 * `receivedAt` field replaced by [receivedAt].
 */
fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0): RoutedMessage {
    val event = vesEvent()
    val frame = wireProtocolFrame(event).copy(receivedAt = receivedAt)
    return RoutedMessage(topic, partition, VesMessage(event.commonEventHeader, frame))
}
/**
 * Builds a [RoutedMessage] whose event header reports [sentAt] as `lastEpochMicrosec`.
 *
 * The timestamp is written through the event builder and the message is created from the
 * *rebuilt* event, so both the header and the wire frame handed to [VesMessage] carry it.
 * Building the message from the unmodified event would silently discard the timestamp.
 *
 * @param topic topic the message is routed to
 * @param sentAt instant to store in the header, converted to microseconds since the epoch
 * @param partition partition the message is routed to
 */
fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0): RoutedMessage {
    // Whole seconds scaled to microseconds plus the sub-second part (nanoseconds -> microseconds).
    val sentAtMicros = sentAt.epochSecond * 1_000_000 + sentAt.nano / 1_000

    val event = vesEvent().toBuilder()
            .apply { commonEventHeaderBuilder.lastEpochMicrosec = sentAtMicros }
            .build()

    return RoutedMessage(topic, partition,
            VesMessage(event.commonEventHeader, wireProtocolFrame(event)))
}