2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.main
23 import io.micrometer.core.instrument.Counter
24 import io.micrometer.core.instrument.Gauge
25 import io.micrometer.core.instrument.Timer
26 import io.micrometer.core.instrument.search.RequiredSearch
27 import io.micrometer.prometheus.PrometheusConfig
28 import io.micrometer.prometheus.PrometheusMeterRegistry
29 import org.assertj.core.api.Assertions.assertThat
30 import org.assertj.core.data.Percentage
31 import org.jetbrains.spek.api.Spek
32 import org.jetbrains.spek.api.dsl.describe
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
37 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
38 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
39 import org.onap.dcae.collectors.veshv.model.RoutedMessage
40 import org.onap.dcae.collectors.veshv.model.VesMessage
41 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
42 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
43 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
44 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
45 import java.time.Instant
46 import java.time.temporal.Temporal
47 import java.util.concurrent.TimeUnit
50 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification exercising MicrometerMetrics against a Prometheus-backed meter registry.
53 object MicrometerMetricsTest : Spek({
// Relative tolerance (0.5 percent) passed to the isCloseTo assertions on meter values.
54 val doublePrecision = Percentage.withPercentage(0.5)
// Registry the metrics are written to and that the verify* helpers search in.
55 lateinit var registry: PrometheusMeterRegistry
// Instance whose notify* methods the scenarios call (presumably "class under test").
56 lateinit var cut: MicrometerMetrics
// New registry with the default Prometheus configuration and a metrics instance bound to it.
// The scenarios assert absolute counter values, so they rely on starting from a fresh registry.
59 registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
60 cut = MicrometerMetrics(registry)
// Starts a new required-meter search over the registry used by this spec.
fun registrySearch() = registry.let { meterRegistry -> RequiredSearch.`in`(meterRegistry) }
// Generic meter check shared by the verifyGauge / verifyTimer / verifyCounter helpers:
// `search` selects the meter, `map` turns the search into a typed meter (callers pass
// RequiredSearch::gauge, ::timer or ::counter) and `verifier` holds the assertions to run on it.
65 fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
// Handler that receives an exception `ex` and asserts on it with AssertJ, so a non-null `ex`
// is reported as a test failure. Presumably reached when the meter lookup throws - TODO confirm.
69 { ex -> assertThat(ex).doesNotThrowAnyException() },
// Looks up the gauge registered under `name` and runs `verifier` against it.
fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) = verifyMeter(
    search = registrySearch().name(name),
    map = { requiredSearch -> requiredSearch.gauge() },
    verifier = verifier
)
// Looks up the timer registered under `name` and runs `verifier` against it.
fun <T> verifyTimer(name: String, verifier: (Timer) -> T) = verifyMeter(
    search = registrySearch().name(name),
    map = { requiredSearch -> requiredSearch.timer() },
    verifier = verifier
)
// Resolves the counter selected by an already-built `search` (e.g. name plus tag)
// and runs `verifier` against it.
fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = verifyMeter(
    search = search,
    map = { requiredSearch -> requiredSearch.counter() },
    verifier = verifier
)
// Convenience overload: selects the counter by meter name only, then delegates to the
// search-based overload.
fun <T> verifyCounter(name: String, verifier: (Counter) -> T) = verifyCounter(
    search = registrySearch().name(name),
    verifier = verifier
)
// Asserts that every Counter whose name starts with PREFIX, except those named in
// `changedCounters`, still has a count close to zero.
85 fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
// Only meters whose name starts with PREFIX are considered.
87 .filter { it.id.name.startsWith(PREFIX) }
// Keep Counter meters only.
// NOTE(review): this filter + cast pair could be a single filterIsInstance<Counter>().
88 .filter { it is Counter }
89 .map { it as Counter }
// Skip the counters the caller expects to have changed; matching is by meter name only.
90 .filterNot { it.id.name in changedCounters }
// The meter id is used as the assertion description so a failure names the offending counter.
92 assertThat(it.count()).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
// Scenarios for notifyBytesReceived: the reported byte count must show up in the
// "<PREFIX>.data.received.bytes" counter and in no other counter.
96 describe("notifyBytesReceived") {
98 on("$PREFIX.data.received.bytes counter") {
99 val counterName = "$PREFIX.data.received.bytes"
// The counter value must match the number of bytes passed to notifyBytesReceived.
101 it("should increment counter") {
103 cut.notifyBytesReceived(bytes)
105 verifyCounter(counterName) {
106 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
// Every other PREFIX counter must still read zero after the call.
110 it("should leave all other counters unchanged") {
111 cut.notifyBytesReceived(128)
112 verifyAllCountersAreUnchangedBut(counterName)
// Scenarios for notifyMessageReceived: one received frame must bump the message count by one
// and the byte counter by the frame's payloadSize, leaving all other counters at zero.
117 describe("notifyMessageReceived") {
118 on("$PREFIX.messages.received.count counter") {
119 val counterName = "$PREFIX.messages.received.count"
// A single received frame is expected to yield a count of 1.
121 it("should increment counter") {
122 cut.notifyMessageReceived(emptyWireProtocolFrame())
124 verifyCounter(counterName) {
125 assertThat(it.count()).isCloseTo(1.0, doublePrecision)
130 on("$PREFIX.messages.received.bytes counter") {
131 val counterName = "$PREFIX.messages.received.bytes"
// The byte counter must equal the payloadSize set on the frame copy.
133 it("should increment counter") {
135 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
137 verifyCounter(counterName) {
138 assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
// Only the two "received" counters may change when a message is received.
143 it("should leave all other counters unchanged") {
144 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
145 verifyAllCountersAreUnchangedBut(
146 "$PREFIX.messages.received.count",
147 "$PREFIX.messages.received.bytes"
// Scenarios for notifyMessageSent: total counter, per-topic counter and processing timer.
152 describe("notifyMessageSent") {
// Two distinct topic names used to tell the per-topic counters apart.
153 val topicName1 = "PERF3GPP"
154 val topicName2 = "CALLTRACE"
// Total sent-messages counter: a single sent message must raise it to 1 and may
// additionally change only the per-topic counter.
on("$PREFIX.messages.sent.count.total counter") {
    val totalCounter = "$PREFIX.messages.sent.count.total"
    val perTopicCounter = "$PREFIX.messages.sent.count.topic"

    it("should increment counter") {
        val message = routedMessage(topicName1)
        cut.notifyMessageSent(message)

        verifyCounter(totalCounter) { counter ->
            assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
        }
        verifyAllCountersAreUnchangedBut(totalCounter, perTopicCounter)
    }
}
// Per-topic sent-messages counter: messages sent to different topics must be counted
// separately under the "topic" tag of "<PREFIX>.messages.sent.count.topic".
// The group description now names the counter that is actually verified; it previously read
// "messages.sent.topic.count", which does not match the meter name used below.
on("$PREFIX.messages.sent.count.topic counter") {
    val counterName = "$PREFIX.messages.sent.count.topic"

    it("should handle counters for different topics") {
        // One message for the first topic, two for the second.
        cut.notifyMessageSent(routedMessage(topicName1))
        cut.notifyMessageSent(routedMessage(topicName2))
        cut.notifyMessageSent(routedMessage(topicName2))

        verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) {
            assertThat(it.count()).isCloseTo(1.0, doublePrecision)
        }

        verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) {
            assertThat(it.count()).isCloseTo(2.0, doublePrecision)
        }
    }
}
// Processing-time timer: sending a message must record its processing time in the
// "<PREFIX>.messages.processing.time" timer.
186 on("$PREFIX.messages.processing.time") {
// Despite its name this holds a timer name; it is looked up with verifyTimer below.
187 val counterName = "$PREFIX.messages.processing.time"
188 val processingTimeMs = 100L
190 it("should update timer") {
// The message is stamped as received processingTimeMs before "now", so the recorded
// duration is expected to be at least that long.
192 cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs)))
194 verifyTimer(counterName) { timer ->
195 assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
// Sending also bumps the sent counters, so those are excluded from the "unchanged" check.
197 verifyAllCountersAreUnchangedBut(
199 "$PREFIX.messages.sent.count.topic",
200 "$PREFIX.messages.sent.count.total")
// Scenarios for notifyMessageDropped: every drop raises the total counter, and the
// per-cause counter is split by the "cause" tag taken from the drop cause.
describe("notifyMessageDropped") {

    on("$PREFIX.messages.dropped.count.total counter") {
        val totalCounter = "$PREFIX.messages.dropped.count.total"
        val perCauseCounter = "$PREFIX.messages.dropped.count.cause"

        it("should increment counter") {
            // Two drops with different causes must both land in the total.
            listOf(ROUTE_NOT_FOUND, INVALID_MESSAGE).forEach { cause ->
                cut.notifyMessageDropped(cause)
            }

            verifyCounter(totalCounter) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
            verifyAllCountersAreUnchangedBut(totalCounter, perCauseCounter)
        }
    }

    on("$PREFIX.messages.dropped.count.cause counter") {
        val perCauseCounter = "$PREFIX.messages.dropped.count.cause"

        it("should handle counters for different drop reasons") {
            // One drop for a missing route, two for invalid messages.
            listOf(ROUTE_NOT_FOUND, INVALID_MESSAGE, INVALID_MESSAGE).forEach { cause ->
                cut.notifyMessageDropped(cause)
            }

            val routeNotFoundSearch = registrySearch().name(perCauseCounter).tag("cause", ROUTE_NOT_FOUND.tag)
            verifyCounter(routeNotFoundSearch) { counter ->
                assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
            }

            val invalidMessageSearch = registrySearch().name(perCauseCounter).tag("cause", INVALID_MESSAGE.tag)
            verifyCounter(invalidMessageSearch) { counter ->
                assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
            }
        }
    }
}
// Scenarios for the processing gauge, which is expected to report received-minus-sent messages.
// NOTE(review): the gauge is looked up as "messages.processing.count" without the PREFIX used
// for every counter and timer name in this spec - confirm it matches the registered gauge name.
238 describe("processing gauge") {
// NOTE(review): the `on` groups below appear to be declared inside this `it` body rather than
// directly under `describe` - confirm the test framework actually executes them.
239 it("should show difference between sent and received messages") {
// Three received, one sent: the gauge is expected to read 2.
241 on("positive difference") {
242 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
243 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
244 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
245 cut.notifyMessageSent(routedMessage("perf3gpp"))
246 verifyGauge("messages.processing.count") {
247 assertThat(it.value()).isCloseTo(2.0, doublePrecision)
// One received, one sent: the gauge is expected to read 0.
251 on("zero difference") {
252 cut.notifyMessageReceived(emptyWireProtocolFrame())
253 cut.notifyMessageSent(routedMessage("perf3gpp"))
254 verifyGauge("messages.processing.count") {
255 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
// One received, two sent: the expected value is still 0, i.e. the gauge is not expected
// to go below zero.
259 on("negative difference") {
260 cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
261 cut.notifyMessageSent(routedMessage("fault"))
262 cut.notifyMessageSent(routedMessage("perf3gpp"))
263 verifyGauge("messages.processing.count") {
264 assertThat(it.value()).isCloseTo(0.0, doublePrecision)
// Builds a RoutedMessage for `topic` / `partition` from a freshly generated VES event,
// wrapping that event in a wire protocol frame.
fun routedMessage(topic: String, partition: Int = 0) = run {
    val event = vesEvent()
    val header = event.commonEventHeader
    val frame = wireProtocolFrame(event)
    RoutedMessage(topic, partition, VesMessage(header, frame))
}
// Same as the two-argument overload, but the wire protocol frame is copied with an
// explicit `receivedAt` timestamp so tests can control the measured processing time.
fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) = run {
    val event = vesEvent()
    val header = event.commonEventHeader
    val frame = wireProtocolFrame(event).copy(receivedAt = receivedAt)
    RoutedMessage(topic, partition, VesMessage(header, frame))
}