Add metric for total latency without routing
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-main / src / test / kotlin / org / onap / dcae / collectors / veshv / main / MicrometerMetricsTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2020 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main
21
22 import arrow.core.Option
23 import com.google.protobuf.ByteString
24 import io.micrometer.core.instrument.Counter
25 import io.micrometer.core.instrument.Meter
26 import io.micrometer.core.instrument.Tags
27 import io.micrometer.core.instrument.Timer
28 import io.micrometer.prometheus.PrometheusConfig
29 import io.micrometer.prometheus.PrometheusMeterRegistry
30 import org.assertj.core.api.Assertions.assertThat
31 import org.assertj.core.data.Percentage
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.jetbrains.spek.api.dsl.on
36 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
37 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
38 import org.onap.dcae.collectors.veshv.domain.VesMessage
39 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
40 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
41 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
42 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
43 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
44 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
45 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
46 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
47 import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
48 import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
49 import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
50 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
51 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
52 import org.onap.ves.VesEventOuterClass
53 import java.time.Instant
54 import java.time.temporal.Temporal
55 import java.util.concurrent.TimeUnit
56 import kotlin.reflect.KClass
57
/**
 * Spek specification for [MicrometerMetrics].
 *
 * Each case sends notifications to a [MicrometerMetrics] instance backed by a [PrometheusMeterRegistry]
 * and then verifies the affected counters, timers and gauges. Most cases additionally verify that no
 * other [PREFIX]-prefixed counter or timer has been touched.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since June 2018
 */
