2ecdb26b4dd7b9e1c302d23b3ec2b96563ee6c9d
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main
21
22 import arrow.core.Try
23 import io.micrometer.core.instrument.Counter
24 import io.micrometer.core.instrument.Gauge
25 import io.micrometer.core.instrument.Timer
26 import io.micrometer.core.instrument.search.RequiredSearch
27 import io.micrometer.prometheus.PrometheusConfig
28 import io.micrometer.prometheus.PrometheusMeterRegistry
29 import org.assertj.core.api.Assertions.assertThat
30 import org.assertj.core.data.Percentage
31 import org.jetbrains.spek.api.Spek
32 import org.jetbrains.spek.api.dsl.describe
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
37 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
38 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
40 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
41 import org.onap.dcae.collectors.veshv.model.RoutedMessage
42 import org.onap.dcae.collectors.veshv.model.VesMessage
43 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
44 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
45 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
46 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
47 import java.time.Instant
48 import java.time.temporal.Temporal
49 import java.util.concurrent.TimeUnit
50
51 /**
52  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
53  * @since June 2018
54  */
object MicrometerMetricsTest : Spek({
    // Relative tolerance used for every floating point comparison in this spec.
    val doublePrecision = Percentage.withPercentage(0.5)
    lateinit var registry: PrometheusMeterRegistry
    lateinit var cut: MicrometerMetrics

    // Each test starts with a fresh registry so meter values never leak between tests.
    beforeEachTest {
        registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
        cut = MicrometerMetrics(registry)
    }

    // Starts a registry search for the meter with the given name.
    fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)

    // Resolves a meter from [search] using [map] and passes it to [verifier].
    // If the lookup itself throws, the exception is turned into an assertion failure
    // instead of escaping the test as a raw error.
    fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
            Try {
                map(search)
            }.fold(
                    { ex -> assertThat(ex).doesNotThrowAnyException() },
                    verifier
            )

    // Looks up the gauge called [name] and runs [verifier] on it.
    fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
            verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)

    // Looks up the timer called [name] and runs [verifier] on it.
    fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
            verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)

    // Resolves a counter from an already narrowed [search] (e.g. with tags) and runs [verifier] on it.
    fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
            verifyMeter(search, RequiredSearch::counter, verifier)

    // Looks up the counter called [name] and runs [verifier] on it.
    fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
            verifyCounter(registrySearch(name), verifier)

    // Asserts that every counter whose name starts with PREFIX, except the ones named in
    // [changedCounters], still reads zero.
    fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
        registry.meters
                .filter { it.id.name.startsWith(PREFIX) }
                .filterIsInstance<Counter>()
                .filterNot { it.id.name in changedCounters }
                .forEach { counter ->
                    assertThat(counter.count())
                            .describedAs(counter.id.toString())
                            .isCloseTo(0.0, doublePrecision)
                }
    }

    describe("notifyBytesReceived") {

        on("$PREFIX.data.received.bytes counter") {
            val counterName = "$PREFIX.data.received.bytes"

            it("should increment counter") {
                val bytes = 128
                cut.notifyBytesReceived(bytes)

                verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }

            it("should leave all other counters unchanged") {
                cut.notifyBytesReceived(128)
                verifyAllCountersAreUnchangedBut(counterName)
            }
        }
    }

    describe("notifyMessageReceived") {
        on("$PREFIX.messages.received.count counter") {
            val counterName = "$PREFIX.messages.received.count"

            it("should increment counter") {
                cut.notifyMessageReceived(emptyWireProtocolFrame())

                verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.received.bytes counter") {
            val counterName = "$PREFIX.messages.received.bytes"

            it("should increment counter") {
                val bytes = 888
                cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))

                verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                }
            }
        }

        it("should leave all other counters unchanged") {
            cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
            verifyAllCountersAreUnchangedBut(
                    "$PREFIX.messages.received.count",
                    "$PREFIX.messages.received.bytes"
            )
        }
    }

    describe("notifyMessageSent") {
        val topicName1 = "PERF3GPP"
        val topicName2 = "CALLTRACE"

        on("$PREFIX.messages.sent.count counter") {
            val counterName = "$PREFIX.messages.sent.count"

            it("should increment counter") {
                cut.notifyMessageSent(routedMessage(topicName1))

                verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }
                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count")
            }
        }

        on("$PREFIX.messages.sent.topic.count counter") {
            val counterName = "$PREFIX.messages.sent.topic.count"
            it("should handle counters for different topics") {
                cut.notifyMessageSent(routedMessage(topicName1))
                cut.notifyMessageSent(routedMessage(topicName2))
                cut.notifyMessageSent(routedMessage(topicName2))

                verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }

        on("$PREFIX.messages.processing.time") {
            val counterName = "$PREFIX.messages.processing.time"
            val processingTimeMs = 100L

            it("should update timer") {
                // The message is stamped as received processingTimeMs ago, so the recorded
                // duration is expected to be at least that long.
                cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs)))

                verifyTimer(counterName) { timer ->
                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
                }
                verifyAllCountersAreUnchangedBut(
                        counterName,
                        "$PREFIX.messages.sent.topic.count",
                        "$PREFIX.messages.sent.count")
            }
        }
    }

    describe("notifyMessageDropped") {

        on("$PREFIX.messages.dropped.count counter") {
            val counterName = "$PREFIX.messages.dropped.count"
            it("should increment counter") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
            }
        }

        on("$PREFIX.messages.dropped.cause.count counter") {
            val counterName = "$PREFIX.messages.dropped.cause.count"
            it("should handle counters for different drop reasons") {
                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                cut.notifyMessageDropped(INVALID_MESSAGE)
                cut.notifyMessageDropped(INVALID_MESSAGE)

                verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }
    }

    describe("processing gauge") {
        // Each scenario is a separate test so that it runs against a fresh registry.
        // Previously the scenarios were `on` blocks nested inside a single `it`, which
        // are not executed as tests, so none of these assertions was ever evaluated.
        //
        // NOTE(review): the gauge name now carries PREFIX like every other meter name in
        // this spec (it was the bare "messages.processing.count") - confirm against
        // MicrometerMetrics.
        val gaugeName = "$PREFIX.messages.processing.count"

        it("should show positive difference between received and sent messages") {
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
            cut.notifyMessageSent(routedMessage("perf3gpp"))

            verifyGauge(gaugeName) {
                assertThat(it.value()).isCloseTo(2.0, doublePrecision)
            }
        }

        it("should show zero difference between received and sent messages") {
            cut.notifyMessageReceived(emptyWireProtocolFrame())
            cut.notifyMessageSent(routedMessage("perf3gpp"))

            verifyGauge(gaugeName) {
                assertThat(it.value()).isCloseTo(0.0, doublePrecision)
            }
        }

        it("should show zero for negative difference between received and sent messages") {
            cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
            cut.notifyMessageSent(routedMessage("fault"))
            cut.notifyMessageSent(routedMessage("perf3gpp"))

            // Expected value kept from the original scenario; presumably the gauge is
            // floored at zero - confirm against MicrometerMetrics.
            verifyGauge(gaugeName) {
                assertThat(it.value()).isCloseTo(0.0, doublePrecision)
            }
        }
    }

    describe("notifyClientRejected") {

        on("$PREFIX.clients.rejected.count") {
            val counterName = "$PREFIX.clients.rejected.count"
            it("should increment counter for each possible reason") {
                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

                verifyCounter(counterName) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
            }
        }

        on("$PREFIX.clients.rejected.cause.count counter") {
            val counterName = "$PREFIX.clients.rejected.cause.count"
            it("should handle counters for different rejection reasons") {
                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)

                verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                }

                verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                }
            }
        }
    }
})
305
/**
 * Builds a [RoutedMessage] for [topic] and [partition] that wraps a freshly
 * generated VES event together with its wire protocol frame.
 */
fun routedMessage(topic: String, partition: Int = 0): RoutedMessage {
    val event = vesEvent()
    val message = VesMessage(event.commonEventHeader, wireProtocolFrame(event))
    return RoutedMessage(topic, partition, message)
}
311
/**
 * Builds a [RoutedMessage] for [topic] and [partition] whose wire protocol frame
 * has its `receivedAt` field replaced with [receivedAt].
 */
fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0): RoutedMessage {
    val event = vesEvent()
    val frame = wireProtocolFrame(event).copy(receivedAt = receivedAt)
    val message = VesMessage(event.commonEventHeader, frame)
    return RoutedMessage(topic, partition, message)
}