<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
- <version>1.0.8</version>
+ <version>1.1.5</version>
</dependency>
<dependency>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<version>${project.parent.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-test-utils</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<artifactId>logback-classic</artifactId>
<scope>runtime</scope>
</dependency>
- <dependency>
- <groupId>com.nhaarman.mockitokotlin2</groupId>
- <artifactId>mockito-kotlin</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-api</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-junit-platform-engine</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.micrometer</groupId>
- <artifactId>micrometer-registry-prometheus</artifactId>
- </dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
internal interface Metrics {
- fun notifyOffsetChanged(size: Long)
-}
+ fun notifyOffsetChanged(offset: Long)
+ fun notifyMessageTravelTime(messageSentTimeMicros: Long)
+}
\ No newline at end of file
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+import io.micrometer.core.instrument.Timer
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.onap.dcae.collectors.veshv.utils.TimeUtils
import reactor.core.publisher.Mono
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicLong
internal class MicrometerMetrics constructor(
private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
) : Metrics {
- override fun notifyOffsetChanged(size: Long) {
- // TODO implementation here
- }
+
+ private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0))
+ private val travelTime = Timer.builder(name("travel.time"))
+ .publishPercentileHistogram(true)
+ .register(registry)
fun lastStatus(): Mono<String> = Mono.fromCallable {
registry.scrape()
}
+ override fun notifyOffsetChanged(offset: Long) {
+ currentOffset.lazySet(offset)
+ }
+
+ override fun notifyMessageTravelTime(messageSentTimeMicros: Long) {
+ travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now()))
+ }
+
companion object {
val INSTANCE by lazy { MicrometerMetrics() }
+
+ private const val PREFIX = "hv-kafka-consumer"
+ private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.kafkaconsumer
-
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import kotlin.test.assertTrue
-
-object SampleTest : Spek({
- describe("sample test") {
- assertTrue(true)
- }
-})
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+
+import io.micrometer.prometheus.PrometheusConfig
+import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.data.Percentage
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
+import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+object MicrometerMetricsTest : Spek({
+ val PREFIX = "hv-kafka-consumer"
+ val doublePrecision = Percentage.withPercentage(0.5)
+ lateinit var registry: PrometheusMeterRegistry
+ lateinit var cut: MicrometerMetrics
+
+ beforeEachTest {
+ registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
+ cut = MicrometerMetrics(registry)
+ }
+
+ describe("Timers") {
+ val arbitraryMessageTravelTime = 100L
+ val messageSentTimeMicros = Instant.now().minusMillis(arbitraryMessageTravelTime).toEpochMilli() * 1000
+ val timerName = "$PREFIX.travel.time"
+
+ on("notifyMessageTravelTime") {
+ it("should update timer $timerName") {
+
+ val timeBeforeNotifyMicros = Instant.now().toEpochMilli() * 1000
+ cut.notifyMessageTravelTime(messageSentTimeMicros)
+ val timeAfterNotifyMicros = Instant.now().toEpochMilli() * 1000
+
+ registry.verifyTimer(timerName) { timer ->
+ val travelTimeBeforeNotify = (timeBeforeNotifyMicros - messageSentTimeMicros).toDouble()
+ val travelTimeAfterNotify = (timeAfterNotifyMicros - messageSentTimeMicros).toDouble()
+ assertThat(timer.totalTime(TimeUnit.MICROSECONDS))
+ .isLessThanOrEqualTo(travelTimeAfterNotify)
+ .isGreaterThanOrEqualTo(travelTimeBeforeNotify)
+
+ }
+ }
+ }
+ }
+
+ describe("Gauges") {
+ val gaugeName = "$PREFIX.consumer.offset"
+
+ on("notifyOffsetChanged") {
+ val offset = 966L
+
+ it("should update $gaugeName") {
+ cut.notifyOffsetChanged(offset)
+
+ registry.verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision)
+ }
+ }
+ }
+ }
+})
package org.onap.dcae.collectors.veshv.main
import arrow.core.Option
-import arrow.core.Try
import io.micrometer.core.instrument.Counter
-import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.Meter
+import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.Timer
-import io.micrometer.core.instrument.search.RequiredSearch
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import org.assertj.core.api.Assertions.assertThat
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
+import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
+import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer
import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
import org.onap.ves.VesEventOuterClass
cut = MicrometerMetrics(registry)
}
- fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
-
- fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
- Try {
- map(search)
- }.fold(
- { ex -> assertThat(ex).doesNotThrowAnyException() },
- verifier
- )
-
- fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
- verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
-
- fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
- verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
-
- fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
- verifyMeter(search, RequiredSearch::counter, verifier)
-
- fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
- verifyCounter(registrySearch(name), verifier)
-
-
fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
fun <T : Meter> verifyAllMetersAreUnchangedBut(
clazz: KClass<T>,
val bytes = 128
cut.notifyBytesReceived(bytes)
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
}
}
it("should increment counter") {
cut.notifyMessageReceived(emptyWireProtocolFrame())
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
}
val bytes = 888
cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
}
}
it("should increment counter") {
cut.notifyMessageSent(routedMessage(topicName1))
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
verifyCountersAndTimersAreUnchangedBut(
cut.notifyMessageSent(routedMessage(topicName2))
cut.notifyMessageSent(routedMessage(topicName2))
- verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
+ registry.verifyCounter(counterName, Tags.of("topic", topicName1)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
+ registry.verifyCounter(counterName, Tags.of("topic", topicName2)) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
}
cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
- verifyTimer(counterName) { timer ->
+ registry.verifyTimer(counterName) { timer ->
assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
}
verifyCountersAndTimersAreUnchangedBut(
cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
- verifyTimer(counterName) { timer ->
+ registry.verifyTimer(counterName) { timer ->
assertThat(timer.mean(TimeUnit.MILLISECONDS))
.isGreaterThanOrEqualTo(latencyMs.toDouble())
.isLessThanOrEqualTo(latencyMs + 10000.0)
cut.notifyMessageDropped(ROUTE_NOT_FOUND)
cut.notifyMessageDropped(INVALID_MESSAGE)
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
cut.notifyMessageDropped(INVALID_MESSAGE)
cut.notifyMessageDropped(INVALID_MESSAGE)
- verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
+ registry.verifyCounter(counterName, Tags.of("cause", ROUTE_NOT_FOUND.tag)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
+ registry.verifyCounter(counterName, Tags.of("cause", INVALID_MESSAGE.tag)) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
}
cut.notifyClientConnected()
cut.notifyClientConnected()
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
verifyCountersAndTimersAreUnchangedBut(counterName)
cut.notifyClientDisconnected()
cut.notifyClientDisconnected()
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
verifyCountersAndTimersAreUnchangedBut(counterName)
cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
- verifyCounter(counterName) {
+ registry.verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
- verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
+ registry.verifyCounter(counterName, Tags.of("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
}
- verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
+ registry.verifyCounter(counterName, Tags.of("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
}
cut.notifyClientConnected()
cut.notifyClientDisconnected()
- verifyGauge(gaugeName) {
+ registry.verifyGauge(gaugeName) {
assertThat(it.value()).isCloseTo(2.0, doublePrecision)
}
}
cut.notifyClientDisconnected()
cut.notifyClientDisconnected()
- verifyGauge(gaugeName) {
+ registry.verifyGauge(gaugeName) {
assertThat(it.value()).isCloseTo(0.0, doublePrecision)
}
}
it("should calculate negative difference between connected and disconnected clients") {
cut.notifyClientDisconnected()
- verifyGauge(gaugeName) {
+ registry.verifyGauge(gaugeName) {
assertThat(it.value()).isCloseTo(0.0, doublePrecision)
}
}
<artifactId>logback-classic</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-registry-prometheus</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.utils
+
+import arrow.core.Try
+import io.micrometer.core.instrument.Counter
+import io.micrometer.core.instrument.Gauge
+import io.micrometer.core.instrument.Tags
+import io.micrometer.core.instrument.Timer
+import io.micrometer.core.instrument.search.RequiredSearch
+import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.assertj.core.api.Assertions
+
+
+fun <T> PrometheusMeterRegistry.verifyGauge(name: String, verifier: (Gauge) -> T) =
+ verifyMeter(findMeter(name), RequiredSearch::gauge, verifier)
+
+fun <T> PrometheusMeterRegistry.verifyTimer(name: String, verifier: (Timer) -> T) =
+ verifyMeter(findMeter(name), RequiredSearch::timer, verifier)
+
+fun <T> PrometheusMeterRegistry.verifyCounter(name: String, verifier: (Counter) -> T) =
+ verifyCounter(findMeter(name), verifier)
+
+fun <T> PrometheusMeterRegistry.verifyCounter(name: String, tags: Tags, verifier: (Counter) -> T) =
+ verifyCounter(findMeter(name).tags(tags), verifier)
+
+private fun PrometheusMeterRegistry.findMeter(meterName: String) = RequiredSearch.`in`(this).name(meterName)
+
+private fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
+ verifyMeter(search, RequiredSearch::counter, verifier)
+
+private inline fun <M, T> verifyMeter(search: RequiredSearch,
+ map: (RequiredSearch) -> M,
+ verifier: (M) -> T) =
+ Try { map(search) }.fold(
+ { ex -> Assertions.assertThat(ex).doesNotThrowAnyException() },
+ verifier
+ )
\ No newline at end of file