66f3a5fcd10b5066b797cc37c6780efe647ff79f
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main
21
22 import arrow.core.Option
23 import io.micrometer.core.instrument.Counter
24 import io.micrometer.core.instrument.Meter
25 import io.micrometer.core.instrument.Tags
26 import io.micrometer.core.instrument.Timer
27 import io.micrometer.prometheus.PrometheusConfig
28 import io.micrometer.prometheus.PrometheusMeterRegistry
29 import org.assertj.core.api.Assertions.assertThat
30 import org.assertj.core.data.Percentage
31 import org.jetbrains.spek.api.Spek
32 import org.jetbrains.spek.api.dsl.describe
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
37 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
38 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
39 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
40 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
41 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
42 import org.onap.dcae.collectors.veshv.domain.VesMessage
43 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
44 import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
45 import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
46 import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
47 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
48 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
49 import org.onap.ves.VesEventOuterClass
50 import java.time.Instant
51 import java.time.temporal.Temporal
52 import java.util.concurrent.TimeUnit
53 import kotlin.reflect.KClass
54
/**
 * Spek specification for [MicrometerMetrics].
 *
 * Every test runs against a fresh [PrometheusMeterRegistry] created in `beforeEachTest`, calls one or more
 * `notify*` methods on the class under test and then inspects the meters registered under [PREFIX].
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since June 2018
 */
object MicrometerMetricsTest : Spek({
    // Tolerance used for all floating point comparisons of meter values.
    val doublePrecision = Percentage.withPercentage(0.5)
    lateinit var registry: PrometheusMeterRegistry
    lateinit var cut: MicrometerMetrics

    beforeEachTest {
        registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
        cut = MicrometerMetrics(registry)
    }

    /**
     * Asserts that every [Counter] and [Timer] whose name starts with [PREFIX] still reports (close to) zero,
     * except for the meters whose names are listed in [changedMeters].
     *
     * Counters are compared by their count; timers are compared by the number of recorded events.
     */
    fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
        fun <T : Meter> verifyAllMetersAreUnchangedBut(
                clazz: KClass<T>,
                changedMeterNames: Collection<String>,
                valueOf: (T) -> Double) {
            registry.meters
                    .filter { it.id.name.startsWith(PREFIX) }
                    // type-safe replacement for `filter { clazz.isInstance(it) }.map { it as T }`,
                    // which required an unchecked cast
                    .filterIsInstance(clazz.java)
                    .filterNot { it.id.name in changedMeterNames }
                    .forEach {
                        assertThat(valueOf(it))
                                .describedAs(it.id.toString())
                                .isCloseTo(0.0, doublePrecision)
                    }
        }

        val changedMeterNames = changedMeters.toSet()
        verifyAllMetersAreUnchangedBut(Counter::class, changedMeterNames) { it.count() }
        verifyAllMetersAreUnchangedBut(Timer::class, changedMeterNames) { it.count().toDouble() }
    }


    describe("notifyBytesReceived") {
        on("$PREFIX.data.received.bytes counter") {
            val counterName = "$PREFIX.data.received.bytes"

            it("should increment counter") {
                val bytes = 128
                cut.notifyBytesReceived(bytes)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }

            it("should leave all other counters unchanged") {
                cut.notifyBytesReceived(128)
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }
    }

    describe("notifyMessageReceived") {
        on("$PREFIX.messages.received counter") {
            val counterName = "$PREFIX.messages.received"

            it("should increment counter") {
                cut.notifyMessageReceived(emptyWireProtocolFrame())

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.received.payload.bytes counter") {
            val counterName = "$PREFIX.messages.received.payload.bytes"

            it("should increment counter") {
                val bytes = 888
                cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }
        }

        it("should leave all other counters unchanged") {
            cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
            verifyCountersAndTimersAreUnchangedBut(
                    "$PREFIX.messages.received",
                    "$PREFIX.messages.received.payload.bytes"
            )
        }
    }

    describe("notifyMessageSent") {
        val topicName1 = "PERF3GPP"
        val topicName2 = "CALLTRACE"

        on("$PREFIX.messages.sent counter") {
            val counterName = "$PREFIX.messages.sent"

            it("should increment counter") {
                cut.notifyMessageSent(routedMessage(topicName1))

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
                // a single notifyMessageSent call is expected to touch all four "sent" related meters
                verifyCountersAndTimersAreUnchangedBut(
                        counterName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.processing.time",
                        "$PREFIX.messages.latency")
            }
        }

        on("$PREFIX.messages.sent.topic counter") {
            val counterName = "$PREFIX.messages.sent.topic"

            it("should handle counters for different topics") {
                cut.notifyMessageSent(routedMessage(topicName1))
                cut.notifyMessageSent(routedMessage(topicName2))
                cut.notifyMessageSent(routedMessage(topicName2))

                registry.verifyCounter(counterName, Tags.of("topic", topicName1)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("topic", topicName2)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.processing.time") {
            val counterName = "$PREFIX.messages.processing.time"
            val processingTimeMs = 100L

            it("should update timer") {

                // the message is marked as received processingTimeMs ago, so the recorded time
                // must be at least that long
                cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))

                registry.verifyTimer(counterName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
                }
                verifyCountersAndTimersAreUnchangedBut(
                        counterName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.sent",
                        "$PREFIX.messages.latency")
            }
        }

        on("$PREFIX.messages.latency") {
            val counterName = "$PREFIX.messages.latency"
            val latencyMs = 1666L

            it("should update timer") {

                // the event header timestamp is set latencyMs in the past
                cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))

                registry.verifyTimer(counterName) { timer ->
                    // upper bound leaves 10 seconds of slack for slow test environments
                    assertThat(timer.mean(TimeUnit.MILLISECONDS))
                            .isGreaterThanOrEqualTo(latencyMs.toDouble())
                            .isLessThanOrEqualTo(latencyMs + 10000.0)

                }
                verifyCountersAndTimersAreUnchangedBut(
                        counterName,
                        "$PREFIX.messages.sent.topic",
                        "$PREFIX.messages.sent",
                        "$PREFIX.messages.processing.time")
            }
        }
    }

    describe("notifyMessageDropped") {
        on("$PREFIX.messages.dropped counter") {
            val counterName = "$PREFIX.messages.dropped"

            it("should increment counter") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
            }
        }

        on("$PREFIX.messages.dropped.cause counter") {
            val counterName = "$PREFIX.messages.dropped.cause"

            it("should handle counters for different drop reasons") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                registry.verifyCounter(counterName, Tags.of("cause", ROUTE_NOT_FOUND.tag)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("cause", INVALID_MESSAGE.tag)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }
    }

    describe("notifyClientConnected") {
        on("$PREFIX.connections counter") {
            val counterName = "$PREFIX.connections"

            it("should increment counter") {
                cut.notifyClientConnected()
                cut.notifyClientConnected()

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }

    }

    describe("notifyClientDisconnected") {
        on("$PREFIX.disconnections counter") {
            val counterName = "$PREFIX.disconnections"

            it("should increment counter") {
                cut.notifyClientDisconnected()
                cut.notifyClientDisconnected()

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName)
            }
        }

    }

    describe("notifyClientRejected") {

        on("$PREFIX.clients.rejected") {
            val counterName = "$PREFIX.clients.rejected"
            it("should increment counter for each possible reason") {
                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

                registry.verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
            }
        }

        on("$PREFIX.clients.rejected.cause counter") {
            val counterName = "$PREFIX.clients.rejected.cause"
            it("should handle counters for different rejection reasons") {
                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

                registry.verifyCounter(counterName, Tags.of("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                registry.verifyCounter(counterName, Tags.of("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }
    }

    describe("$PREFIX.connections.active gauge") {
        val gaugeName = "$PREFIX.connections.active"

        // NOTE(review): the three tests below look order-dependent. Their descriptions only match the
        // notifications they send if the connect/disconnect counts accumulate across the `it` blocks of
        // this `on` action (3 connects, then 1 + 2 + 1 disconnects) - presumably Spek 1.x runs the
        // fixture once per `on`; confirm before reordering or splitting these tests.
        on("connection traffic") {
            it("should calculate positive difference between connected and disconnected clients") {
                cut.notifyClientConnected()
                cut.notifyClientConnected()
                cut.notifyClientConnected()
                cut.notifyClientDisconnected()

                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(2.0, doublePrecision)
                }
            }

            it("should calculate no difference between connected and disconnected clients") {
                cut.notifyClientDisconnected()
                cut.notifyClientDisconnected()

                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                }
            }

            it("should calculate negative difference between connected and disconnected clients") {
                cut.notifyClientDisconnected()

                // the gauge is expected to report zero rather than a negative number of active connections
                registry.verifyGauge(gaugeName) {
                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                }
            }
        }
    }
})
364
/**
 * Creates a routed message for the given [topic] and [partition], built from the event
 * returned by `vesEvent()`.
 */
