efd353eccb145710498b8b73ce6bd65ef6ff2a68
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-main / src / test / kotlin / org / onap / dcae / collectors / veshv / main / MicrometerMetricsTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2020 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main
21
22 import arrow.core.Option
23 import com.google.protobuf.ByteString
24 import io.micrometer.core.instrument.Counter
25 import io.micrometer.core.instrument.Meter
26 import io.micrometer.core.instrument.Tags
27 import io.micrometer.core.instrument.Timer
28 import io.micrometer.prometheus.PrometheusConfig
29 import io.micrometer.prometheus.PrometheusMeterRegistry
30 import org.assertj.core.api.Assertions.assertThat
31 import org.assertj.core.data.Percentage
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.jetbrains.spek.api.dsl.on
36 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
37 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
38 import org.onap.dcae.collectors.veshv.domain.VesMessage
39 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
40 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
41 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
42 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
43 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
44 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
45 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
46 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
47 import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
48 import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
49 import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
50 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
51 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
52 import org.onap.ves.VesEventOuterClass
53 import java.time.Instant
54 import java.time.temporal.Temporal
55 import java.util.concurrent.TimeUnit
56 import kotlin.reflect.KClass
57
/**
 * Spek specification for [MicrometerMetrics].
 *
 * Each scenario drives one of the `notify*` callbacks against a [PrometheusMeterRegistry]
 * that is recreated in `beforeEachTest`, then checks the affected counter, timer or gauge.
 * Most scenarios additionally assert that the remaining counters and timers registered
 * under [PREFIX] still report zero.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since June 2018
 */
