Metric: Message latency 18/74718/5
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Mon, 17 Dec 2018 12:22:52 +0000 (13:22 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Tue, 18 Dec 2018 10:00:12 +0000 (11:00 +0100)
Defined as a difference between now and vesHeader.lastEpochTime.

Change-Id: I4aa97e8efc13cb0039fde38b4fd2aa6411c7b89a
Issue-ID: DCAEGEN2-1036
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/time.kt [new file with mode: 0644]
sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/time_test.kt [new file with mode: 0644]

index f060426..d35e17d 100644 (file)
@@ -30,9 +30,10 @@ import io.micrometer.prometheus.PrometheusConfig
 import io.micrometer.prometheus.PrometheusMeterRegistry
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant
 import java.time.Duration
 import java.time.Instant
 
@@ -49,6 +50,9 @@ class MicrometerMetrics internal constructor(
     private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
     private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
 
+    private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
+    private val totalLatency = registry.timer(name(MESSAGES, LATENCY, TIME))
+
     private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
     private val sentToTopicCount = { topic: String ->
         registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic)
@@ -59,8 +63,6 @@ class MicrometerMetrics internal constructor(
         registry.counter(name(MESSAGES, DROPPED, CAUSE, COUNT), CAUSE, cause)
     }.memoize<String, Counter>()
 
-    private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
-
     private val clientsRejectedCount = registry.counter(name(CLIENTS, REJECTED, COUNT))
     private val clientsRejectedCauseCount = { cause: String ->
         registry.counter(name(CLIENTS, REJECTED, CAUSE, COUNT), CAUSE, cause)
@@ -90,9 +92,12 @@ class MicrometerMetrics internal constructor(
     }
 
     override fun notifyMessageSent(msg: RoutedMessage) {
+        val now = Instant.now()
         sentCountTotal.increment()
         sentToTopicCount(msg.topic).increment()
-        processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()))
+
+        processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now))
+        totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now))
     }
 
     override fun notifyMessageDropped(cause: MessageDropCause) {
@@ -121,6 +126,7 @@ class MicrometerMetrics internal constructor(
         internal const val TOPIC = "topic"
         internal const val DROPPED = "dropped"
         internal const val TIME = "time"
-        fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
+        internal const val LATENCY = "latency"
+        internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
     }
 }
index 2ecdb26..71fc8f7 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main
 import arrow.core.Try
 import io.micrometer.core.instrument.Counter
 import io.micrometer.core.instrument.Gauge
+import io.micrometer.core.instrument.Meter
 import io.micrometer.core.instrument.Timer
 import io.micrometer.core.instrument.search.RequiredSearch
 import io.micrometer.prometheus.PrometheusConfig
@@ -34,10 +35,10 @@ import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
-import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
-import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
@@ -47,6 +48,7 @@ import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSi
 import java.time.Instant
 import java.time.temporal.Temporal
 import java.util.concurrent.TimeUnit
