cb5cfc702367aabfccbc3e6c60bcad08500c48aa
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main
21
22 import arrow.core.Try
23 import io.micrometer.core.instrument.Counter
24 import io.micrometer.core.instrument.Gauge
25 import io.micrometer.core.instrument.Timer
26 import io.micrometer.core.instrument.search.RequiredSearch
27 import io.micrometer.prometheus.PrometheusConfig
28 import io.micrometer.prometheus.PrometheusMeterRegistry
29 import org.assertj.core.api.Assertions.assertThat
30 import org.assertj.core.data.Percentage
31 import org.jetbrains.spek.api.Spek
32 import org.jetbrains.spek.api.dsl.describe
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
37 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
38 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
39 import org.onap.dcae.collectors.veshv.model.RoutedMessage
40 import org.onap.dcae.collectors.veshv.model.VesMessage
41 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
42 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
43 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
44 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
45 import java.time.Instant
46 import java.time.temporal.Temporal
47 import java.util.concurrent.TimeUnit
48
/**
 * Spek specification for [MicrometerMetrics].
 *
 * A new [PrometheusMeterRegistry] and a new [MicrometerMetrics] built on top of it are created in
 * `beforeEachTest`. The tests call the `notify*` methods of the class under test and then assert
 * on the meters found in that registry.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since June 2018
 */