object MicrometerMetricsTest : Spek({
    // Relative tolerance (0.5 %) shared by all floating point meter comparisons.
    val doublePrecision = Percentage.withPercentage(0.5)
    lateinit var registry: PrometheusMeterRegistry
    lateinit var cut: MicrometerMetrics

    beforeEachTest {
        registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
        cut = MicrometerMetrics(registry)
    }

    /**
     * Asserts that every [Counter] and [Timer] registered under [PREFIX] still reports zero,
     * except for the meters whose names are listed in [changedMeters].
     */
    fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
        val changedMeterNames = changedMeters.toSet()

        // Checks all meters of the given type; valueOf extracts the number that must still be zero.
        fun <T : Meter> verifyAllMetersAreUnchangedBut(clazz: KClass<T>, valueOf: (T) -> Double) {
            registry.meters
                    .filter { it.id.name.startsWith(PREFIX) }
                    .filterNot { it.id.name in changedMeterNames }
                    // type-safe narrowing instead of isInstance + unchecked cast
                    .filterIsInstance(clazz.java)
                    .forEach { meter ->
                        assertThat(valueOf(meter))
                                .describedAs(meter.id.toString())
                                .isCloseTo(0.0, doublePrecision)
                    }
        }

        verifyAllMetersAreUnchangedBut(Counter::class) { it.count() }
        verifyAllMetersAreUnchangedBut(Timer::class) { it.count().toDouble() }
    }


    describe("notifyBytesReceived") {
        on("$PREFIX.data.received.bytes counter") {
            val counterName = "$PREFIX.data.received.bytes"

            it("should increment counter") {
                val bytes = 128
                cut.notifyBytesReceived(bytes)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }

            it("should leave all other counters unchanged") {
                cut.notifyBytesReceived(128)
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }
    }

    describe("notifyMessageReceived") {
        on("$PREFIX.messages.received counter") {
            val counterName = "$PREFIX.messages.received"

            it("should increment counter") {
                cut.notifyMessageReceived(emptyWireProtocolFrame())

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.received.payload.bytes counter") {
            val counterName = "$PREFIX.messages.received.payload.bytes"

            it("should increment counter") {
                val bytes = 888
                cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }
        }

        it("should leave all other counters unchanged") {
            cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
            verifyCountersAndTimersAreUnchangedBut(
                    "$PREFIX.messages.received",
                    "$PREFIX.messages.received.payload.bytes"
            )
        }
    }

    describe("notifyMessageSent") {
        val topicName1 = "PERF3GPP"
        val topicName2 = "CALLTRACE"

        on("$PREFIX.messages.sent counter") {
            val counterName = "$PREFIX.messages.sent"

            it("should increment counter") {
                cut.notifyMessageSent(routedMessage(topicName1))

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(
                        counterName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.processing.time",
                        "$PREFIX.messages.latency")
            }
        }

        on("$PREFIX.messages.sent.topic counter") {
            val counterName = "$PREFIX.messages.sent.topic"

            it("should handle counters for different topics") {
                cut.notifyMessageSent(routedMessage(topicName1))
                cut.notifyMessageSent(routedMessage(topicName2))
                cut.notifyMessageSent(routedMessage(topicName2))

                registry.verifyCounter(counterName, Tags.of("topic", topicName1)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("topic", topicName2)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.processing.time") {
            val counterName = "$PREFIX.messages.processing.time"
            val processingTimeMs = 100L

            it("should update timer") {

                cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))

                // the message was "received" processingTimeMs ago, so the recorded time cannot be shorter
                registry.verifyTimer(counterName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
                }
                verifyCountersAndTimersAreUnchangedBut(
                        counterName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.sent",
                        "$PREFIX.messages.latency")
            }
        }

        // NOTE(review): this scenario exercises notifyMessageReceived although it is grouped
        // under the notifyMessageSent description.
        on("$PREFIX.messages.to.collector.travel.time") {
            val counterName = "$PREFIX.messages.to.collector.travel.time"
            val toCollectorTravelTimeMs = 100L

            it("should update timer") {
                val now = Instant.now()
                val vesMessage = vesMessageReceivedAt(now, sentAt = now.minusMillis(toCollectorTravelTimeMs))
                cut.notifyMessageReceived(vesMessage)

                registry.verifyTimer(counterName) { timer ->
                    // sentAt is truncated to whole microseconds when it is written into the header,
                    // so compare with the shared tolerance instead of demanding exact double equality
                    assertThat(timer.mean(TimeUnit.MILLISECONDS))
                            .isCloseTo(toCollectorTravelTimeMs.toDouble(), doublePrecision)
                }

                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }

        // NOTE(review): this scenario exercises notifyMessageReadyForRouting although it is grouped
        // under the notifyMessageSent description.
        on("$PREFIX.messages.processing.time.without.routing") {
            val counterName = "$PREFIX.messages.processing.time.without.routing"
            val processingTimeMs = 100L

            it("should update timer") {

                cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs)))

                registry.verifyTimer(counterName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
                }

                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }

        on("$PREFIX.messages.latency") {
            val counterName = "$PREFIX.messages.latency"
            val latencyMs = 1666L

            it("should update timer") {

                cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))

                // lower bound: the simulated send offset; upper bound: generous slack for slow test runs
                registry.verifyTimer(counterName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS))
                            .isGreaterThanOrEqualTo(latencyMs.toDouble())
                            .isLessThanOrEqualTo(latencyMs + 10000.0)

                }
                verifyCountersAndTimersAreUnchangedBut(
                        counterName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.sent",
                        "$PREFIX.messages.processing.time")
            }
        }
    }

    describe("notifyMessageDropped") {
        on("$PREFIX.messages.dropped counter") {
            val counterName = "$PREFIX.messages.dropped"

            it("should increment counter") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
            }
        }

        on("$PREFIX.messages.dropped.cause counter") {
            val counterName = "$PREFIX.messages.dropped.cause"

            it("should handle counters for different drop reasons") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                registry.verifyCounter(counterName, Tags.of("cause", ROUTE_NOT_FOUND.tag)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("cause", INVALID_MESSAGE.tag)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }
    }

    describe("notifyClientConnected") {
        on("$PREFIX.connections counter") {
            val counterName = "$PREFIX.connections"

            it("should increment counter") {
                cut.notifyClientConnected()
                cut.notifyClientConnected()

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }

    }

    describe("notifyClientDisconnected") {
        on("$PREFIX.disconnections counter") {
            val counterName = "$PREFIX.disconnections"

            it("should increment counter") {
                cut.notifyClientDisconnected()
                cut.notifyClientDisconnected()

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }

    }

    describe("notifyClientRejected") {

        on("$PREFIX.clients.rejected") {
            val counterName = "$PREFIX.clients.rejected"
            it("should increment counter for each possible reason") {
                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
            }
        }

        on("$PREFIX.clients.rejected.cause counter") {
            val counterName = "$PREFIX.clients.rejected.cause"
            it("should handle counters for different rejection reasons") {
                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

                registry.verifyCounter(counterName, Tags.of("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }
    }

    describe("$PREFIX.connections.active gauge") {
        val gaugeName = "$PREFIX.connections.active"

        // NOTE(review): the expected values below (2, then 0, then 0) only line up with the test
        // names if the three `it` blocks run in order against the same MicrometerMetrics instance,
        // i.e. if `beforeEachTest` fires once per `on` group rather than once per `it` - confirm
        // against the Spek version in use. Do not reorder or split these tests without checking.
        on("connection traffic") {
            it("should calculate positive difference between connected and disconnected clients") {
                cut.notifyClientConnected()
                cut.notifyClientConnected()
                cut.notifyClientConnected()
                cut.notifyClientDisconnected()

                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(2.0, doublePrecision)
                }
            }

            it("should calculate no difference between connected and disconnected clients") {
                cut.notifyClientDisconnected()
                cut.notifyClientDisconnected()

                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                }
            }

            it("should calculate negative difference between connected and disconnected clients") {
                cut.notifyClientDisconnected()

                // the gauge is expected to stay at zero even with more disconnections than connections
                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                }
            }
        }
    }
})
400
/**
 * Builds a [VesMessage] whose common header carries [sentAt], encoded as microseconds since
 * the epoch in `lastEpochMicrosec`, and whose wire frame is created with [receivedAt].
 */
private fun vesMessageReceivedAt(receivedAt: Instant, sentAt: Instant): VesMessage =
        commonHeader(lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000).let { header ->
            val payload = ByteString.copyFromUtf8("highvolume measurements")
            VesMessage(header, wireProtocolFrame(header, payload, receivedAt))
        }
407
/**
 * Builds a [VesMessage] for the given [domain] (PERF3GPP unless stated otherwise) whose
 * wire frame is created with [receivedAt].
 */
private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage =
        commonHeader(domain).let { header ->
            val payload = ByteString.copyFromUtf8("highvolume measurements")
            VesMessage(header, wireProtocolFrame(header, payload, receivedAt))
        }
413
/**
 * Wraps a default `vesEvent()` into a routed message addressed to [topic] and [partition].
 */
private fun routedMessage(topic: String, partition: Int = 0) =
        vesEvent().toRoutedMessage(topic, partition)
416
/**
 * Wraps a default `vesEvent()` into a routed message addressed to [topic] and [partition],
 * with the wire frame's reception time set to [receivedAt].
 */
private fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
        vesEvent().toRoutedMessage(topic, partition, receivedAt)
419
/**
 * Wraps a `vesEvent()` into a routed message addressed to [topic] and [partition] after
 * overwriting the header's `lastEpochMicrosec` with [sentAt] expressed in microseconds
 * since the epoch.
 */
private fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
        vesEvent()
                .toBuilder()
                .apply {
                    commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
                }
                .build()
                .toRoutedMessage(topic, partition)
426
/**
 * Converts this event into a [RoutedMessage] for [topic] and [partition].
 *
 * The wire frame derived from the event gets its `receivedAt` replaced by [receivedAt],
 * which defaults to the current instant.
 */
private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
                                                        partition: Int,
                                                        receivedAt: Temporal = Instant.now()): RoutedMessage {
    val header = commonEventHeader
    val frame = wireProtocolFrame(this).copy(receivedAt = receivedAt)
    val message = VesMessage(header, frame)
    return RoutedMessage(message, topic, Option.just(partition))
}
435