a3471d4654d7c4bfadb2e89d0d201662a88c7a0b
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main
21
22 import arrow.core.Option
23 import com.google.protobuf.ByteString
24 import io.micrometer.core.instrument.Counter
25 import io.micrometer.core.instrument.Meter
26 import io.micrometer.core.instrument.Tags
27 import io.micrometer.core.instrument.Timer
28 import io.micrometer.prometheus.PrometheusConfig
29 import io.micrometer.prometheus.PrometheusMeterRegistry
30 import org.assertj.core.api.Assertions.assertThat
31 import org.assertj.core.data.Percentage
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.jetbrains.spek.api.dsl.on
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
37 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
38 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
40 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
41 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
42 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
43 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
44 import org.onap.dcae.collectors.veshv.domain.VesMessage
45 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
46 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
47 import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
48 import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
49 import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
50 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
51 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
52 import org.onap.ves.VesEventOuterClass
53 import java.time.Instant
54 import java.time.temporal.Temporal
55 import java.util.concurrent.TimeUnit
56 import kotlin.reflect.KClass
57
/**
 * Specification of [MicrometerMetrics]: every `notify*` call is checked against the meters it is
 * expected to update in a fresh [PrometheusMeterRegistry], and (where stated) against the meters it
 * must leave untouched.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since June 2018
 */