object MicrometerMetricsTest : Spek({
    // Relative tolerance (0.5%) used when comparing meter values with the expected ones.
    val doublePrecision = Percentage.withPercentage(0.5)
    lateinit var registry: PrometheusMeterRegistry
    lateinit var cut: MicrometerMetrics

    beforeEachTest {
        registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
        cut = MicrometerMetrics(registry)
    }

    /**
     * Asserts that every [PREFIX]-prefixed [Counter] and [Timer] in the registry still reports a count
     * of zero, except for the meters whose names are listed in [changedMeters].
     */
    fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
        fun <T : Meter> verifyAllMetersAreUnchangedBut(
                clazz: KClass<T>,
                changedMeterNames: Collection<String>,
                valueOf: (T) -> Double) {
            registry.meters
                    .filter { it.id.name.startsWith(PREFIX) }
                    // Type-safe narrowing to T; replaces an `isInstance` filter followed by an unchecked cast.
                    .filterIsInstance(clazz.java)
                    .filterNot { it.id.name in changedMeterNames }
                    .forEach {
                        assertThat(valueOf(it))
                                .describedAs(it.id.toString())
                                .isCloseTo(0.0, doublePrecision)
                    }
        }

        changedMeters.toSet().let { changedMetersCollection ->
            verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
            verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
        }
    }


    describe("notifyBytesReceived") {
        on("$PREFIX.data.received.bytes counter") {
            val counterName = "$PREFIX.data.received.bytes"

            it("should increment counter") {
                val bytes = 128
                cut.notifyBytesReceived(bytes)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }

            it("should leave all other counters unchanged") {
                cut.notifyBytesReceived(128)
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }
    }

    describe("notifyMessageReceived") {
        on("$PREFIX.messages.received counter") {
            val counterName = "$PREFIX.messages.received"

            it("should increment counter") {
                cut.notifyMessageReceived(emptyWireProtocolFrame())

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.received.payload.bytes counter") {
            val counterName = "$PREFIX.messages.received.payload.bytes"

            it("should increment counter") {
                val bytes = 888
                cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }
        }

        it("should leave all other counters unchanged") {
            cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
            verifyCountersAndTimersAreUnchangedBut(
                    "$PREFIX.messages.received",
                    "$PREFIX.messages.received.payload.bytes"
            )
        }

        on("$PREFIX.messages.to.collector.travel.time") {
            val timerName = "$PREFIX.messages.to.collector.travel.time"
            val toCollectorTravelTimeMs = 100L

            it("should update timer") {
                // Millisecond-precision timestamp: the message helper converts the sent time with
                // `nano / 1000`, so any sub-microsecond clock digits would be truncated and could
                // break the exact comparison below.
                val now = Instant.ofEpochMilli(System.currentTimeMillis())
                val vesMessage = vesMessageReceivedAt(now, sentAt = now.minusMillis(toCollectorTravelTimeMs))
                cut.notifyMessageReceived(vesMessage)

                registry.verifyTimer(timerName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isEqualTo(toCollectorTravelTimeMs.toDouble())
                }

                verifyCountersAndTimersAreUnchangedBut(timerName)
            }
        }
    }

    describe("notifyMessageReadyForRouting") {
        on("$PREFIX.messages.processing.time.without.routing") {
            val timerName = "$PREFIX.messages.processing.time.without.routing"
            val processingTimeMs = 100L

            it("should update timer") {

                cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs)))

                // Only a lower bound is checked: the measured time also includes the test's own run time.
                registry.verifyTimer(timerName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
                }
                verifyCountersAndTimersAreUnchangedBut(
                        timerName,
                        "$PREFIX.messages.latency.without.routing"
                )
            }
        }

        on("$PREFIX.messages.latency.without.routing") {
            val timerName = "$PREFIX.messages.latency.without.routing"
            val latencyWithoutRoutingMs = 200L

            it("should update timer") {

                val sentAt = Instant.now().minusMillis(latencyWithoutRoutingMs)

                cut.notifyMessageReadyForRouting(vesMessageSentAt(sentAt))

                registry.verifyTimer(timerName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(latencyWithoutRoutingMs.toDouble())
                }
                verifyCountersAndTimersAreUnchangedBut(
                        timerName,
                        "$PREFIX.messages.processing.time.without.routing"
                )
            }
        }
    }


    describe("notifyMessageSent") {
        val topicName1 = "PERF3GPP"
        val topicName2 = "CALLTRACE"

        on("$PREFIX.messages.sent counter") {
            val counterName = "$PREFIX.messages.sent"

            it("should increment counter") {
                cut.notifyMessageSent(routedMessage(topicName1))

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(
                        counterName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.processing.time",
                        "$PREFIX.messages.latency")
            }
        }

        on("$PREFIX.messages.sent.topic counter") {
            val counterName = "$PREFIX.messages.sent.topic"

            it("should handle counters for different topics") {
                cut.notifyMessageSent(routedMessage(topicName1))
                cut.notifyMessageSent(routedMessage(topicName2))
                cut.notifyMessageSent(routedMessage(topicName2))

                registry.verifyCounter(counterName, Tags.of("topic", topicName1)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("topic", topicName2)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.processing.time") {
            val timerName = "$PREFIX.messages.processing.time"
            val processingTimeMs = 100L

            it("should update timer") {

                cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))

                registry.verifyTimer(timerName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
                }
                verifyCountersAndTimersAreUnchangedBut(
                        timerName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.sent",
                        "$PREFIX.messages.latency")
            }
        }

        on("$PREFIX.messages.latency") {
            val timerName = "$PREFIX.messages.latency"
            val latencyMs = 1666L

            it("should update timer") {

                cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))

                // The upper bound leaves 10 s of slack for the test's own execution time.
                registry.verifyTimer(timerName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS))
                            .isGreaterThanOrEqualTo(latencyMs.toDouble())
                            .isLessThanOrEqualTo(latencyMs + 10000.0)

                }
                verifyCountersAndTimersAreUnchangedBut(
                        timerName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.sent",
                        "$PREFIX.messages.processing.time")
            }
        }
    }

    describe("notifyMessageDropped") {
        on("$PREFIX.messages.dropped counter") {
            val counterName = "$PREFIX.messages.dropped"

            it("should increment counter") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
            }
        }

        on("$PREFIX.messages.dropped.cause counter") {
            val counterName = "$PREFIX.messages.dropped.cause"

            it("should handle counters for different drop reasons") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                registry.verifyCounter(counterName, Tags.of("cause", ROUTE_NOT_FOUND.tag)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("cause", INVALID_MESSAGE.tag)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }
    }

    describe("notifyClientConnected") {
        on("$PREFIX.connections counter") {
            val counterName = "$PREFIX.connections"

            it("should increment counter") {
                cut.notifyClientConnected()
                cut.notifyClientConnected()

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }

    }

    describe("notifyClientDisconnected") {
        on("$PREFIX.disconnections counter") {
            val counterName = "$PREFIX.disconnections"

            it("should increment counter") {
                cut.notifyClientDisconnected()
                cut.notifyClientDisconnected()

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }

    }

    describe("notifyClientRejected") {

        on("$PREFIX.clients.rejected") {
            val counterName = "$PREFIX.clients.rejected"
            it("should increment counter for each possible reason") {
                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
            }
        }

        on("$PREFIX.clients.rejected.cause counter") {
            val counterName = "$PREFIX.clients.rejected.cause"
            it("should handle counters for different rejection reasons") {
                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

                registry.verifyCounter(counterName, Tags.of("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }
    }

    describe("$PREFIX.connections.active gauge") {
        val gaugeName = "$PREFIX.connections.active"

        // NOTE(review): the expected values below (2, then 0, then 0) are consistent with the three cases
        // sharing one `cut` (3 connects - 1 disconnect, then 2 more disconnects, then 1 more). That
        // presumably relies on how Spek applies `beforeEachTest` to `it` blocks nested in `on` - confirm
        // before reordering or isolating these cases.
        on("connection traffic") {
            it("should calculate positive difference between connected and disconnected clients") {
                cut.notifyClientConnected()
                cut.notifyClientConnected()
                cut.notifyClientConnected()
                cut.notifyClientDisconnected()

                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(2.0, doublePrecision)
                }
            }

            it("should calculate no difference between connected and disconnected clients") {
                cut.notifyClientDisconnected()
                cut.notifyClientDisconnected()

                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                }
            }

            it("should calculate negative difference between connected and disconnected clients") {
                cut.notifyClientDisconnected()

                // Disconnections outnumber connections here, yet 0.0 is expected:
                // the gauge is expected not to report a negative value.
                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                }
            }
        }
    }
})
425
/**
 * Creates a [VesMessage] whose common header carries [sentAt], converted to microseconds since the
 * epoch, as its `lastEpochMicrosec`.
 */
private fun vesMessageSentAt(sentAt: Instant): VesMessage {
    val sentAtMicros = sentAt.epochSecond * 1_000_000 + sentAt.nano / 1_000
    val header = commonHeader(lastEpochMicrosec = sentAtMicros)
    val frame = wireProtocolFrame(header, ByteString.copyFromUtf8("highvolume measurements"))
    return VesMessage(header, frame)
}
432
/**
 * Creates a [VesMessage] whose common header carries [sentAt] (as microseconds since the epoch) in
 * `lastEpochMicrosec` and whose wire frame is built with [receivedAt].
 */
private fun vesMessageReceivedAt(receivedAt: Instant, sentAt: Instant): VesMessage {
    val sentAtMicros = sentAt.epochSecond * 1_000_000 + sentAt.nano / 1_000
    val header = commonHeader(lastEpochMicrosec = sentAtMicros)
    val payload = ByteString.copyFromUtf8("highvolume measurements")
    return VesMessage(header, wireProtocolFrame(header, payload, receivedAt))
}
439
/**
 * Creates a [VesMessage] for the given [domain] (PERF3GPP by default) whose wire frame is built with
 * [receivedAt].
 */
private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage {
    val header = commonHeader(domain)
    val payload = ByteString.copyFromUtf8("highvolume measurements")
    val frame = wireProtocolFrame(header, payload, receivedAt)
    return VesMessage(header, frame)
}
445
/** Wraps a default test VES event into a routed message targeting [topic] and [partition]. */
private fun routedMessage(topic: String, partition: Int = 0) =
        vesEvent().toRoutedMessage(topic, partition)
448
/**
 * Wraps a default test VES event into a routed message targeting [topic] and [partition], with the
 * wire frame's reception time set to [receivedAt].
 */
private fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
        vesEvent().toRoutedMessage(topic, partition, receivedAt)
451
/**
 * Wraps a test VES event into a routed message targeting [topic] and [partition], after setting the
 * event header's `lastEpochMicrosec` to [sentAt] expressed in microseconds since the epoch.
 */
private fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
        vesEvent()
                .toBuilder()
                .also { eventBuilder ->
                    val sentAtMicros = sentAt.epochSecond * 1_000_000 + sentAt.nano / 1_000
                    eventBuilder.commonEventHeaderBuilder.lastEpochMicrosec = sentAtMicros
                }
                .build()
                .toRoutedMessage(topic, partition)
458
/**
 * Converts this VES event into a [RoutedMessage] for [topic] and [partition].
 *
 * The wire frame built from the event gets its `receivedAt` replaced with [receivedAt], which defaults
 * to the current instant.
 */
private fun VesEventOuterClass.VesEvent.toRoutedMessage(
        topic: String,
        partition: Int,
        receivedAt: Temporal = Instant.now()
): RoutedMessage {
    val header = this.commonEventHeader
    val frame = wireProtocolFrame(this).copy(receivedAt = receivedAt)
    return RoutedMessage(VesMessage(header, frame), topic, Option.just(partition))
}
467