fun routedMessage(topic: String, partition: Int = 0) =
        vesEvent().toRoutedMessage(topic, partition)
367
/**
 * Creates a routed message for the given [topic] and [partition] whose wire protocol frame
 * carries [receivedAt] as its reception time.
 */
fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
        vesEvent().toRoutedMessage(topic, partition, receivedAt)
370
/**
 * Creates a routed message for the given [topic] and [partition] whose common event header has
 * `lastEpochMicrosec` set to [sentAt], expressed as microseconds since the epoch.
 */
fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
        vesEvent()
                .toBuilder()
                .apply {
                    // whole seconds converted to microseconds plus the sub-second part (nanos -> micros)
                    commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
                }
                .build()
                .toRoutedMessage(topic, partition)
377
/**
 * Wraps this event in a [RoutedMessage] targeted at [topic] and [partition].
 *
 * The message consists of this event's common event header and a wire protocol frame built from the
 * event, with the frame's `receivedAt` replaced by [receivedAt] (the current instant by default).
 */
private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
                                                        partition: Int,
                                                        receivedAt: Temporal = Instant.now()): RoutedMessage {
    val header = commonEventHeader
    val frame = wireProtocolFrame(this).copy(receivedAt = receivedAt)
    val message = VesMessage(header, frame)
    return RoutedMessage(message, topic, Option.just(partition))
}
386