object MicrometerMetricsTest : Spek({
    val doublePrecision = Percentage.withPercentage(0.5)
    lateinit var registry: PrometheusMeterRegistry
    lateinit var cut: MicrometerMetrics

    beforeEachTest {
        registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
        cut = MicrometerMetrics(registry)
    }

    /**
     * Asserts that every [Counter] and [Timer] registered under [PREFIX] still reports zero,
     * except for the meters whose names are listed in [changedMeters].
     */
    fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
        fun <T : Meter> verifyAllMetersAreUnchangedBut(
                clazz: KClass<T>,
                changedMeters: Collection<String>,
                valueOf: (T) -> Double) {
            registry.meters
                    .filter { it.id.name.startsWith(PREFIX) }
                    // type-safe replacement for the former isInstance filter + unchecked cast
                    .filterIsInstance(clazz.java)
                    .filterNot { it.id.name in changedMeters }
                    .forEach { meter ->
                        assertThat(valueOf(meter))
                                .describedAs(meter.id.toString())
                                .isCloseTo(0.0, doublePrecision)
                    }
        }

        val changedMeterNames = changedMeters.toSet()
        verifyAllMetersAreUnchangedBut(Counter::class, changedMeterNames) { it.count() }
        verifyAllMetersAreUnchangedBut(Timer::class, changedMeterNames) { it.count().toDouble() }
    }


    describe("notifyBytesReceived") {
        on("$PREFIX.data.received.bytes counter") {
            val counterName = "$PREFIX.data.received.bytes"

            it("should increment counter") {
                val bytes = 128
                cut.notifyBytesReceived(bytes)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }

            it("should leave all other counters unchanged") {
                cut.notifyBytesReceived(128)
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }
    }

    describe("notifyMessageReceived") {
        on("$PREFIX.messages.received counter") {
            val counterName = "$PREFIX.messages.received"

            it("should increment counter") {
                cut.notifyMessageReceived(emptyWireProtocolFrame())

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.received.payload.bytes counter") {
            val counterName = "$PREFIX.messages.received.payload.bytes"

            it("should increment counter") {
                val bytes = 888
                cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }
        }

        it("should leave all other counters unchanged") {
            cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
            verifyCountersAndTimersAreUnchangedBut(
                    "$PREFIX.messages.received",
                    "$PREFIX.messages.received.payload.bytes"
            )
        }
    }

    describe("notifyMessageSent") {
        val topicName1 = "PERF3GPP"
        val topicName2 = "CALLTRACE"

        on("$PREFIX.messages.sent counter") {
            val counterName = "$PREFIX.messages.sent"

            it("should increment counter") {
                cut.notifyMessageSent(routedMessage(topicName1))

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(
                        counterName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.processing.time",
                        "$PREFIX.messages.latency")
            }
        }

        on("$PREFIX.messages.sent.topic counter") {
            val counterName = "$PREFIX.messages.sent.topic"

            it("should handle counters for different topics") {
                cut.notifyMessageSent(routedMessage(topicName1))
                cut.notifyMessageSent(routedMessage(topicName2))
                cut.notifyMessageSent(routedMessage(topicName2))

                registry.verifyCounter(counterName, Tags.of("topic", topicName1)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("topic", topicName2)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.processing.time") {
            val timerName = "$PREFIX.messages.processing.time"
            val processingTimeMs = 100L

            it("should update timer") {
                // a frame received processingTimeMs ago must yield at least that much processing time
                cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))

                registry.verifyTimer(timerName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
                }
                verifyCountersAndTimersAreUnchangedBut(
                        timerName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.sent",
                        "$PREFIX.messages.latency")
            }
        }

        on("$PREFIX.messages.latency") {
            val timerName = "$PREFIX.messages.latency"
            val latencyMs = 1666L

            it("should update timer") {
                // the event header claims it was emitted latencyMs ago
                cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))

                registry.verifyTimer(timerName) { timer ->
                    // upper bound only guards against absurd values (e.g. a wrong time unit)
                    assertThat(timer.mean(TimeUnit.MILLISECONDS))
                            .isGreaterThanOrEqualTo(latencyMs.toDouble())
                            .isLessThanOrEqualTo(latencyMs + 10000.0)

                }
                verifyCountersAndTimersAreUnchangedBut(
                        timerName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.sent",
                        "$PREFIX.messages.processing.time")
            }
        }
    }

    describe("notifyMessageReadyForRouting") {
        on("$PREFIX.messages.processing.time.without.routing") {
            val timerName = "$PREFIX.messages.processing.time.without.routing"
            val processingTimeMs = 100L

            it("should update timer") {
                cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs)))

                registry.verifyTimer(timerName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
                }
                // only this timer may change: nothing has been sent, so the "sent" counters
                // and the latency timer must stay at zero as well
                verifyCountersAndTimersAreUnchangedBut(timerName)
            }
        }
    }

    describe("notifyMessageDropped") {
        on("$PREFIX.messages.dropped counter") {
            val counterName = "$PREFIX.messages.dropped"

            it("should increment counter") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
            }
        }

        on("$PREFIX.messages.dropped.cause counter") {
            val counterName = "$PREFIX.messages.dropped.cause"

            it("should handle counters for different drop reasons") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                registry.verifyCounter(counterName, Tags.of("cause", ROUTE_NOT_FOUND.tag)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("cause", INVALID_MESSAGE.tag)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }
    }

    describe("notifyClientConnected") {
        on("$PREFIX.connections counter") {
            val counterName = "$PREFIX.connections"

            it("should increment counter") {
                cut.notifyClientConnected()
                cut.notifyClientConnected()

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }

    }

    describe("notifyClientDisconnected") {
        on("$PREFIX.disconnections counter") {
            val counterName = "$PREFIX.disconnections"

            it("should increment counter") {
                cut.notifyClientDisconnected()
                cut.notifyClientDisconnected()

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }

    }

    describe("notifyClientRejected") {

        on("$PREFIX.clients.rejected") {
            val counterName = "$PREFIX.clients.rejected"
            it("should increment counter for each possible reason") {
                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
            }
        }

        on("$PREFIX.clients.rejected.cause counter") {
            val counterName = "$PREFIX.clients.rejected.cause"
            it("should handle counters for different rejection reasons") {
                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

                registry.verifyCounter(counterName, Tags.of("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }
    }

    describe("$PREFIX.connections.active gauge") {
        val gaugeName = "$PREFIX.connections.active"

        // NOTE(review): the three assertions below look cumulative - the expected values only match
        // their descriptions if meter state is shared between the `it` blocks of this `on`
        // (3 connects / 1 disconnect, then 3 / 3, then 3 / 4). Presumably this relies on Spek 1.x
        // running fixtures once per `on` action; confirm before reordering or isolating them.
        on("connection traffic") {
            it("should calculate positive difference between connected and disconnected clients") {
                cut.notifyClientConnected()
                cut.notifyClientConnected()
                cut.notifyClientConnected()
                cut.notifyClientDisconnected()

                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(2.0, doublePrecision)
                }
            }

            it("should calculate no difference between connected and disconnected clients") {
                cut.notifyClientDisconnected()
                cut.notifyClientDisconnected()

                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                }
            }

            it("should calculate negative difference between connected and disconnected clients") {
                cut.notifyClientDisconnected()

                // the gauge is expected to report zero rather than a negative number
                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                }
            }
        }
    }
})
386
/**
 * Creates a [VesMessage] for the given [domain] whose wire frame carries a fixed UTF-8 payload
 * and the supplied [receivedAt] timestamp.
 */
private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage =
        commonHeader(domain).let { header ->
            val payload = ByteString.copyFromUtf8("highvolume measurements")
            VesMessage(header, wireProtocolFrame(header, payload, receivedAt))
        }
392
/** Wraps the event returned by `vesEvent()` into a message routed to [topic] and [partition]. */
private fun routedMessage(topic: String, partition: Int = 0): RoutedMessage {
    val event = vesEvent()
    return event.toRoutedMessage(topic, partition)
}
395
/**
 * Same as a plain routed message, but the underlying wire frame is stamped with [receivedAt]
 * instead of the current time.
 */
private fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0): RoutedMessage {
    val event = vesEvent()
    return event.toRoutedMessage(topic, partition, receivedAt)
}
398
/**
 * Creates a routed message whose common event header has `lastEpochMicrosec` set to [sentAt],
 * expressed as microseconds since the epoch.
 */
private fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0): RoutedMessage {
    val eventBuilder = vesEvent().toBuilder()
    // whole seconds scaled to microseconds, plus the sub-second part truncated from nanoseconds
    val sentAtMicros = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
    eventBuilder.commonEventHeaderBuilder.lastEpochMicrosec = sentAtMicros
    return eventBuilder.build().toRoutedMessage(topic, partition)
}
405
/**
 * Converts this event into a [RoutedMessage] for [topic] and [partition].
 *
 * The wire frame is built from this event and then copied with [receivedAt] as its reception
 * timestamp (defaults to the current instant).
 */
private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
                                                        partition: Int,
                                                        receivedAt: Temporal = Instant.now()): RoutedMessage {
    val header = commonEventHeader
    val frame = wireProtocolFrame(this).copy(receivedAt = receivedAt)
    return RoutedMessage(VesMessage(header, frame), topic, Option.just(partition))
}
414