f260f158b38f7f8a766240be46595bd69893dbfb
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main
21
22 import arrow.core.Option
23 import arrow.core.Try
24 import io.micrometer.core.instrument.Counter
25 import io.micrometer.core.instrument.Gauge
26 import io.micrometer.core.instrument.Meter
27 import io.micrometer.core.instrument.Timer
28 import io.micrometer.core.instrument.search.RequiredSearch
29 import io.micrometer.prometheus.PrometheusConfig
30 import io.micrometer.prometheus.PrometheusMeterRegistry
31 import org.assertj.core.api.Assertions.assertThat
32 import org.assertj.core.data.Percentage
33 import org.jetbrains.spek.api.Spek
34 import org.jetbrains.spek.api.dsl.describe
35 import org.jetbrains.spek.api.dsl.it
36 import org.jetbrains.spek.api.dsl.on
37 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
38 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
40 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
41 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
42 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
43 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
44 import org.onap.dcae.collectors.veshv.domain.VesMessage
45 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
46 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
47 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
48 import org.onap.ves.VesEventOuterClass
49 import java.time.Instant
50 import java.time.temporal.Temporal
51 import java.util.concurrent.TimeUnit
52 import kotlin.reflect.KClass
53
54 /**
55  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
56  * @since June 2018
57  */
58 object MicrometerMetricsTest : Spek({
59     val doublePrecision = Percentage.withPercentage(0.5)
60     lateinit var registry: PrometheusMeterRegistry
61     lateinit var cut: MicrometerMetrics
62
63     beforeEachTest {
64         registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
65         cut = MicrometerMetrics(registry)
66     }
67
68     fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
69
70     fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
71             Try {
72                 map(search)
73             }.fold(
74                     { ex -> assertThat(ex).doesNotThrowAnyException() },
75                     verifier
76             )
77
78     fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
79             verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
80
81     fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
82             verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
83
84     fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
85             verifyMeter(search, RequiredSearch::counter, verifier)
86
87     fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
88             verifyCounter(registrySearch(name), verifier)
89
90
91     fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
92         fun <T : Meter> verifyAllMetersAreUnchangedBut(
93                 clazz: KClass<T>,
94                 changedCounters: Collection<String>,
95                 valueOf: (T) -> Double) {
96             registry.meters
97                     .filter { it.id.name.startsWith(PREFIX) }
98                     .filter { clazz.isInstance(it) }
99                     .map { it as T }
100                     .filterNot { it.id.name in changedCounters }
101                     .forEach {
102                         assertThat(valueOf(it))
103                                 .describedAs(it.id.toString())
104                                 .isCloseTo(0.0, doublePrecision)
105                     }
106         }
107
108         setOf(*changedMeters).let { changedMetersCollection ->
109             verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
110             verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
111         }
112     }
113
114
115     describe("notifyBytesReceived") {
116         on("$PREFIX.data.received.bytes counter") {
117             val counterName = "$PREFIX.data.received.bytes"
118
119             it("should increment counter") {
120                 val bytes = 128
121                 cut.notifyBytesReceived(bytes)
122
123                 verifyCounter(counterName) {
124                     assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
125                 }
126             }
127
128             it("should leave all other counters unchanged") {
129                 cut.notifyBytesReceived(128)
130                 verifyCountersAndTimersAreUnchangedBut(counterName)
131             }
132         }
133     }
134
135     describe("notifyMessageReceived") {
136         on("$PREFIX.messages.received counter") {
137             val counterName = "$PREFIX.messages.received"
138
139             it("should increment counter") {
140                 cut.notifyMessageReceived(emptyWireProtocolFrame())
141
142                 verifyCounter(counterName) {
143                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
144                 }
145             }
146         }
147
148         on("$PREFIX.messages.received.payload.bytes counter") {
149             val counterName = "$PREFIX.messages.received.payload.bytes"
150
151             it("should increment counter") {
152                 val bytes = 888
153                 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
154
155                 verifyCounter(counterName) {
156                     assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
157                 }
158             }
159         }
160
161         it("should leave all other counters unchanged") {
162             cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
163             verifyCountersAndTimersAreUnchangedBut(
164                     "$PREFIX.messages.received",
165                     "$PREFIX.messages.received.payload.bytes"
166             )
167         }
168     }
169
170     describe("notifyMessageSent") {
171         val topicName1 = "PERF3GPP"
172         val topicName2 = "CALLTRACE"
173
174         on("$PREFIX.messages.sent counter") {
175             val counterName = "$PREFIX.messages.sent"
176
177             it("should increment counter") {
178                 cut.notifyMessageSent(routedMessage(topicName1))
179
180                 verifyCounter(counterName) {
181                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
182                 }
183                 verifyCountersAndTimersAreUnchangedBut(
184                         counterName,
185                         "$PREFIX.messages.sent.topic",
186                         "$PREFIX.messages.processing.time",
187                         "$PREFIX.messages.latency")
188             }
189         }
190
191         on("$PREFIX.messages.sent.topic counter") {
192             val counterName = "$PREFIX.messages.sent.topic"
193
194             it("should handle counters for different topics") {
195                 cut.notifyMessageSent(routedMessage(topicName1))
196                 cut.notifyMessageSent(routedMessage(topicName2))
197                 cut.notifyMessageSent(routedMessage(topicName2))
198
199                 verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
200                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
201                 }
202
203                 verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
204                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
205                 }
206             }
207         }
208
209         on("$PREFIX.messages.processing.time") {
210             val counterName = "$PREFIX.messages.processing.time"
211             val processingTimeMs = 100L
212
213             it("should update timer") {
214
215                 cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
216
217                 verifyTimer(counterName) { timer ->
218                     assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
219                 }
220                 verifyCountersAndTimersAreUnchangedBut(
221                         counterName,
222                         "$PREFIX.messages.sent.topic",
223                         "$PREFIX.messages.sent",
224                         "$PREFIX.messages.latency")
225             }
226         }
227
228         on("$PREFIX.messages.latency") {
229             val counterName = "$PREFIX.messages.latency"
230             val latencyMs = 1666L
231
232             it("should update timer") {
233
234                 cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
235
236                 verifyTimer(counterName) { timer ->
237                     assertThat(timer.mean(TimeUnit.MILLISECONDS))
238                             .isGreaterThanOrEqualTo(latencyMs.toDouble())
239                             .isLessThanOrEqualTo(latencyMs + 10000.0)
240
241                 }
242                 verifyCountersAndTimersAreUnchangedBut(
243                         counterName,
244                         "$PREFIX.messages.sent.topic",
245                         "$PREFIX.messages.sent",
246                         "$PREFIX.messages.processing.time")
247             }
248         }
249     }
250
251     describe("notifyMessageDropped") {
252         on("$PREFIX.messages.dropped counter") {
253             val counterName = "$PREFIX.messages.dropped"
254
255             it("should increment counter") {
256                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
257                 cut.notifyMessageDropped(INVALID_MESSAGE)
258
259                 verifyCounter(counterName) {
260                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
261                 }
262                 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
263             }
264         }
265
266         on("$PREFIX.messages.dropped.cause counter") {
267             val counterName = "$PREFIX.messages.dropped.cause"
268
269             it("should handle counters for different drop reasons") {
270                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
271                 cut.notifyMessageDropped(INVALID_MESSAGE)
272                 cut.notifyMessageDropped(INVALID_MESSAGE)
273
274                 verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
275                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
276                 }
277
278                 verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
279                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
280                 }
281             }
282         }
283     }
284
285     describe("notifyClientConnected") {
286         on("$PREFIX.connections counter") {
287             val counterName = "$PREFIX.connections"
288
289             it("should increment counter") {
290                 cut.notifyClientConnected()
291                 cut.notifyClientConnected()
292
293                 verifyCounter(counterName) {
294                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
295                 }
296                 verifyCountersAndTimersAreUnchangedBut(counterName)
297             }
298         }
299
300     }
301
302     describe("notifyClientDisconnected") {
303         on("$PREFIX.disconnections counter") {
304             val counterName = "$PREFIX.disconnections"
305
306             it("should increment counter") {
307                 cut.notifyClientDisconnected()
308                 cut.notifyClientDisconnected()
309
310                 verifyCounter(counterName) {
311                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
312                 }
313                 verifyCountersAndTimersAreUnchangedBut(counterName)
314             }
315         }
316
317     }
318
319     describe("notifyClientRejected") {
320
321         on("$PREFIX.clients.rejected") {
322             val counterName = "$PREFIX.clients.rejected"
323             it("should increment counter for each possible reason") {
324                 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
325                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
326
327                 verifyCounter(counterName) {
328                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
329                 }
330                 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
331             }
332         }
333
334         on("$PREFIX.clients.rejected.cause counter") {
335             val counterName = "$PREFIX.clients.rejected.cause"
336             it("should handle counters for different rejection reasons") {
337                 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
338                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
339                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
340
341                 verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
342                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
343                 }
344
345                 verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
346                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
347                 }
348             }
349         }
350     }
351
352     describe("$PREFIX.connections.active gauge") {
353         val gaugeName = "$PREFIX.connections.active"
354
355         on("connection traffic") {
356             it("should calculate positive difference between connected and disconnected clients") {
357                 cut.notifyClientConnected()
358                 cut.notifyClientConnected()
359                 cut.notifyClientConnected()
360                 cut.notifyClientDisconnected()
361
362                 verifyGauge(gaugeName) {
363                     assertThat(it.value()).isCloseTo(2.0, doublePrecision)
364                 }
365             }
366
367             it("should calculate no difference between connected and disconnected clients") {
368                 cut.notifyClientDisconnected()
369                 cut.notifyClientDisconnected()
370
371                 verifyGauge(gaugeName) {
372                     assertThat(it.value()).isCloseTo(0.0, doublePrecision)
373                 }
374             }
375
376             it("should calculate negative difference between connected and disconnected clients") {
377                 cut.notifyClientDisconnected()
378
379                 verifyGauge(gaugeName) {
380                     assertThat(it.value()).isCloseTo(0.0, doublePrecision)
381                 }
382             }
383         }
384     }
385 })
386
387 fun routedMessage(topic: String, partition: Int = 0) =
388         vesEvent().run { toRoutedMessage(topic, partition) }
389
390 fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
391         vesEvent().run { toRoutedMessage(topic, partition, receivedAt) }
392
393 fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
394         vesEvent().run {
395             val builder = toBuilder()
396             builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
397             builder.build().toRoutedMessage(topic, partition)
398         }
399
400 private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
401                                                         partition: Int,
402                                                         receivedAt: Temporal = Instant.now()) =
403         RoutedMessage(
404                 VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)),
405                 topic,
406                 Option.just(partition)
407         )
408