2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.main
22 import arrow.core.Option
24 import io.micrometer.core.instrument.Counter
25 import io.micrometer.core.instrument.Gauge
26 import io.micrometer.core.instrument.Meter
27 import io.micrometer.core.instrument.Timer
28 import io.micrometer.core.instrument.search.RequiredSearch
29 import io.micrometer.prometheus.PrometheusConfig
30 import io.micrometer.prometheus.PrometheusMeterRegistry
31 import org.assertj.core.api.Assertions.assertThat
32 import org.assertj.core.data.Percentage
33 import org.jetbrains.spek.api.Spek
34 import org.jetbrains.spek.api.dsl.describe
35 import org.jetbrains.spek.api.dsl.it
36 import org.jetbrains.spek.api.dsl.on
37 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
38 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
40 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
41 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
42 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
43 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
44 import org.onap.dcae.collectors.veshv.domain.VesMessage
45 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
46 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
47 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
48 import org.onap.ves.VesEventOuterClass
49 import java.time.Instant
50 import java.time.temporal.Temporal
51 import java.util.concurrent.TimeUnit
52 import kotlin.reflect.KClass
55 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
58 object MicrometerMetricsTest : Spek({
59 val doublePrecision = Percentage.withPercentage(0.5)
60 lateinit var registry: PrometheusMeterRegistry
61 lateinit var cut: MicrometerMetrics
64 registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
65 cut = MicrometerMetrics(registry)
68 fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
70 fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
74 { ex -> assertThat(ex).doesNotThrowAnyException() },
78 fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
79 verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
81 fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
82 verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
84 fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
85 verifyMeter(search, RequiredSearch::counter, verifier)
87 fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
88 verifyCounter(registrySearch(name), verifier)
91 fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
92 fun <T : Meter> verifyAllMetersAreUnchangedBut(
94 changedCounters: Collection<String>,
95 valueOf: (T) -> Double) {
97 .filter { it.id.name.startsWith(PREFIX) }
98 .filter { clazz.isInstance(it) }
100 .filterNot { it.id.name in changedCounters }
102 assertThat(valueOf(it))
103 .describedAs(it.id.toString())
104 .isCloseTo(0.0, doublePrecision)
108 setOf(*changedMeters).let { changedMetersCollection ->
109 verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
110 verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
115 describe("notifyBytesReceived") {
116 on("$PREFIX.data.received.bytes counter") {
117 val counterName = "$PREFIX.data.received.bytes"
119 it("should increment counter") {
121 cut.notifyBytesReceived(bytes)
123 verifyCounter(counterName) {
124 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
128 it("should leave all other counters unchanged") {
129 cut.notifyBytesReceived(128)
130 verifyCountersAndTimersAreUnchangedBut(counterName)
135 describe("notifyMessageReceived") {
136 on("$PREFIX.messages.received counter") {
137 val counterName = "$PREFIX.messages.received"
139 it("should increment counter") {
140 cut.notifyMessageReceived(emptyWireProtocolFrame())
142 verifyCounter(counterName) {
143 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
148 on("$PREFIX.messages.received.payload.bytes counter") {
149 val counterName = "$PREFIX.messages.received.payload.bytes"
151 it("should increment counter") {
153 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
155 verifyCounter(counterName) {
156 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
161 it("should leave all other counters unchanged") {
162 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
163 verifyCountersAndTimersAreUnchangedBut(
164 "$PREFIX.messages.received",
165 "$PREFIX.messages.received.payload.bytes"
170 describe("notifyMessageSent") {
171 val topicName1 = "PERF3GPP"
172 val topicName2 = "CALLTRACE"
174 on("$PREFIX.messages.sent counter") {
175 val counterName = "$PREFIX.messages.sent"
177 it("should increment counter") {
178 cut.notifyMessageSent(routedMessage(topicName1))
180 verifyCounter(counterName) {
181 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
183 verifyCountersAndTimersAreUnchangedBut(
185 "$PREFIX.messages.sent.topic",
186 "$PREFIX.messages.processing.time",
187 "$PREFIX.messages.latency")
191 on("$PREFIX.messages.sent.topic counter") {
192 val counterName = "$PREFIX.messages.sent.topic"
194 it("should handle counters for different topics") {
195 cut.notifyMessageSent(routedMessage(topicName1))
196 cut.notifyMessageSent(routedMessage(topicName2))
197 cut.notifyMessageSent(routedMessage(topicName2))
199 verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
200 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
203 verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
204 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
209 on("$PREFIX.messages.processing.time") {
210 val counterName = "$PREFIX.messages.processing.time"
211 val processingTimeMs = 100L
213 it("should update timer") {
215 cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
217 verifyTimer(counterName) { timer ->
218 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
220 verifyCountersAndTimersAreUnchangedBut(
222 "$PREFIX.messages.sent.topic",
223 "$PREFIX.messages.sent",
224 "$PREFIX.messages.latency")
228 on("$PREFIX.messages.latency") {
229 val counterName = "$PREFIX.messages.latency"
230 val latencyMs = 1666L
232 it("should update timer") {
234 cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
236 verifyTimer(counterName) { timer ->
237 assertThat(timer.mean(TimeUnit.MILLISECONDS))
238 .isGreaterThanOrEqualTo(latencyMs.toDouble())
239 .isLessThanOrEqualTo(latencyMs + 10000.0)
242 verifyCountersAndTimersAreUnchangedBut(
244 "$PREFIX.messages.sent.topic",
245 "$PREFIX.messages.sent",
246 "$PREFIX.messages.processing.time")
251 describe("notifyMessageDropped") {
252 on("$PREFIX.messages.dropped counter") {
253 val counterName = "$PREFIX.messages.dropped"
255 it("should increment counter") {
256 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
257 cut.notifyMessageDropped(INVALID_MESSAGE)
259 verifyCounter(counterName) {
260 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
262 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
266 on("$PREFIX.messages.dropped.cause counter") {
267 val counterName = "$PREFIX.messages.dropped.cause"
269 it("should handle counters for different drop reasons") {
270 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
271 cut.notifyMessageDropped(INVALID_MESSAGE)
272 cut.notifyMessageDropped(INVALID_MESSAGE)
274 verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
275 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
278 verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
279 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
285 describe("notifyClientConnected") {
286 on("$PREFIX.connections counter") {
287 val counterName = "$PREFIX.connections"
289 it("should increment counter") {
290 cut.notifyClientConnected()
291 cut.notifyClientConnected()
293 verifyCounter(counterName) {
294 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
296 verifyCountersAndTimersAreUnchangedBut(counterName)
302 describe("notifyClientDisconnected") {
303 on("$PREFIX.disconnections counter") {
304 val counterName = "$PREFIX.disconnections"
306 it("should increment counter") {
307 cut.notifyClientDisconnected()
308 cut.notifyClientDisconnected()
310 verifyCounter(counterName) {
311 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
313 verifyCountersAndTimersAreUnchangedBut(counterName)
319 describe("notifyClientRejected") {
321 on("$PREFIX.clients.rejected") {
322 val counterName = "$PREFIX.clients.rejected"
323 it("should increment counter for each possible reason") {
324 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
325 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
327 verifyCounter(counterName) {
328 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
330 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
334 on("$PREFIX.clients.rejected.cause counter") {
335 val counterName = "$PREFIX.clients.rejected.cause"
336 it("should handle counters for different rejection reasons") {
337 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
338 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
339 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
341 verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
342 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
345 verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
346 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
352 describe("$PREFIX.connections.active gauge") {
353 val gaugeName = "$PREFIX.connections.active"
355 on("connection traffic") {
356 it("should calculate positive difference between connected and disconnected clients") {
357 cut.notifyClientConnected()
358 cut.notifyClientConnected()
359 cut.notifyClientConnected()
360 cut.notifyClientDisconnected()
362 verifyGauge(gaugeName) {
363 assertThat(it.value()).isCloseTo(2.0, doublePrecision)
367 it("should calculate no difference between connected and disconnected clients") {
368 cut.notifyClientDisconnected()
369 cut.notifyClientDisconnected()
371 verifyGauge(gaugeName) {
372 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
376 it("should calculate negative difference between connected and disconnected clients") {
377 cut.notifyClientDisconnected()
379 verifyGauge(gaugeName) {
380 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
387 fun routedMessage(topic: String, partition: Int = 0) =
388 vesEvent().run { toRoutedMessage(topic, partition) }
390 fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
391 vesEvent().run { toRoutedMessage(topic, partition, receivedAt) }
393 fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
395 val builder = toBuilder()
396 builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
397 builder.build().toRoutedMessage(topic, partition)
400 private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
402 receivedAt: Temporal = Instant.now()) =
404 VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)),
406 Option.just(partition)