2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2020 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.main
22 import arrow.core.Option
23 import com.google.protobuf.ByteString
24 import io.micrometer.core.instrument.Counter
25 import io.micrometer.core.instrument.Meter
26 import io.micrometer.core.instrument.Tags
27 import io.micrometer.core.instrument.Timer
28 import io.micrometer.prometheus.PrometheusConfig
29 import io.micrometer.prometheus.PrometheusMeterRegistry
30 import org.assertj.core.api.Assertions.assertThat
31 import org.assertj.core.data.Percentage
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.jetbrains.spek.api.dsl.on
36 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
37 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
38 import org.onap.dcae.collectors.veshv.domain.VesMessage
39 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
40 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
41 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
42 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
43 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
44 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
45 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
46 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
47 import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
48 import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
49 import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
50 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
51 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
52 import org.onap.ves.VesEventOuterClass
53 import java.time.Instant
54 import java.time.temporal.Temporal
55 import java.util.concurrent.TimeUnit
56 import kotlin.reflect.KClass
59 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
62 object MicrometerMetricsTest : Spek({
63 val doublePrecision = Percentage.withPercentage(0.5)
64 lateinit var registry: PrometheusMeterRegistry
65 lateinit var cut: MicrometerMetrics
68 registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
69 cut = MicrometerMetrics(registry)
72 fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
73 fun <T : Meter> verifyAllMetersAreUnchangedBut(
75 changedCounters: Collection<String>,
76 valueOf: (T) -> Double) {
78 .filter { it.id.name.startsWith(PREFIX) }
79 .filter { clazz.isInstance(it) }
81 .filterNot { it.id.name in changedCounters }
83 assertThat(valueOf(it))
84 .describedAs(it.id.toString())
85 .isCloseTo(0.0, doublePrecision)
89 setOf(*changedMeters).let { changedMetersCollection ->
90 verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
91 verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
96 describe("notifyBytesReceived") {
97 on("$PREFIX.data.received.bytes counter") {
98 val counterName = "$PREFIX.data.received.bytes"
100 it("should increment counter") {
102 cut.notifyBytesReceived(bytes)
104 registry.verifyCounter(counterName) {
105 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
109 it("should leave all other counters unchanged") {
110 cut.notifyBytesReceived(128)
111 verifyCountersAndTimersAreUnchangedBut(counterName)
116 describe("notifyMessageReceived") {
117 on("$PREFIX.messages.received counter") {
118 val counterName = "$PREFIX.messages.received"
120 it("should increment counter") {
121 cut.notifyMessageReceived(emptyWireProtocolFrame())
123 registry.verifyCounter(counterName) {
124 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
129 on("$PREFIX.messages.received.payload.bytes counter") {
130 val counterName = "$PREFIX.messages.received.payload.bytes"
132 it("should increment counter") {
134 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
136 registry.verifyCounter(counterName) {
137 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
142 it("should leave all other counters unchanged") {
143 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
144 verifyCountersAndTimersAreUnchangedBut(
145 "$PREFIX.messages.received",
146 "$PREFIX.messages.received.payload.bytes"
151 describe("notifyMessageSent") {
152 val topicName1 = "PERF3GPP"
153 val topicName2 = "CALLTRACE"
155 on("$PREFIX.messages.sent counter") {
156 val counterName = "$PREFIX.messages.sent"
158 it("should increment counter") {
159 cut.notifyMessageSent(routedMessage(topicName1))
161 registry.verifyCounter(counterName) {
162 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
164 verifyCountersAndTimersAreUnchangedBut(
166 "$PREFIX.messages.sent.topic",
167 "$PREFIX.messages.processing.time",
168 "$PREFIX.messages.latency")
172 on("$PREFIX.messages.sent.topic counter") {
173 val counterName = "$PREFIX.messages.sent.topic"
175 it("should handle counters for different topics") {
176 cut.notifyMessageSent(routedMessage(topicName1))
177 cut.notifyMessageSent(routedMessage(topicName2))
178 cut.notifyMessageSent(routedMessage(topicName2))
180 registry.verifyCounter(counterName, Tags.of("topic", topicName1)) {
181 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
184 registry.verifyCounter(counterName, Tags.of("topic", topicName2)) {
185 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
190 on("$PREFIX.messages.processing.time") {
191 val counterName = "$PREFIX.messages.processing.time"
192 val processingTimeMs = 100L
194 it("should update timer") {
196 cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
198 registry.verifyTimer(counterName) { timer ->
199 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
201 verifyCountersAndTimersAreUnchangedBut(
203 "$PREFIX.messages.sent.topic",
204 "$PREFIX.messages.sent",
205 "$PREFIX.messages.latency")
209 on("$PREFIX.messages.to.collector.travel.time") {
210 val counterName = "$PREFIX.messages.to.collector.travel.time"
211 val toCollectorTravelTimeMs = 100L
213 it("should update timer") {
214 val now = Instant.now()
215 val vesMessage = vesMessageReceivedAt(now, sentAt = now.minusMillis(toCollectorTravelTimeMs))
216 cut.notifyMessageReceived(vesMessage)
218 registry.verifyTimer(counterName) { timer ->
219 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isEqualTo(toCollectorTravelTimeMs.toDouble())
222 verifyCountersAndTimersAreUnchangedBut(counterName)
226 on("$PREFIX.messages.processing.time.without.routing") {
227 val counterName = "$PREFIX.messages.processing.time.without.routing"
228 val processingTimeMs = 100L
230 it("should update timer") {
232 cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs)))
234 registry.verifyTimer(counterName) { timer ->
235 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
238 verifyCountersAndTimersAreUnchangedBut(counterName)
242 on("$PREFIX.messages.latency") {
243 val counterName = "$PREFIX.messages.latency"
244 val latencyMs = 1666L
246 it("should update timer") {
248 cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
250 registry.verifyTimer(counterName) { timer ->
251 assertThat(timer.mean(TimeUnit.MILLISECONDS))
252 .isGreaterThanOrEqualTo(latencyMs.toDouble())
253 .isLessThanOrEqualTo(latencyMs + 10000.0)
256 verifyCountersAndTimersAreUnchangedBut(
258 "$PREFIX.messages.sent.topic",
259 "$PREFIX.messages.sent",
260 "$PREFIX.messages.processing.time")
265 describe("notifyMessageDropped") {
266 on("$PREFIX.messages.dropped counter") {
267 val counterName = "$PREFIX.messages.dropped"
269 it("should increment counter") {
270 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
271 cut.notifyMessageDropped(INVALID_MESSAGE)
273 registry.verifyCounter(counterName) {
274 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
276 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
280 on("$PREFIX.messages.dropped.cause counter") {
281 val counterName = "$PREFIX.messages.dropped.cause"
283 it("should handle counters for different drop reasons") {
284 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
285 cut.notifyMessageDropped(INVALID_MESSAGE)
286 cut.notifyMessageDropped(INVALID_MESSAGE)
288 registry.verifyCounter(counterName, Tags.of("cause", ROUTE_NOT_FOUND.tag)) {
289 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
292 registry.verifyCounter(counterName, Tags.of("cause", INVALID_MESSAGE.tag)) {
293 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
299 describe("notifyClientConnected") {
300 on("$PREFIX.connections counter") {
301 val counterName = "$PREFIX.connections"
303 it("should increment counter") {
304 cut.notifyClientConnected()
305 cut.notifyClientConnected()
307 registry.verifyCounter(counterName) {
308 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
310 verifyCountersAndTimersAreUnchangedBut(counterName)
316 describe("notifyClientDisconnected") {
317 on("$PREFIX.disconnections counter") {
318 val counterName = "$PREFIX.disconnections"
320 it("should increment counter") {
321 cut.notifyClientDisconnected()
322 cut.notifyClientDisconnected()
324 registry.verifyCounter(counterName) {
325 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
327 verifyCountersAndTimersAreUnchangedBut(counterName)
333 describe("notifyClientRejected") {
335 on("$PREFIX.clients.rejected") {
336 val counterName = "$PREFIX.clients.rejected"
337 it("should increment counter for each possible reason") {
338 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
339 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
341 registry.verifyCounter(counterName) {
342 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
344 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
348 on("$PREFIX.clients.rejected.cause counter") {
349 val counterName = "$PREFIX.clients.rejected.cause"
350 it("should handle counters for different rejection reasons") {
351 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
352 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
353 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
355 registry.verifyCounter(counterName, Tags.of("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
356 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
359 registry.verifyCounter(counterName, Tags.of("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
360 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
366 describe("$PREFIX.connections.active gauge") {
367 val gaugeName = "$PREFIX.connections.active"
369 on("connection traffic") {
370 it("should calculate positive difference between connected and disconnected clients") {
371 cut.notifyClientConnected()
372 cut.notifyClientConnected()
373 cut.notifyClientConnected()
374 cut.notifyClientDisconnected()
376 registry.verifyGauge(gaugeName) {
377 assertThat(it.value()).isCloseTo(2.0, doublePrecision)
381 it("should calculate no difference between connected and disconnected clients") {
382 cut.notifyClientDisconnected()
383 cut.notifyClientDisconnected()
385 registry.verifyGauge(gaugeName) {
386 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
390 it("should calculate negative difference between connected and disconnected clients") {
391 cut.notifyClientDisconnected()
393 registry.verifyGauge(gaugeName) {
394 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
401 private fun vesMessageReceivedAt(receivedAt: Instant, sentAt: Instant): VesMessage {
402 val lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
403 val commonHeader = commonHeader(lastEpochMicrosec = lastEpochMicrosec)
404 return VesMessage(commonHeader,
405 wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt))
408 private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage {
409 val commonHeader = commonHeader(domain)
410 return VesMessage(commonHeader,
411 wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt))
414 private fun routedMessage(topic: String, partition: Int = 0) =
415 vesEvent().run { toRoutedMessage(topic, partition) }
417 private fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
418 vesEvent().run { toRoutedMessage(topic, partition, receivedAt) }
420 private fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
422 val builder = toBuilder()
423 builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
424 builder.build().toRoutedMessage(topic, partition)
427 private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
429 receivedAt: Temporal = Instant.now()) =
431 VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)),
433 Option.just(partition)