71fc8f7f89b3fa70b691b7fffb93a77110fe7e63
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main
21
22 import arrow.core.Try
23 import io.micrometer.core.instrument.Counter
24 import io.micrometer.core.instrument.Gauge
25 import io.micrometer.core.instrument.Meter
26 import io.micrometer.core.instrument.Timer
27 import io.micrometer.core.instrument.search.RequiredSearch
28 import io.micrometer.prometheus.PrometheusConfig
29 import io.micrometer.prometheus.PrometheusMeterRegistry
30 import org.assertj.core.api.Assertions.assertThat
31 import org.assertj.core.data.Percentage
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.jetbrains.spek.api.dsl.on
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
37 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
38 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
40 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
41 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
42 import org.onap.dcae.collectors.veshv.model.RoutedMessage
43 import org.onap.dcae.collectors.veshv.model.VesMessage
44 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
45 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
46 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
47 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
48 import java.time.Instant
49 import java.time.temporal.Temporal
50 import java.util.concurrent.TimeUnit
51 import kotlin.reflect.KClass
52
53 /**
54  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
55  * @since June 2018
56  */
57 object MicrometerMetricsTest : Spek({
58     val doublePrecision = Percentage.withPercentage(0.5)
59     val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time")
60     lateinit var registry: PrometheusMeterRegistry
61     lateinit var cut: MicrometerMetrics
62
63     beforeEachTest {
64         registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
65         cut = MicrometerMetrics(registry)
66     }
67
68     fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
69
70     fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
71             Try {
72                 map(search)
73             }.fold(
74                     { ex -> assertThat(ex).doesNotThrowAnyException() },
75                     verifier
76             )
77
78     fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
79             verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
80
81     fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
82             verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
83
84     fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
85             verifyMeter(search, RequiredSearch::counter, verifier)
86
87     fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
88             verifyCounter(registrySearch(name), verifier)
89
90     fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
91         fun <T : Meter> verifyAllMetersAreUnchangedBut(
92                 clazz: KClass<T>,
93                 changedCounters: Collection<String>,
94                 valueOf: (T) -> Double) {
95             registry.meters
96                     .filter { it.id.name.startsWith(PREFIX) }
97                     .filter { clazz.isInstance(it) }
98                     .map { it as T }
99                     .filterNot { it.id.name in changedCounters }
100                     .forEach {
101                         assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
102                     }
103         }
104
105         setOf(*changedMeters).let { changedMetersCollection ->
106             verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
107             verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
108         }
109     }
110
111     describe("notifyBytesReceived") {
112
113         on("$PREFIX.data.received.bytes counter") {
114             val counterName = "$PREFIX.data.received.bytes"
115
116             it("should increment counter") {
117                 val bytes = 128
118                 cut.notifyBytesReceived(bytes)
119
120                 verifyCounter(counterName) {
121                     assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
122                 }
123             }
124
125             it("should leave all other counters unchanged") {
126                 cut.notifyBytesReceived(128)
127                 verifyCountersAndTimersAreUnchangedBut(counterName)
128             }
129         }
130     }
131
132     describe("notifyMessageReceived") {
133         on("$PREFIX.messages.received.count counter") {
134             val counterName = "$PREFIX.messages.received.count"
135
136             it("should increment counter") {
137                 cut.notifyMessageReceived(emptyWireProtocolFrame())
138
139                 verifyCounter(counterName) {
140                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
141                 }
142             }
143         }
144
145         on("$PREFIX.messages.received.bytes counter") {
146             val counterName = "$PREFIX.messages.received.bytes"
147
148             it("should increment counter") {
149                 val bytes = 888
150                 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
151
152                 verifyCounter(counterName) {
153                     assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
154                 }
155             }
156         }
157
158         it("should leave all other counters unchanged") {
159             cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
160             verifyCountersAndTimersAreUnchangedBut(
161                     "$PREFIX.messages.received.count",
162                     "$PREFIX.messages.received.bytes"
163             )
164         }
165     }
166
167     describe("notifyMessageSent") {
168         val topicName1 = "PERF3GPP"
169         val topicName2 = "CALLTRACE"
170
171         on("$PREFIX.messages.sent.count counter") {
172             val counterName = "$PREFIX.messages.sent.count"
173
174             it("should increment counter") {
175                 cut.notifyMessageSent(routedMessage(topicName1))
176
177                 verifyCounter(counterName) {
178                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
179                 }
180                 verifyCountersAndTimersAreUnchangedBut(
181                         counterName,
182                         "$PREFIX.messages.sent.topic.count",
183                         "$PREFIX.messages.processing.time",
184                         "$PREFIX.messages.latency.time")
185             }
186         }
187
188         on("$PREFIX.messages.sent.topic.count counter") {
189             val counterName = "$PREFIX.messages.sent.topic.count"
190             it("should handle counters for different topics") {
191                 cut.notifyMessageSent(routedMessage(topicName1))
192                 cut.notifyMessageSent(routedMessage(topicName2))
193                 cut.notifyMessageSent(routedMessage(topicName2))
194
195                 verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
196                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
197                 }
198
199                 verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
200                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
201                 }
202             }
203         }
204
205         on("$PREFIX.messages.processing.time") {
206             val counterName = "$PREFIX.messages.processing.time"
207             val processingTimeMs = 100L
208
209             it("should update timer") {
210
211                 cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
212
213                 verifyTimer(counterName) { timer ->
214                     assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
215                 }
216                 verifyCountersAndTimersAreUnchangedBut(
217                         counterName,
218                         "$PREFIX.messages.sent.topic.count",
219                         "$PREFIX.messages.sent.count",
220                         "$PREFIX.messages.latency.time")
221             }
222         }
223
224         on("$PREFIX.messages.latency.time") {
225             val counterName = "$PREFIX.messages.latency.time"
226             val latencyMs = 1666L
227
228             it("should update timer") {
229
230                 cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
231
232                 verifyTimer(counterName) { timer ->
233                     assertThat(timer.mean(TimeUnit.MILLISECONDS))
234                             .isGreaterThanOrEqualTo(latencyMs.toDouble())
235                             .isLessThanOrEqualTo(latencyMs + 10000.0)
236
237                 }
238                 verifyCountersAndTimersAreUnchangedBut(
239                         counterName,
240                         "$PREFIX.messages.sent.topic.count",
241                         "$PREFIX.messages.sent.count",
242                         "$PREFIX.messages.processing.time")
243             }
244         }
245
246     }
247
248     describe("notifyMessageDropped") {
249
250         on("$PREFIX.messages.dropped.count counter") {
251             val counterName = "$PREFIX.messages.dropped.count"
252             it("should increment counter") {
253                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
254                 cut.notifyMessageDropped(INVALID_MESSAGE)
255
256                 verifyCounter(counterName) {
257                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
258                 }
259                 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
260             }
261         }
262
263         on("$PREFIX.messages.dropped.cause.count counter") {
264             val counterName = "$PREFIX.messages.dropped.cause.count"
265             it("should handle counters for different drop reasons") {
266                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
267                 cut.notifyMessageDropped(INVALID_MESSAGE)
268                 cut.notifyMessageDropped(INVALID_MESSAGE)
269
270                 verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
271                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
272                 }
273
274                 verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
275                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
276                 }
277             }
278         }
279     }
280
281     describe("processing gauge") {
282         it("should show difference between sent and received messages") {
283
284             on("positive difference") {
285                 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
286                 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
287                 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
288                 cut.notifyMessageSent(routedMessage("perf3gpp"))
289                 verifyGauge("messages.processing.count") {
290                     assertThat(it.value()).isCloseTo(2.0, doublePrecision)
291                 }
292             }
293
294             on("zero difference") {
295                 cut.notifyMessageReceived(emptyWireProtocolFrame())
296                 cut.notifyMessageSent(routedMessage("perf3gpp"))
297                 verifyGauge("messages.processing.count") {
298                     assertThat(it.value()).isCloseTo(0.0, doublePrecision)
299                 }
300             }
301
302             on("negative difference") {
303                 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
304                 cut.notifyMessageSent(routedMessage("fault"))
305                 cut.notifyMessageSent(routedMessage("perf3gpp"))
306                 verifyGauge("messages.processing.count") {
307                     assertThat(it.value()).isCloseTo(0.0, doublePrecision)
308                 }
309             }
310         }
311     }
312
313     describe("notifyClientRejected") {
314
315         on("$PREFIX.clients.rejected.count") {
316             val counterName = "$PREFIX.clients.rejected.count"
317             it("should increment counter for each possible reason") {
318                 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
319                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
320
321                 verifyCounter(counterName) {
322                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
323                 }
324                 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
325             }
326         }
327
328         on("$PREFIX.clients.rejected.cause.count counter") {
329             val counterName = "$PREFIX.clients.rejected.cause.count"
330             it("should handle counters for different rejection reasons") {
331                 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
332                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
333                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
334
335                 verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
336                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
337                 }
338
339                 verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
340                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
341                 }
342             }
343         }
344     }
345 })
346
347 fun routedMessage(topic: String, partition: Int = 0) =
348         vesEvent().let { evt ->
349             RoutedMessage(topic, partition,
350                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
351         }
352
353 fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
354         vesEvent().let { evt ->
355             RoutedMessage(topic, partition,
356                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
357         }
358
359 fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
360         vesEvent().let { evt ->
361             val builder = evt.toBuilder()
362             builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
363             builder.build()
364         }.let { evt ->
365             RoutedMessage(topic, partition,
366                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
367         }