e452a5f4746400df6c71616adb65dcb78c23eb2e
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main
21
22 import arrow.core.Try
23 import io.micrometer.core.instrument.Counter
24 import io.micrometer.core.instrument.Gauge
25 import io.micrometer.core.instrument.Meter
26 import io.micrometer.core.instrument.Timer
27 import io.micrometer.core.instrument.search.RequiredSearch
28 import io.micrometer.prometheus.PrometheusConfig
29 import io.micrometer.prometheus.PrometheusMeterRegistry
30 import org.assertj.core.api.Assertions.assertThat
31 import org.assertj.core.data.Percentage
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.jetbrains.spek.api.dsl.on
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
37 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
38 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
40 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
41 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
42 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
43 import org.onap.dcae.collectors.veshv.domain.VesMessage
44 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
45 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
46 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
47 import java.time.Instant
48 import java.time.temporal.Temporal
49 import java.util.concurrent.TimeUnit
50 import kotlin.reflect.KClass
51
52 /**
53  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
54  * @since June 2018
55  */
56 object MicrometerMetricsTest : Spek({
57     val doublePrecision = Percentage.withPercentage(0.5)
58     lateinit var registry: PrometheusMeterRegistry
59     lateinit var cut: MicrometerMetrics
60
61     beforeEachTest {
62         registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
63         cut = MicrometerMetrics(registry)
64     }
65
66     fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
67
68     fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
69             Try {
70                 map(search)
71             }.fold(
72                     { ex -> assertThat(ex).doesNotThrowAnyException() },
73                     verifier
74             )
75
76     fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
77             verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
78
79     fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
80             verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
81
82     fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
83             verifyMeter(search, RequiredSearch::counter, verifier)
84
85     fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
86             verifyCounter(registrySearch(name), verifier)
87
88
89     fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
90         fun <T : Meter> verifyAllMetersAreUnchangedBut(
91                 clazz: KClass<T>,
92                 changedCounters: Collection<String>,
93                 valueOf: (T) -> Double) {
94             registry.meters
95                     .filter { it.id.name.startsWith(PREFIX) }
96                     .filter { clazz.isInstance(it) }
97                     .map { it as T }
98                     .filterNot { it.id.name in changedCounters }
99                     .forEach {
100                         assertThat(valueOf(it))
101                                 .describedAs(it.id.toString())
102                                 .isCloseTo(0.0, doublePrecision)
103                     }
104         }
105
106         setOf(*changedMeters).let { changedMetersCollection ->
107             verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
108             verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
109         }
110     }
111
112
113     describe("notifyBytesReceived") {
114         on("$PREFIX.data.received.bytes counter") {
115             val counterName = "$PREFIX.data.received.bytes"
116
117             it("should increment counter") {
118                 val bytes = 128
119                 cut.notifyBytesReceived(bytes)
120
121                 verifyCounter(counterName) {
122                     assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
123                 }
124             }
125
126             it("should leave all other counters unchanged") {
127                 cut.notifyBytesReceived(128)
128                 verifyCountersAndTimersAreUnchangedBut(counterName)
129             }
130         }
131     }
132
133     describe("notifyMessageReceived") {
134         on("$PREFIX.messages.received counter") {
135             val counterName = "$PREFIX.messages.received"
136
137             it("should increment counter") {
138                 cut.notifyMessageReceived(emptyWireProtocolFrame())
139
140                 verifyCounter(counterName) {
141                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
142                 }
143             }
144         }
145
146         on("$PREFIX.messages.received.payload.bytes counter") {
147             val counterName = "$PREFIX.messages.received.payload.bytes"
148
149             it("should increment counter") {
150                 val bytes = 888
151                 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
152
153                 verifyCounter(counterName) {
154                     assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
155                 }
156             }
157         }
158
159         it("should leave all other counters unchanged") {
160             cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
161             verifyCountersAndTimersAreUnchangedBut(
162                     "$PREFIX.messages.received",
163                     "$PREFIX.messages.received.payload.bytes"
164             )
165         }
166     }
167
168     describe("notifyMessageSent") {
169         val topicName1 = "PERF3GPP"
170         val topicName2 = "CALLTRACE"
171
172         on("$PREFIX.messages.sent counter") {
173             val counterName = "$PREFIX.messages.sent"
174
175             it("should increment counter") {
176                 cut.notifyMessageSent(routedMessage(topicName1))
177
178                 verifyCounter(counterName) {
179                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
180                 }
181                 verifyCountersAndTimersAreUnchangedBut(
182                         counterName,
183                         "$PREFIX.messages.sent.topic",
184                         "$PREFIX.messages.processing.time",
185                         "$PREFIX.messages.latency")
186             }
187         }
188
189         on("$PREFIX.messages.sent.topic counter") {
190             val counterName = "$PREFIX.messages.sent.topic"
191
192             it("should handle counters for different topics") {
193                 cut.notifyMessageSent(routedMessage(topicName1))
194                 cut.notifyMessageSent(routedMessage(topicName2))
195                 cut.notifyMessageSent(routedMessage(topicName2))
196
197                 verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
198                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
199                 }
200
201                 verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
202                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
203                 }
204             }
205         }
206
207         on("$PREFIX.messages.processing.time") {
208             val counterName = "$PREFIX.messages.processing.time"
209             val processingTimeMs = 100L
210
211             it("should update timer") {
212
213                 cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
214
215                 verifyTimer(counterName) { timer ->
216                     assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
217                 }
218                 verifyCountersAndTimersAreUnchangedBut(
219                         counterName,
220                         "$PREFIX.messages.sent.topic",
221                         "$PREFIX.messages.sent",
222                         "$PREFIX.messages.latency")
223             }
224         }
225
226         on("$PREFIX.messages.latency") {
227             val counterName = "$PREFIX.messages.latency"
228             val latencyMs = 1666L
229
230             it("should update timer") {
231
232                 cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
233
234                 verifyTimer(counterName) { timer ->
235                     assertThat(timer.mean(TimeUnit.MILLISECONDS))
236                             .isGreaterThanOrEqualTo(latencyMs.toDouble())
237                             .isLessThanOrEqualTo(latencyMs + 10000.0)
238
239                 }
240                 verifyCountersAndTimersAreUnchangedBut(
241                         counterName,
242                         "$PREFIX.messages.sent.topic",
243                         "$PREFIX.messages.sent",
244                         "$PREFIX.messages.processing.time")
245             }
246         }
247     }
248
249     describe("notifyMessageDropped") {
250         on("$PREFIX.messages.dropped counter") {
251             val counterName = "$PREFIX.messages.dropped"
252
253             it("should increment counter") {
254                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
255                 cut.notifyMessageDropped(INVALID_MESSAGE)
256
257                 verifyCounter(counterName) {
258                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
259                 }
260                 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
261             }
262         }
263
264         on("$PREFIX.messages.dropped.cause counter") {
265             val counterName = "$PREFIX.messages.dropped.cause"
266
267             it("should handle counters for different drop reasons") {
268                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
269                 cut.notifyMessageDropped(INVALID_MESSAGE)
270                 cut.notifyMessageDropped(INVALID_MESSAGE)
271
272                 verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
273                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
274                 }
275
276                 verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
277                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
278                 }
279             }
280         }
281     }
282
283     describe("notifyClientConnected") {
284         on("$PREFIX.connections counter") {
285             val counterName = "$PREFIX.connections"
286
287             it("should increment counter") {
288                 cut.notifyClientConnected()
289                 cut.notifyClientConnected()
290
291                 verifyCounter(counterName) {
292                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
293                 }
294                 verifyCountersAndTimersAreUnchangedBut(counterName)
295             }
296         }
297
298     }
299
300     describe("notifyClientDisconnected") {
301         on("$PREFIX.disconnections counter") {
302             val counterName = "$PREFIX.disconnections"
303
304             it("should increment counter") {
305                 cut.notifyClientDisconnected()
306                 cut.notifyClientDisconnected()
307
308                 verifyCounter(counterName) {
309                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
310                 }
311                 verifyCountersAndTimersAreUnchangedBut(counterName)
312             }
313         }
314
315     }
316
317     describe("notifyClientRejected") {
318
319         on("$PREFIX.clients.rejected") {
320             val counterName = "$PREFIX.clients.rejected"
321             it("should increment counter for each possible reason") {
322                 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
323                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
324
325                 verifyCounter(counterName) {
326                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
327                 }
328                 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
329             }
330         }
331
332         on("$PREFIX.clients.rejected.cause counter") {
333             val counterName = "$PREFIX.clients.rejected.cause"
334             it("should handle counters for different rejection reasons") {
335                 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
336                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
337                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
338
339                 verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
340                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
341                 }
342
343                 verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
344                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
345                 }
346             }
347         }
348     }
349
350     describe("$PREFIX.connections.active gauge") {
351         val gaugeName = "$PREFIX.connections.active"
352
353         on("connection traffic") {
354             it("should calculate positive difference between connected and disconnected clients") {
355                 cut.notifyClientConnected()
356                 cut.notifyClientConnected()
357                 cut.notifyClientConnected()
358                 cut.notifyClientDisconnected()
359
360                 verifyGauge(gaugeName) {
361                     assertThat(it.value()).isCloseTo(2.0, doublePrecision)
362                 }
363             }
364
365             it("should calculate no difference between connected and disconnected clients") {
366                 cut.notifyClientDisconnected()
367                 cut.notifyClientDisconnected()
368
369                 verifyGauge(gaugeName) {
370                     assertThat(it.value()).isCloseTo(0.0, doublePrecision)
371                 }
372             }
373
374             it("should calculate negative difference between connected and disconnected clients") {
375                 cut.notifyClientDisconnected()
376
377                 verifyGauge(gaugeName) {
378                     assertThat(it.value()).isCloseTo(0.0, doublePrecision)
379                 }
380             }
381         }
382     }
383 })
384
385 fun routedMessage(topic: String, partition: Int = 0) =
386         vesEvent().let { evt ->
387             RoutedMessage(topic, partition,
388                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
389         }
390
391 fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
392         vesEvent().let { evt ->
393             RoutedMessage(topic, partition,
394                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
395         }
396
397 fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
398         vesEvent().let { evt ->
399             val builder = evt.toBuilder()
400             builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
401             builder.build()
402         }.let { evt ->
403             RoutedMessage(topic, partition,
404                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
405         }