object MicrometerMetricsTest : Spek({
    // Relative tolerance (0.5 %) applied to every floating point meter comparison below.
    val doublePrecision = Percentage.withPercentage(0.5)
    lateinit var registry: PrometheusMeterRegistry
    lateinit var cut: MicrometerMetrics

    beforeEachTest {
        registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
        cut = MicrometerMetrics(registry)
    }

    /** Starts a required-meter search in the registry of the current test. */
    fun registrySearch() = RequiredSearch.`in`(registry)

    /**
     * Resolves a meter from [search] using [map] and passes it to [verifier].
     *
     * Only the lookup is wrapped in `Try`. When it fails, the caught exception is handed to
     * AssertJ's `doesNotThrowAnyException`, so a missing meter is reported as an assertion failure.
     */
    fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
            Try { map(search) }.fold(
                    { ex -> assertThat(ex).doesNotThrowAnyException() },
                    verifier
            )

    /** Looks up the gauge called [name] and runs [verifier] on it. */
    fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
            verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier)

    /** Looks up the timer called [name] and runs [verifier] on it. */
    fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
            verifyMeter(registrySearch().name(name), RequiredSearch::timer, verifier)

    /** Looks up the counter matched by [search] (e.g. name plus tag) and runs [verifier] on it. */
    fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
            verifyMeter(search, RequiredSearch::counter, verifier)

    /** Looks up the counter called [name] and runs [verifier] on it. */
    fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
            verifyCounter(registrySearch().name(name), verifier)

    /**
     * Asserts that every [Counter] whose name starts with [PREFIX] still reads zero, except the
     * counters whose names are listed in [changedCounters].
     */
    fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
        registry.meters
                .filter { it.id.name.startsWith(PREFIX) }
                .filterIsInstance<Counter>()
                .filterNot { it.id.name in changedCounters }
                .forEach { counter ->
                    assertThat(counter.count())
                            .describedAs(counter.id.toString())
                            .isCloseTo(0.0, doublePrecision)
                }
    }

    describe("notifyBytesReceived") {

        on("$PREFIX.data.received.bytes counter") {
            val counterName = "$PREFIX.data.received.bytes"

            it("should increment counter") {
                val bytes = 128
                cut.notifyBytesReceived(bytes)

                verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }

            it("should leave all other counters unchanged") {
                cut.notifyBytesReceived(128)
                verifyAllCountersAreUnchangedBut(counterName)
            }
        }
    }

    describe("notifyMessageReceived") {
        on("$PREFIX.messages.received.count counter") {
            val counterName = "$PREFIX.messages.received.count"

            it("should increment counter") {
                cut.notifyMessageReceived(emptyWireProtocolFrame())

                verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.received.bytes counter") {
            val counterName = "$PREFIX.messages.received.bytes"

            it("should increment counter") {
                val bytes = 888
                cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))

                verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }
        }

        it("should leave all other counters unchanged") {
            cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
            verifyAllCountersAreUnchangedBut(
                    "$PREFIX.messages.received.count",
                    "$PREFIX.messages.received.bytes"
            )
        }
    }

    describe("notifyMessageSent") {
        val topicName1 = "PERF3GPP"
        val topicName2 = "CALLTRACE"

        on("$PREFIX.messages.sent.count.total counter") {
            val counterName = "$PREFIX.messages.sent.count.total"

            it("should increment counter") {
                cut.notifyMessageSent(routedMessage(topicName1))

                verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.count.topic")
            }
        }

        // Description now matches the counter name that is actually looked up below.
        on("$PREFIX.messages.sent.count.topic counter") {
            val counterName = "$PREFIX.messages.sent.count.topic"

            it("should handle counters for different topics") {
                cut.notifyMessageSent(routedMessage(topicName1))
                cut.notifyMessageSent(routedMessage(topicName2))
                cut.notifyMessageSent(routedMessage(topicName2))

                verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.processing.time") {
            val timerName = "$PREFIX.messages.processing.time"
            val processingTimeMs = 100L

            it("should update timer") {
                // Back-date the reception timestamp so the measured time is at least processingTimeMs.
                cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs)))

                verifyTimer(timerName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
                }
                // Only counters are inspected by this helper, so the timer name is not listed here.
                verifyAllCountersAreUnchangedBut(
                        "$PREFIX.messages.sent.count.topic",
                        "$PREFIX.messages.sent.count.total")
            }
        }
    }

    describe("notifyMessageDropped") {

        on("$PREFIX.messages.dropped.count.total counter") {
            val counterName = "$PREFIX.messages.dropped.count.total"

            it("should increment counter") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.count.cause")
            }
        }

        on("$PREFIX.messages.dropped.count.cause counter") {
            val counterName = "$PREFIX.messages.dropped.count.cause"

            it("should handle counters for different drop reasons") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                verifyCounter(registrySearch().name(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                verifyCounter(registrySearch().name(counterName).tag("cause", INVALID_MESSAGE.tag)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }
    }

    describe("processing gauge") {
        // NOTE(review): every other meter name asserted in this spec is prefixed with PREFIX, so the
        // gauge is looked up the same way - confirm against the name registered by MicrometerMetrics.
        val gaugeName = "$PREFIX.messages.processing.count"

        // Each scenario is a separate `it` declared directly under `describe`. The expected values
        // below are only correct when the received/sent counts start from zero, so the scenarios
        // must not share one MicrometerMetrics instance.

        it("should show positive difference between received and sent messages") {
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
            cut.notifyMessageSent(routedMessage("perf3gpp"))

            verifyGauge(gaugeName) {
                assertThat(it.value()).isCloseTo(2.0, doublePrecision)
            }
        }

        it("should show zero when all received messages have been sent") {
            cut.notifyMessageReceived(emptyWireProtocolFrame())
            cut.notifyMessageSent(routedMessage("perf3gpp"))

            verifyGauge(gaugeName) {
                assertThat(it.value()).isCloseTo(0.0, doublePrecision)
            }
        }

        it("should show zero when more messages have been sent than received") {
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
            cut.notifyMessageSent(routedMessage("fault"))
            cut.notifyMessageSent(routedMessage("perf3gpp"))

            // One received, two sent: the expectation is 0.0 rather than -1.0
            // (presumably the gauge is clamped at zero - verify in MicrometerMetrics).
            verifyGauge(gaugeName) {
                assertThat(it.value()).isCloseTo(0.0, doublePrecision)
            }
        }
    }

})
271
/**
 * Builds a [RoutedMessage] for [topic] and [partition] around a freshly generated VES event.
 *
 * The wrapped [VesMessage] carries the event's common header and the wire protocol frame
 * produced from that same event.
 */
fun routedMessage(topic: String, partition: Int = 0): RoutedMessage {
    val event = vesEvent()
    val message = VesMessage(event.commonEventHeader, wireProtocolFrame(event))
    return RoutedMessage(topic, partition, message)
}
277
/**
 * Builds a [RoutedMessage] for [topic] and [partition] around a freshly generated VES event,
 * with the reception timestamp of its wire protocol frame replaced by [receivedAt].
 */
fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0): RoutedMessage {
    val event = vesEvent()
    val frame = wireProtocolFrame(event).copy(receivedAt = receivedAt)
    val message = VesMessage(event.commonEventHeader, frame)
    return RoutedMessage(topic, partition, message)
}