2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.main
23 import io.micrometer.core.instrument.Counter
24 import io.micrometer.core.instrument.Gauge
25 import io.micrometer.core.instrument.Timer
26 import io.micrometer.core.instrument.search.RequiredSearch
27 import io.micrometer.prometheus.PrometheusConfig
28 import io.micrometer.prometheus.PrometheusMeterRegistry
29 import org.assertj.core.api.Assertions.assertThat
30 import org.assertj.core.data.Percentage
31 import org.jetbrains.spek.api.Spek
32 import org.jetbrains.spek.api.dsl.describe
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
37 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
38 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
40 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
41 import org.onap.dcae.collectors.veshv.model.RoutedMessage
42 import org.onap.dcae.collectors.veshv.model.VesMessage
43 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
44 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
45 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
46 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
47 import java.time.Instant
48 import java.time.temporal.Temporal
49 import java.util.concurrent.TimeUnit
52 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
55 object MicrometerMetricsTest : Spek({
56 val doublePrecision = Percentage.withPercentage(0.5)
57 lateinit var registry: PrometheusMeterRegistry
58 lateinit var cut: MicrometerMetrics
61 registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
62 cut = MicrometerMetrics(registry)
65 fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
67 fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
71 { ex -> assertThat(ex).doesNotThrowAnyException() },
75 fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
76 verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
78 fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
79 verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
81 fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
82 verifyMeter(search, RequiredSearch::counter, verifier)
84 fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
85 verifyCounter(registrySearch(name), verifier)
87 fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
89 .filter { it.id.name.startsWith(PREFIX) }
90 .filter { it is Counter }
91 .map { it as Counter }
92 .filterNot { it.id.name in changedCounters }
94 assertThat(it.count()).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
98 describe("notifyBytesReceived") {
100 on("$PREFIX.data.received.bytes counter") {
101 val counterName = "$PREFIX.data.received.bytes"
103 it("should increment counter") {
105 cut.notifyBytesReceived(bytes)
107 verifyCounter(counterName) {
108 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
112 it("should leave all other counters unchanged") {
113 cut.notifyBytesReceived(128)
114 verifyAllCountersAreUnchangedBut(counterName)
119 describe("notifyMessageReceived") {
120 on("$PREFIX.messages.received.count counter") {
121 val counterName = "$PREFIX.messages.received.count"
123 it("should increment counter") {
124 cut.notifyMessageReceived(emptyWireProtocolFrame())
126 verifyCounter(counterName) {
127 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
132 on("$PREFIX.messages.received.bytes counter") {
133 val counterName = "$PREFIX.messages.received.bytes"
135 it("should increment counter") {
137 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
139 verifyCounter(counterName) {
140 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
145 it("should leave all other counters unchanged") {
146 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
147 verifyAllCountersAreUnchangedBut(
148 "$PREFIX.messages.received.count",
149 "$PREFIX.messages.received.bytes"
154 describe("notifyMessageSent") {
155 val topicName1 = "PERF3GPP"
156 val topicName2 = "CALLTRACE"
158 on("$PREFIX.messages.sent.count counter") {
159 val counterName = "$PREFIX.messages.sent.count"
161 it("should increment counter") {
162 cut.notifyMessageSent(routedMessage(topicName1))
164 verifyCounter(counterName) {
165 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
167 verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count")
171 on("$PREFIX.messages.sent.topic.count counter") {
172 val counterName = "$PREFIX.messages.sent.topic.count"
173 it("should handle counters for different topics") {
174 cut.notifyMessageSent(routedMessage(topicName1))
175 cut.notifyMessageSent(routedMessage(topicName2))
176 cut.notifyMessageSent(routedMessage(topicName2))
178 verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
179 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
182 verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
183 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
188 on("$PREFIX.messages.processing.time") {
189 val counterName = "$PREFIX.messages.processing.time"
190 val processingTimeMs = 100L
192 it("should update timer") {
194 cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs)))
196 verifyTimer(counterName) { timer ->
197 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
199 verifyAllCountersAreUnchangedBut(
201 "$PREFIX.messages.sent.topic.count",
202 "$PREFIX.messages.sent.count")
207 describe("notifyMessageDropped") {
209 on("$PREFIX.messages.dropped.count counter") {
210 val counterName = "$PREFIX.messages.dropped.count"
211 it("should increment counter") {
212 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
213 cut.notifyMessageDropped(INVALID_MESSAGE)
215 verifyCounter(counterName) {
216 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
218 verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
222 on("$PREFIX.messages.dropped.cause.count counter") {
223 val counterName = "$PREFIX.messages.dropped.cause.count"
224 it("should handle counters for different drop reasons") {
225 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
226 cut.notifyMessageDropped(INVALID_MESSAGE)
227 cut.notifyMessageDropped(INVALID_MESSAGE)
229 verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
230 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
233 verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
234 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
240 describe("processing gauge") {
241 it("should show difference between sent and received messages") {
243 on("positive difference") {
244 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
245 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
246 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
247 cut.notifyMessageSent(routedMessage("perf3gpp"))
248 verifyGauge("messages.processing.count") {
249 assertThat(it.value()).isCloseTo(2.0, doublePrecision)
253 on("zero difference") {
254 cut.notifyMessageReceived(emptyWireProtocolFrame())
255 cut.notifyMessageSent(routedMessage("perf3gpp"))
256 verifyGauge("messages.processing.count") {
257 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
261 on("negative difference") {
262 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
263 cut.notifyMessageSent(routedMessage("fault"))
264 cut.notifyMessageSent(routedMessage("perf3gpp"))
265 verifyGauge("messages.processing.count") {
266 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
272 describe("notifyClientRejected") {
274 on("$PREFIX.clients.rejected.count") {
275 val counterName = "$PREFIX.clients.rejected.count"
276 it("should increment counter for each possible reason") {
277 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
278 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
280 verifyCounter(counterName) {
281 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
283 verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
287 on("$PREFIX.clients.rejected.cause.count counter") {
288 val counterName = "$PREFIX.clients.rejected.cause.count"
289 it("should handle counters for different rejection reasons") {
290 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
291 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
292 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
294 verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
295 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
298 verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
299 assertThat(it.count()).isCloseTo(2.0, doublePrecision)
306 fun routedMessage(topic: String, partition: Int = 0) =
307 vesEvent().let {evt ->
308 RoutedMessage(topic, partition,
309 VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
312 fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) =
313 vesEvent().let {evt ->
314 RoutedMessage(topic, partition,
315 VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))