+import kotlin.reflect.KClass
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -54,6 +56,7 @@ import java.util.concurrent.TimeUnit
  */
 object MicrometerMetricsTest : Spek({
     val doublePrecision = Percentage.withPercentage(0.5)
+    val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time")
     lateinit var registry: PrometheusMeterRegistry
     lateinit var cut: MicrometerMetrics
 
@@ -84,15 +87,25 @@ object MicrometerMetricsTest : Spek({
     fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
             verifyCounter(registrySearch(name), verifier)
 
-    fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
-        registry.meters
-                .filter { it.id.name.startsWith(PREFIX) }
-                .filter { it is Counter }
-                .map { it as Counter }
-                .filterNot { it.id.name in changedCounters }
-                .forEach {
-                    assertThat(it.count()).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
-                }
+    fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
+        fun <T : Meter> verifyAllMetersAreUnchangedBut(
+                clazz: KClass<T>,
+                changedCounters: Collection<String>,
+                valueOf: (T) -> Double) {
+            registry.meters
+                    .filter { it.id.name.startsWith(PREFIX) }
+                    .filter { clazz.isInstance(it) }
+                    .map { it as T }
+                    .filterNot { it.id.name in changedCounters }
+                    .forEach {
+                        assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
+                    }
+        }
+
+        setOf(*changedMeters).let { changedMetersCollection ->
+            verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
+            verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
+        }
     }
 
     describe("notifyBytesReceived") {
@@ -111,7 +124,7 @@ object MicrometerMetricsTest : Spek({
 
             it("should leave all other counters unchanged") {
                 cut.notifyBytesReceived(128)
-                verifyAllCountersAreUnchangedBut(counterName)
+                verifyCountersAndTimersAreUnchangedBut(counterName)
             }
         }
     }
@@ -144,7 +157,7 @@ object MicrometerMetricsTest : Spek({
 
         it("should leave all other counters unchanged") {
             cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
-            verifyAllCountersAreUnchangedBut(
+            verifyCountersAndTimersAreUnchangedBut(
                     "$PREFIX.messages.received.count",
                     "$PREFIX.messages.received.bytes"
             )
@@ -164,7 +177,11 @@ object MicrometerMetricsTest : Spek({
                 verifyCounter(counterName) {
                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                 }
-                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count")
+                verifyCountersAndTimersAreUnchangedBut(
+                        counterName,
+                        "$PREFIX.messages.sent.topic.count",
+                        "$PREFIX.messages.processing.time",
+                        "$PREFIX.messages.latency.time")
             }
         }
 
@@ -191,17 +208,41 @@ object MicrometerMetricsTest : Spek({
 
             it("should update timer") {
 
-                cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs)))
+                cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
 
                 verifyTimer(counterName) { timer ->
                     assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
                 }
-                verifyAllCountersAreUnchangedBut(
+                verifyCountersAndTimersAreUnchangedBut(
                         counterName,
                         "$PREFIX.messages.sent.topic.count",
-                        "$PREFIX.messages.sent.count")
+                        "$PREFIX.messages.sent.count",
+                        "$PREFIX.messages.latency.time")
             }
         }
+
+        on("$PREFIX.messages.latency.time") {
+            val counterName = "$PREFIX.messages.latency.time"
+            val latencyMs = 1666L
+
+            it("should update timer") {
+
+                cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
+
+                verifyTimer(counterName) { timer ->
+                    assertThat(timer.mean(TimeUnit.MILLISECONDS))
+                            .isGreaterThanOrEqualTo(latencyMs.toDouble())
+                            .isLessThanOrEqualTo(latencyMs + 10000.0)
+
+                }
+                verifyCountersAndTimersAreUnchangedBut(
+                        counterName,
+                        "$PREFIX.messages.sent.topic.count",
+                        "$PREFIX.messages.sent.count",
+                        "$PREFIX.messages.processing.time")
+            }
+        }
+
     }
 
     describe("notifyMessageDropped") {
@@ -215,7 +256,7 @@ object MicrometerMetricsTest : Spek({
                 verifyCounter(counterName) {
                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                 }
-                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
+                verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
             }
         }
 
@@ -280,7 +321,7 @@ object MicrometerMetricsTest : Spek({
                 verifyCounter(counterName) {
                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                 }
-                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
+                verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
             }
         }
 
@@ -304,13 +345,23 @@ object MicrometerMetricsTest : Spek({
 })
 
 fun routedMessage(topic: String, partition: Int = 0) =
-        vesEvent().let {evt ->
+        vesEvent().let { evt ->
             RoutedMessage(topic, partition,
                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
         }
 
-fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) =
-        vesEvent().let {evt ->
+fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
+        vesEvent().let { evt ->
             RoutedMessage(topic, partition,
                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
         }
+
+fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
+        vesEvent().let { evt ->
+            val builder = evt.toBuilder()
+            builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
+            builder.build()
+        }.let { evt ->
+            RoutedMessage(topic, partition,
+                    VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
+        }
index a845689..ed0cab6 100644 (file)
@@ -61,8 +61,8 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP,
                 .setEventId(id)
                 .setEventName("sample-event-name")
                 .setEventType("sample-event-type")
-                .setStartEpochMicrosec(120034455)
-                .setLastEpochMicrosec(120034455)
+                .setStartEpochMicrosec(100000000)
+                .setLastEpochMicrosec(100000005)
                 .setNfNamingCode("sample-nf-naming-code")
                 .setNfcNamingCode("sample-nfc-naming-code")
                 .setNfVendorName("vendor-name")
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/time.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/time.kt
new file mode 100644 (file)
index 0000000..c07da67
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils
+
+import java.time.Instant
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+object TimeUtils {
+    fun epochMicroToInstant(epochMicroseconds: Long): Instant {
+        val seconds = epochMicroseconds / MICROSECONDS_IN_SECOND
+        val nanos = (epochMicroseconds - seconds * MICROSECONDS_IN_SECOND) * NANOSECONDS_IN_MICROSECOND
+        return Instant.ofEpochSecond(seconds, nanos)
+    }
+
+    private const val MICROSECONDS_IN_SECOND = 1_000_000L
+    private const val NANOSECONDS_IN_MICROSECOND = 1_000L
+}
diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/time_test.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/time_test.kt
new file mode 100644 (file)
index 0000000..3ec74ab
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import java.time.Instant
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object TimeTest : Spek({
+    describe("epochMicrosecond to Instant converter") {
+        it("should convert") {
+            val epochSeconds = 1545048422L
+            val nanoAdjustment = 666999000L
+            val epochMicros = 1545048422666999L
+
+            val result = TimeUtils.epochMicroToInstant(epochMicros)
+
+            assertThat(result).isEqualTo(Instant.ofEpochSecond(epochSeconds, nanoAdjustment))
+        }
+    }
+})