2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.main
23 import io.micrometer.core.instrument.Counter
24 import io.micrometer.core.instrument.Gauge
25 import io.micrometer.core.instrument.Timer
26 import io.micrometer.core.instrument.search.RequiredSearch
27 import io.micrometer.prometheus.PrometheusConfig
28 import io.micrometer.prometheus.PrometheusMeterRegistry
29 import org.assertj.core.api.Assertions.assertThat
30 import org.assertj.core.data.Percentage
31 import org.jetbrains.spek.api.Spek
32 import org.jetbrains.spek.api.dsl.describe
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
37 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
38 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
40 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
41 import org.onap.dcae.collectors.veshv.model.RoutedMessage
42 import org.onap.dcae.collectors.veshv.model.VesMessage
43 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
44 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
45 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
46 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
47 import java.time.Instant
48 import io.micrometer.core.instrument.Meter
50 import java.time.temporal.Temporal
51 import java.util.concurrent.TimeUnit
52 import kotlin.reflect.KClass
55 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification for MicrometerMetrics. The fixture consists of a
// PrometheusMeterRegistry and the class under test (`cut`) constructed on top of it.
58 object MicrometerMetricsTest : Spek({
// Percentage-based tolerance (0.5 percent) passed to the isCloseTo assertions below.
59 val doublePrecision = Percentage.withPercentage(0.5)
60 lateinit var registry: PrometheusMeterRegistry
61 lateinit var cut: MicrometerMetrics
// Builds a registry with the default Prometheus config and hands it to a new MicrometerMetrics.
// NOTE(review): the line enclosing these two assignments is not visible in this excerpt -
// presumably a per-test setup hook; confirm against the full file.
64 registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
65 cut = MicrometerMetrics(registry)
// Starts a RequiredSearch on `registry` restricted to meters with the given name.
// Despite the parameter name, verifyGauge and verifyTimer also use it for gauge and timer names.
68 fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
// Generic lookup-and-verify helper: `map` resolves `search` to a meter of type M and
// `verifier` is the assertion callback to run against that meter.
// NOTE(review): most of the body is not visible in this excerpt. The visible handler
// receives an exception `ex` and asserts doesNotThrowAnyException on it - presumably so a
// failed meter lookup fails the test; confirm against the full file.
70 fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
74 { ex -> assertThat(ex).doesNotThrowAnyException() },
// Looks up the gauge registered under `name` and passes it to `verifier`.
78 fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
79 verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
// Looks up the timer registered under `name` and passes it to `verifier`.
81 fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
82 verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
// Resolves an already-built search (e.g. one narrowed by a tag) to a counter and passes it to `verifier`.
84 fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
85 verifyMeter(search, RequiredSearch::counter, verifier)
// Convenience overload: searches by counter name only, then delegates to the overload above.
87 fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
88 verifyCounter(registrySearch(name), verifier)
// Asserts that every Counter and Timer whose name starts with PREFIX, except those named
// in `changedMeters`, reads 0.0 (within doublePrecision).
91 fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
// Local helper: performs the check for one meter type, reading each meter through `valueOf`.
// NOTE(review): the `clazz` parameter declaration, the receiver of the filter chain and the
// loop header are not visible in this excerpt.
92 fun <T : Meter> verifyAllMetersAreUnchangedBut(
94 changedCounters: Collection<String>,
95 valueOf: (T) -> Double) {
// Keep only meters whose name starts with PREFIX ...
97 .filter { it.id.name.startsWith(PREFIX) }
// ... that are instances of the requested meter class ...
98 .filter { clazz.isInstance(it) }
// ... and that the caller did not list as expected to change.
100 .filterNot { it.id.name in changedCounters }
// The meter id is used as the assertion description so a failure names the offending meter.
// NOTE(review): a percentage tolerance around an expected 0.0 is presumably zero-width,
// making this effectively an exact-zero check - confirm against AssertJ docs.
102 assertThat(valueOf(it))
103 .describedAs(it.id.toString())
104 .isCloseTo(0.0, doublePrecision)
// Counters are read via count(); timers via count() converted to Double.
108 setOf(*changedMeters).let { changedMetersCollection ->
109 verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
110 verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
// notifyBytesReceived: checks the "$PREFIX.data.received.bytes" counter.
115 describe("notifyBytesReceived") {
116 on("$PREFIX.data.received.bytes counter") {
117 val counterName = "$PREFIX.data.received.bytes"
// The counter is expected to be close to the reported byte count.
// NOTE(review): the declaration of `bytes` is not visible in this excerpt.
119 it("should increment counter") {
121 cut.notifyBytesReceived(bytes)
123 verifyCounter(counterName) {
124 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
// After reporting 128 bytes, only `counterName` is allowed to be non-zero.
128 it("should leave all other counters unchanged") {
129 cut.notifyBytesReceived(128)
130 verifyCountersAndTimersAreUnchangedBut(counterName)
// notifyMessageReceived: checks the received-message count and received-bytes counters.
135 describe("notifyMessageReceived") {
136 on("$PREFIX.messages.received.count counter") {
137 val counterName = "$PREFIX.messages.received.count"
// One received frame is expected to yield a count close to 1.0.
139 it("should increment counter") {
140 cut.notifyMessageReceived(emptyWireProtocolFrame())
142 verifyCounter(counterName) {
143 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
148 on("$PREFIX.messages.received.bytes counter") {
149 val counterName = "$PREFIX.messages.received.bytes"
// The bytes counter is expected to be close to the frame's payloadSize.
// NOTE(review): the declaration of `bytes` is not visible in this excerpt.
151 it("should increment counter") {
153 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
155 verifyCounter(counterName) {
156 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
// Receiving a message may only touch the two received.* counters listed below.
161 it("should leave all other counters unchanged") {
162 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
163 verifyCountersAndTimersAreUnchangedBut(
164 "$PREFIX.messages.received.count",
165 "$PREFIX.messages.received.bytes"
// notifyMessageSent: checks the total sent counter, the per-topic sent counter and the
// processing-time and latency timers.
170 describe("notifyMessageSent") {
171 val topicName1 = "PERF3GPP"
172 val topicName2 = "CALLTRACE"
174 on("$PREFIX.messages.sent.count counter") {
175 val counterName = "$PREFIX.messages.sent.count"
// One sent message is expected to yield a total count close to 1.0.
177 it("should increment counter") {
178 cut.notifyMessageSent(routedMessage(topicName1))
180 verifyCounter(counterName) {
181 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
// Meters listed here are allowed to change as a side effect of sending a message.
// NOTE(review): the first argument line of this call is not visible in this excerpt.
183 verifyCountersAndTimersAreUnchangedBut(
185 "$PREFIX.messages.sent.topic.count",
186 "$PREFIX.messages.processing.time",
187 "$PREFIX.messages.latency.time")
191 on("$PREFIX.messages.sent.topic.count counter") {
192 val counterName = "$PREFIX.messages.sent.topic.count"
// One message to topicName1 and two to topicName2: the "topic"-tagged counters
// are expected to read 1.0 and 2.0 respectively.
194 it("should handle counters for different topics") {
195 cut.notifyMessageSent(routedMessage(topicName1))
196 cut.notifyMessageSent(routedMessage(topicName2))
197 cut.notifyMessageSent(routedMessage(topicName2))
199 verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
200 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
203 verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
204 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
209 on("$PREFIX.messages.processing.time") {
210 val counterName = "$PREFIX.messages.processing.time"
211 val processingTimeMs = 100L
// The message is stamped as received `processingTimeMs` before now, so the timer's mean
// (in milliseconds) is expected to be at least that large.
213 it("should update timer") {
215 cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
217 verifyTimer(counterName) { timer ->
218 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
// NOTE(review): the first argument line of this call is not visible in this excerpt.
220 verifyCountersAndTimersAreUnchangedBut(
222 "$PREFIX.messages.sent.topic.count",
223 "$PREFIX.messages.sent.count",
224 "$PREFIX.messages.latency.time")
228 on("$PREFIX.messages.latency.time") {
229 val counterName = "$PREFIX.messages.latency.time"
230 val latencyMs = 1666L
// The message is stamped as sent `latencyMs` before now; the mean latency is expected to
// fall between latencyMs and latencyMs + 10000 ms (upper bound leaves slack for test run time).
232 it("should update timer") {
234 cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
236 verifyTimer(counterName) { timer ->
237 assertThat(timer.mean(TimeUnit.MILLISECONDS))
238 .isGreaterThanOrEqualTo(latencyMs.toDouble())
239 .isLessThanOrEqualTo(latencyMs + 10000.0)
// NOTE(review): the first argument line of this call is not visible in this excerpt.
242 verifyCountersAndTimersAreUnchangedBut(
244 "$PREFIX.messages.sent.topic.count",
245 "$PREFIX.messages.sent.count",
246 "$PREFIX.messages.processing.time")
// notifyMessageDropped: checks the total dropped counter and the per-cause dropped counter.
251 describe("notifyMessageDropped") {
252 on("$PREFIX.messages.dropped.count counter") {
253 val counterName = "$PREFIX.messages.dropped.count"
// Two drops with different causes are expected to yield a total close to 2.0;
// only the total and the per-cause counters may change.
255 it("should increment counter") {
256 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
257 cut.notifyMessageDropped(INVALID_MESSAGE)
259 verifyCounter(counterName) {
260 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
262 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
266 on("$PREFIX.messages.dropped.cause.count counter") {
267 val counterName = "$PREFIX.messages.dropped.cause.count"
// One ROUTE_NOT_FOUND and two INVALID_MESSAGE drops: the "cause"-tagged counters
// are expected to read 1.0 and 2.0 respectively.
269 it("should handle counters for different drop reasons") {
270 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
271 cut.notifyMessageDropped(INVALID_MESSAGE)
272 cut.notifyMessageDropped(INVALID_MESSAGE)
274 verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
275 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
278 verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
279 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
// notifyClientConnected: checks the "$PREFIX.connections.total.count" counter.
285 describe("notifyClientConnected") {
286 on("$PREFIX.connections.total.count counter") {
287 val counterName = "$PREFIX.connections.total.count"
// Two connection notifications are expected to yield a count close to 2.0,
// with no other PREFIX counter or timer changed.
289 it("should increment counter") {
290 cut.notifyClientConnected()
291 cut.notifyClientConnected()
293 verifyCounter(counterName) {
294 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
296 verifyCountersAndTimersAreUnchangedBut(counterName)
// notifyClientDisconnected: checks the "$PREFIX.disconnections.count" counter.
302 describe("notifyClientDisconnected") {
303 on("$PREFIX.disconnections.count counter") {
304 val counterName = "$PREFIX.disconnections.count"
// Two disconnection notifications are expected to yield a count close to 2.0,
// with no other PREFIX counter or timer changed.
306 it("should increment counter") {
307 cut.notifyClientDisconnected()
308 cut.notifyClientDisconnected()
310 verifyCounter(counterName) {
311 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
313 verifyCountersAndTimersAreUnchangedBut(counterName)
// notifyClientRejected: checks the total rejected counter and the per-cause rejected counter.
319 describe("notifyClientRejected") {
321 on("$PREFIX.clients.rejected.count") {
322 val counterName = "$PREFIX.clients.rejected.count"
// Two rejections with different causes are expected to yield a total close to 2.0;
// only the total and the per-cause counters may change.
323 it("should increment counter for each possible reason") {
324 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
325 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
327 verifyCounter(counterName) {
328 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
330 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
334 on("$PREFIX.clients.rejected.cause.count counter") {
335 val counterName = "$PREFIX.clients.rejected.cause.count"
// One INVALID_WIRE_FRAME_MARKER and two PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE rejections:
// the "cause"-tagged counters are expected to read 1.0 and 2.0 respectively.
336 it("should handle counters for different rejection reasons") {
337 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
338 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
339 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
341 verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
342 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
345 verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
346 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
// "$PREFIX.messages.processing.count" gauge: checked against the balance of received and sent messages.
352 describe("$PREFIX.messages.processing.count gauge") {
353 val gaugeName = "$PREFIX.messages.processing.count"
355 on("message traffic") {
// Three messages received and one sent: the gauge is expected to read 2.0.
356 it("should calculate positive difference between sent and received messages") {
357 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
358 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
359 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
360 cut.notifyMessageSent(routedMessage("perf3gpp"))
362 verifyGauge(gaugeName) {
363 assertThat(it.value()).isCloseTo(2.0, doublePrecision)
// Two more messages sent: the gauge is expected to read 0.0.
// NOTE(review): this expectation presumably relies on state carried over from the previous
// `it` within the same `on` group - confirm against Spek semantics.
367 it("should calculate no difference between sent and received messages") {
368 cut.notifyMessageSent(routedMessage("perf3gpp"))
369 cut.notifyMessageSent(routedMessage("fault"))
371 verifyGauge(gaugeName) {
372 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
// One further message sent: the assertion still expects 0.0, i.e. the gauge is
// expected not to report a negative value.
376 it("should calculate negative difference between sent and received messages") {
377 cut.notifyMessageSent(routedMessage("fault"))
379 verifyGauge(gaugeName) {
380 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
// "$PREFIX.connections.active.count" gauge: checked against the balance of connect and disconnect notifications.
386 describe("$PREFIX.connections.active.count gauge") {
387 val gaugeName = "$PREFIX.connections.active.count"
389 on("connection traffic") {
// Three connects and one disconnect: the gauge is expected to read 2.0.
390 it("should calculate positive difference between connected and disconnected clients") {
391 cut.notifyClientConnected()
392 cut.notifyClientConnected()
393 cut.notifyClientConnected()
394 cut.notifyClientDisconnected()
396 verifyGauge(gaugeName) {
397 assertThat(it.value()).isCloseTo(2.0, doublePrecision)
// Two more disconnects: the gauge is expected to read 0.0.
// NOTE(review): this expectation presumably relies on state carried over from the previous
// `it` within the same `on` group - confirm against Spek semantics.
401 it("should calculate no difference between connected and disconnected clients") {
402 cut.notifyClientDisconnected()
403 cut.notifyClientDisconnected()
405 verifyGauge(gaugeName) {
406 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
// One further disconnect: the assertion still expects 0.0, i.e. the gauge is
// expected not to report a negative value.
410 it("should calculate negative difference between connected and disconnected clients") {
411 cut.notifyClientDisconnected()
413 verifyGauge(gaugeName) {
414 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
// Test fixture: wraps a fresh vesEvent() in a VesMessage (header plus wire frame built from
// the event) and routes it to `topic` / `partition` (partition defaults to 0).
421 fun routedMessage(topic: String, partition: Int = 0) =
422 vesEvent().let { evt ->
423 RoutedMessage(topic, partition,
424 VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
// Test fixture: like routedMessage, but the wire frame is copied with its `receivedAt`
// field replaced by the supplied value.
427 fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
428 vesEvent().let { evt ->
429 RoutedMessage(topic, partition,
430 VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
// Test fixture: builds a routed message whose common event header carries `sentAt`
// as its lastEpochMicrosec value.
433 fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
434 vesEvent().let { evt ->
435 val builder = evt.toBuilder()
// Converts the Instant to microseconds since the epoch: whole seconds scaled by 1e6
// plus the nanosecond-of-second part divided down to microseconds.
436 builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
// NOTE(review): the lines between the builder mutation and the construction below are not
// visible in this excerpt; verify that `evt` here refers to the event rebuilt from `builder`
// rather than the original, otherwise the timestamp set above would be dropped.
439 RoutedMessage(topic, partition,
440 VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))