2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2020 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.main
22 import arrow.core.Option
23 import com.google.protobuf.ByteString
24 import io.micrometer.core.instrument.Counter
25 import io.micrometer.core.instrument.Meter
26 import io.micrometer.core.instrument.Tags
27 import io.micrometer.core.instrument.Timer
28 import io.micrometer.prometheus.PrometheusConfig
29 import io.micrometer.prometheus.PrometheusMeterRegistry
30 import org.assertj.core.api.Assertions.assertThat
31 import org.assertj.core.data.Percentage
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.jetbrains.spek.api.dsl.on
36 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
37 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
38 import org.onap.dcae.collectors.veshv.domain.VesMessage
39 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
40 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
41 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
42 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
43 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
44 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
45 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
46 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
47 import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
48 import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
49 import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
50 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
51 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
52 import org.onap.ves.VesEventOuterClass
53 import java.time.Instant
54 import java.time.temporal.Temporal
55 import java.util.concurrent.TimeUnit
56 import kotlin.reflect.KClass
59 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
62 object MicrometerMetricsTest : Spek({
63 val doublePrecision = Percentage.withPercentage(0.5)
64 lateinit var registry: PrometheusMeterRegistry
65 lateinit var cut: MicrometerMetrics
68 registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
69 cut = MicrometerMetrics(registry)
72 fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
73 fun <T : Meter> verifyAllMetersAreUnchangedBut(
75 changedCounters: Collection<String>,
76 valueOf: (T) -> Double) {
78 .filter { it.id.name.startsWith(PREFIX) }
79 .filter { clazz.isInstance(it) }
81 .filterNot { it.id.name in changedCounters }
83 assertThat(valueOf(it))
84 .describedAs(it.id.toString())
85 .isCloseTo(0.0, doublePrecision)
89 setOf(*changedMeters).let { changedMetersCollection ->
90 verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
91 verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
96 describe("notifyBytesReceived") {
97 on("$PREFIX.data.received.bytes counter") {
98 val counterName = "$PREFIX.data.received.bytes"
100 it("should increment counter") {
102 cut.notifyBytesReceived(bytes)
104 registry.verifyCounter(counterName) {
105 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
109 it("should leave all other counters unchanged") {
110 cut.notifyBytesReceived(128)
111 verifyCountersAndTimersAreUnchangedBut(counterName)
116 describe("notifyMessageReceived") {
117 on("$PREFIX.messages.received counter") {
118 val counterName = "$PREFIX.messages.received"
120 it("should increment counter") {
121 cut.notifyMessageReceived(emptyWireProtocolFrame())
123 registry.verifyCounter(counterName) {
124 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
129 on("$PREFIX.messages.received.payload.bytes counter") {
130 val counterName = "$PREFIX.messages.received.payload.bytes"
132 it("should increment counter") {
134 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
136 registry.verifyCounter(counterName) {
137 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
142 it("should leave all other counters unchanged") {
143 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
144 verifyCountersAndTimersAreUnchangedBut(
145 "$PREFIX.messages.received",
146 "$PREFIX.messages.received.payload.bytes"
150 on("$PREFIX.messages.to.collector.travel.time") {
151 val counterName = "$PREFIX.messages.to.collector.travel.time"
152 val toCollectorTravelTimeMs = 100L
154 it("should update timer") {
155 val now = Instant.now()
156 val vesMessage = vesMessageReceivedAt(now, sentAt = now.minusMillis(toCollectorTravelTimeMs))
157 cut.notifyMessageReceived(vesMessage)
159 registry.verifyTimer(counterName) { timer ->
160 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isEqualTo(toCollectorTravelTimeMs.toDouble())
163 verifyCountersAndTimersAreUnchangedBut(counterName)
168 describe("notifyMessageReadyForRouting"){
169 on("$PREFIX.messages.processing.time.without.routing") {
170 val counterName = "$PREFIX.messages.processing.time.without.routing"
171 val processingTimeMs = 100L
173 it("should update timer") {
175 cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs)))
177 registry.verifyTimer(counterName) { timer ->
178 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
180 verifyCountersAndTimersAreUnchangedBut(
182 "$PREFIX.messages.latency.without.routing"
187 on("$PREFIX.messages.latency.without.routing") {
188 val counterName = "$PREFIX.messages.latency.without.routing"
189 val latencyWithoutRoutingMs = 200L
191 it("should update timer") {
193 val sentAt = Instant.now().minusMillis(latencyWithoutRoutingMs)
195 cut.notifyMessageReadyForRouting(vesMessageSentAt(sentAt))
197 registry.verifyTimer(counterName) { timer ->
198 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(latencyWithoutRoutingMs.toDouble())
200 verifyCountersAndTimersAreUnchangedBut(
202 "$PREFIX.messages.processing.time.without.routing"
209 describe("notifyMessageSent") {
210 val topicName1 = "PERF3GPP"
211 val topicName2 = "CALLTRACE"
213 on("$PREFIX.messages.sent counter") {
214 val counterName = "$PREFIX.messages.sent"
216 it("should increment counter") {
217 cut.notifyMessageSent(routedMessage(topicName1))
219 registry.verifyCounter(counterName) {
220 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
222 verifyCountersAndTimersAreUnchangedBut(
224 "$PREFIX.messages.sent.topic",
225 "$PREFIX.messages.processing.time",
226 "$PREFIX.messages.latency")
230 on("$PREFIX.messages.sent.topic counter") {
231 val counterName = "$PREFIX.messages.sent.topic"
233 it("should handle counters for different topics") {
234 cut.notifyMessageSent(routedMessage(topicName1))
235 cut.notifyMessageSent(routedMessage(topicName2))
236 cut.notifyMessageSent(routedMessage(topicName2))
238 registry.verifyCounter(counterName, Tags.of("topic", topicName1)) {
239 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
242 registry.verifyCounter(counterName, Tags.of("topic", topicName2)) {
243 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
248 on("$PREFIX.messages.processing.time") {
249 val counterName = "$PREFIX.messages.processing.time"
250 val processingTimeMs = 100L
252 it("should update timer") {
254 cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
256 registry.verifyTimer(counterName) { timer ->
257 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
259 verifyCountersAndTimersAreUnchangedBut(
261 "$PREFIX.messages.sent.topic",
262 "$PREFIX.messages.sent",
263 "$PREFIX.messages.latency")
267 on("$PREFIX.messages.latency") {
268 val counterName = "$PREFIX.messages.latency"
269 val latencyMs = 1666L
271 it("should update timer") {
273 cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
275 registry.verifyTimer(counterName) { timer ->
276 assertThat(timer.mean(TimeUnit.MILLISECONDS))
277 .isGreaterThanOrEqualTo(latencyMs.toDouble())
278 .isLessThanOrEqualTo(latencyMs + 10000.0)
281 verifyCountersAndTimersAreUnchangedBut(
283 "$PREFIX.messages.sent.topic",
284 "$PREFIX.messages.sent",
285 "$PREFIX.messages.processing.time")
290 describe("notifyMessageDropped") {
291 on("$PREFIX.messages.dropped counter") {
292 val counterName = "$PREFIX.messages.dropped"
294 it("should increment counter") {
295 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
296 cut.notifyMessageDropped(INVALID_MESSAGE)
298 registry.verifyCounter(counterName) {
299 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
301 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
305 on("$PREFIX.messages.dropped.cause counter") {
306 val counterName = "$PREFIX.messages.dropped.cause"
308 it("should handle counters for different drop reasons") {
309 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
310 cut.notifyMessageDropped(INVALID_MESSAGE)
311 cut.notifyMessageDropped(INVALID_MESSAGE)
313 registry.verifyCounter(counterName, Tags.of("cause", ROUTE_NOT_FOUND.tag)) {
314 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
317 registry.verifyCounter(counterName, Tags.of("cause", INVALID_MESSAGE.tag)) {
318 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
324 describe("notifyClientConnected") {
325 on("$PREFIX.connections counter") {
326 val counterName = "$PREFIX.connections"
328 it("should increment counter") {
329 cut.notifyClientConnected()
330 cut.notifyClientConnected()
332 registry.verifyCounter(counterName) {
333 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
335 verifyCountersAndTimersAreUnchangedBut(counterName)
341 describe("notifyClientDisconnected") {
342 on("$PREFIX.disconnections counter") {
343 val counterName = "$PREFIX.disconnections"
345 it("should increment counter") {
346 cut.notifyClientDisconnected()
347 cut.notifyClientDisconnected()
349 registry.verifyCounter(counterName) {
350 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
352 verifyCountersAndTimersAreUnchangedBut(counterName)
358 describe("notifyClientRejected") {
360 on("$PREFIX.clients.rejected") {
361 val counterName = "$PREFIX.clients.rejected"
362 it("should increment counter for each possible reason") {
363 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
364 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
366 registry.verifyCounter(counterName) {
367 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
369 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
373 on("$PREFIX.clients.rejected.cause counter") {
374 val counterName = "$PREFIX.clients.rejected.cause"
375 it("should handle counters for different rejection reasons") {
376 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
377 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
378 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
380 registry.verifyCounter(counterName, Tags.of("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
381 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
384 registry.verifyCounter(counterName, Tags.of("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
385 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
391 describe("$PREFIX.connections.active gauge") {
392 val gaugeName = "$PREFIX.connections.active"
394 on("connection traffic") {
395 it("should calculate positive difference between connected and disconnected clients") {
396 cut.notifyClientConnected()
397 cut.notifyClientConnected()
398 cut.notifyClientConnected()
399 cut.notifyClientDisconnected()
401 registry.verifyGauge(gaugeName) {
402 assertThat(it.value()).isCloseTo(2.0, doublePrecision)
406 it("should calculate no difference between connected and disconnected clients") {
407 cut.notifyClientDisconnected()
408 cut.notifyClientDisconnected()
410 registry.verifyGauge(gaugeName) {
411 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
415 it("should calculate negative difference between connected and disconnected clients") {
416 cut.notifyClientDisconnected()
418 registry.verifyGauge(gaugeName) {
419 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
426 private fun vesMessageSentAt(sentAt: Instant): VesMessage {
427 val lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
428 val commonHeader = commonHeader(lastEpochMicrosec = lastEpochMicrosec)
429 return VesMessage(commonHeader,
430 wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements")))
433 private fun vesMessageReceivedAt(receivedAt: Instant, sentAt: Instant): VesMessage {
434 val lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
435 val commonHeader = commonHeader(lastEpochMicrosec = lastEpochMicrosec)
436 return VesMessage(commonHeader,
437 wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt))
440 private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage {
441 val commonHeader = commonHeader(domain)
442 return VesMessage(commonHeader,
443 wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt))
446 private fun routedMessage(topic: String, partition: Int = 0) =
447 vesEvent().run { toRoutedMessage(topic, partition) }
449 private fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
450 vesEvent().run { toRoutedMessage(topic, partition, receivedAt) }
452 private fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
454 val builder = toBuilder()
455 builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
456 builder.build().toRoutedMessage(topic, partition)
459 private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
461 receivedAt: Temporal = Instant.now()) =
463 VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)),
465 Option.just(partition)