2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.main
22 import arrow.core.Option
23 import com.google.protobuf.ByteString
24 import io.micrometer.core.instrument.Counter
25 import io.micrometer.core.instrument.Meter
26 import io.micrometer.core.instrument.Tags
27 import io.micrometer.core.instrument.Timer
28 import io.micrometer.prometheus.PrometheusConfig
29 import io.micrometer.prometheus.PrometheusMeterRegistry
30 import org.assertj.core.api.Assertions.assertThat
31 import org.assertj.core.data.Percentage
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.jetbrains.spek.api.dsl.on
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
37 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
38 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
40 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
41 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
42 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
43 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
44 import org.onap.dcae.collectors.veshv.domain.VesMessage
45 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
46 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
47 import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
48 import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
49 import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
50 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
51 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
52 import org.onap.ves.VesEventOuterClass
53 import java.time.Instant
54 import java.time.temporal.Temporal
55 import java.util.concurrent.TimeUnit
56 import kotlin.reflect.KClass
59 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
62 object MicrometerMetricsTest : Spek({
63 val doublePrecision = Percentage.withPercentage(0.5)
64 lateinit var registry: PrometheusMeterRegistry
65 lateinit var cut: MicrometerMetrics
68 registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
69 cut = MicrometerMetrics(registry)
72 fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
73 fun <T : Meter> verifyAllMetersAreUnchangedBut(
75 changedCounters: Collection<String>,
76 valueOf: (T) -> Double) {
78 .filter { it.id.name.startsWith(PREFIX) }
79 .filter { clazz.isInstance(it) }
81 .filterNot { it.id.name in changedCounters }
83 assertThat(valueOf(it))
84 .describedAs(it.id.toString())
85 .isCloseTo(0.0, doublePrecision)
89 setOf(*changedMeters).let { changedMetersCollection ->
90 verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
91 verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
96 describe("notifyBytesReceived") {
97 on("$PREFIX.data.received.bytes counter") {
98 val counterName = "$PREFIX.data.received.bytes"
100 it("should increment counter") {
102 cut.notifyBytesReceived(bytes)
104 registry.verifyCounter(counterName) {
105 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
109 it("should leave all other counters unchanged") {
110 cut.notifyBytesReceived(128)
111 verifyCountersAndTimersAreUnchangedBut(counterName)
116 describe("notifyMessageReceived") {
117 on("$PREFIX.messages.received counter") {
118 val counterName = "$PREFIX.messages.received"
120 it("should increment counter") {
121 cut.notifyMessageReceived(emptyWireProtocolFrame())
123 registry.verifyCounter(counterName) {
124 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
129 on("$PREFIX.messages.received.payload.bytes counter") {
130 val counterName = "$PREFIX.messages.received.payload.bytes"
132 it("should increment counter") {
134 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
136 registry.verifyCounter(counterName) {
137 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
142 it("should leave all other counters unchanged") {
143 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
144 verifyCountersAndTimersAreUnchangedBut(
145 "$PREFIX.messages.received",
146 "$PREFIX.messages.received.payload.bytes"
151 describe("notifyMessageSent") {
152 val topicName1 = "PERF3GPP"
153 val topicName2 = "CALLTRACE"
155 on("$PREFIX.messages.sent counter") {
156 val counterName = "$PREFIX.messages.sent"
158 it("should increment counter") {
159 cut.notifyMessageSent(routedMessage(topicName1))
161 registry.verifyCounter(counterName) {
162 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
164 verifyCountersAndTimersAreUnchangedBut(
166 "$PREFIX.messages.sent.topic",
167 "$PREFIX.messages.processing.time",
168 "$PREFIX.messages.latency")
172 on("$PREFIX.messages.sent.topic counter") {
173 val counterName = "$PREFIX.messages.sent.topic"
175 it("should handle counters for different topics") {
176 cut.notifyMessageSent(routedMessage(topicName1))
177 cut.notifyMessageSent(routedMessage(topicName2))
178 cut.notifyMessageSent(routedMessage(topicName2))
180 registry.verifyCounter(counterName, Tags.of("topic", topicName1)) {
181 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
184 registry.verifyCounter(counterName, Tags.of("topic", topicName2)) {
185 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
190 on("$PREFIX.messages.processing.time") {
191 val counterName = "$PREFIX.messages.processing.time"
192 val processingTimeMs = 100L
194 it("should update timer") {
196 cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
198 registry.verifyTimer(counterName) { timer ->
199 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
201 verifyCountersAndTimersAreUnchangedBut(
203 "$PREFIX.messages.sent.topic",
204 "$PREFIX.messages.sent",
205 "$PREFIX.messages.latency")
209 on("$PREFIX.messages.processing.time.without.routing") {
210 val counterName = "$PREFIX.messages.processing.time.without.routing"
211 val processingTimeMs = 100L
213 it("should update timer") {
215 cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs)))
217 registry.verifyTimer(counterName) { timer ->
218 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
220 verifyCountersAndTimersAreUnchangedBut(
222 "$PREFIX.messages.sent.topic",
223 "$PREFIX.messages.sent",
224 "$PREFIX.messages.latency")
228 on("$PREFIX.messages.latency") {
229 val counterName = "$PREFIX.messages.latency"
230 val latencyMs = 1666L
232 it("should update timer") {
234 cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
236 registry.verifyTimer(counterName) { timer ->
237 assertThat(timer.mean(TimeUnit.MILLISECONDS))
238 .isGreaterThanOrEqualTo(latencyMs.toDouble())
239 .isLessThanOrEqualTo(latencyMs + 10000.0)
242 verifyCountersAndTimersAreUnchangedBut(
244 "$PREFIX.messages.sent.topic",
245 "$PREFIX.messages.sent",
246 "$PREFIX.messages.processing.time")
251 describe("notifyMessageDropped") {
252 on("$PREFIX.messages.dropped counter") {
253 val counterName = "$PREFIX.messages.dropped"
255 it("should increment counter") {
256 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
257 cut.notifyMessageDropped(INVALID_MESSAGE)
259 registry.verifyCounter(counterName) {
260 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
262 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
266 on("$PREFIX.messages.dropped.cause counter") {
267 val counterName = "$PREFIX.messages.dropped.cause"
269 it("should handle counters for different drop reasons") {
270 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
271 cut.notifyMessageDropped(INVALID_MESSAGE)
272 cut.notifyMessageDropped(INVALID_MESSAGE)
274 registry.verifyCounter(counterName, Tags.of("cause", ROUTE_NOT_FOUND.tag)) {
275 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
278 registry.verifyCounter(counterName, Tags.of("cause", INVALID_MESSAGE.tag)) {
279 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
285 describe("notifyClientConnected") {
286 on("$PREFIX.connections counter") {
287 val counterName = "$PREFIX.connections"
289 it("should increment counter") {
290 cut.notifyClientConnected()
291 cut.notifyClientConnected()
293 registry.verifyCounter(counterName) {
294 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
296 verifyCountersAndTimersAreUnchangedBut(counterName)
302 describe("notifyClientDisconnected") {
303 on("$PREFIX.disconnections counter") {
304 val counterName = "$PREFIX.disconnections"
306 it("should increment counter") {
307 cut.notifyClientDisconnected()
308 cut.notifyClientDisconnected()
310 registry.verifyCounter(counterName) {
311 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
313 verifyCountersAndTimersAreUnchangedBut(counterName)
319 describe("notifyClientRejected") {
321 on("$PREFIX.clients.rejected") {
322 val counterName = "$PREFIX.clients.rejected"
323 it("should increment counter for each possible reason") {
324 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
325 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
327 registry.verifyCounter(counterName) {
328 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
330 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
334 on("$PREFIX.clients.rejected.cause counter") {
335 val counterName = "$PREFIX.clients.rejected.cause"
336 it("should handle counters for different rejection reasons") {
337 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
338 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
339 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
341 registry.verifyCounter(counterName, Tags.of("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
342 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
345 registry.verifyCounter(counterName, Tags.of("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
346 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
352 describe("$PREFIX.connections.active gauge") {
353 val gaugeName = "$PREFIX.connections.active"
355 on("connection traffic") {
356 it("should calculate positive difference between connected and disconnected clients") {
357 cut.notifyClientConnected()
358 cut.notifyClientConnected()
359 cut.notifyClientConnected()
360 cut.notifyClientDisconnected()
362 registry.verifyGauge(gaugeName) {
363 assertThat(it.value()).isCloseTo(2.0, doublePrecision)
367 it("should calculate no difference between connected and disconnected clients") {
368 cut.notifyClientDisconnected()
369 cut.notifyClientDisconnected()
371 registry.verifyGauge(gaugeName) {
372 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
376 it("should calculate negative difference between connected and disconnected clients") {
377 cut.notifyClientDisconnected()
379 registry.verifyGauge(gaugeName) {
380 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
387 private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage {
388 val commonHeader = commonHeader(domain)
389 return VesMessage(commonHeader,
390 wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt))
393 private fun routedMessage(topic: String, partition: Int = 0) =
394 vesEvent().run { toRoutedMessage(topic, partition) }
396 private fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
397 vesEvent().run { toRoutedMessage(topic, partition, receivedAt) }
399 private fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
401 val builder = toBuilder()
402 builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
403 builder.build().toRoutedMessage(topic, partition)
406 private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
408 receivedAt: Temporal = Instant.now()) =
410 VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)),
412 Option.just(partition)