Add of message travel time metric 38/106338/2
authorkjaniak <kornel.janiak@nokia.com>
Tue, 21 Apr 2020 10:44:53 +0000 (12:44 +0200)
committerKornel Janiak <kornel.janiak@nokia.com>
Wed, 22 Apr 2020 11:19:54 +0000 (11:19 +0000)
Message travel time: Producer -> HV-VES input introduced.
Tests for new metric added.

Change-Id: I36347ff53abb3f274e4358af26db49fe8bac95ed
Issue-ID: DCAEGEN2-1576
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt

index 41993e6..3fe5fd5 100644 (file)
@@ -43,6 +43,7 @@ interface SinkFactory : Closeable {
 interface Metrics {
     fun notifyBytesReceived(size: Int)
     fun notifyMessageReceived(msg: WireFrameMessage)
+    fun notifyMessageReceived(msg: VesMessage)
     fun notifyMessageReadyForRouting(msg: VesMessage)
     fun notifyMessageSent(msg: RoutedMessage)
     fun notifyMessageDropped(cause: MessageDropCause)
index f0d1465..98b3ce9 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2020 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -82,7 +82,7 @@ internal class HvVesCollector(
                         .filterFailedWithLog(logger, clientContext::fullMdc,
                                 { "Ves event header decoded successfully" },
                                 { "Failed to decode ves event header, reason: ${it.message}" })
-            }
+            }.doOnNext(metrics::notifyMessageReceived)
 
     private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
             .filterFailedWithLog {
index 3b01d13..1255596 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2020 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.utils.TimeUtils
 import java.time.Duration
 import java.time.Instant
 import kotlin.test.fail
@@ -40,6 +41,7 @@ class FakeMetrics : Metrics {
     var messagesDroppedCount: Int = 0; private set
     var lastProcessingTimeMicros: Double = -1.0; private set
     var lastProcessingTimeWithoutRoutingMicros: Double = -1.0; private set
+    var lastToCollectorTravelTime: Double = -1.0; private set
     var messagesSentCount: Int = 0; private set
     var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>(); private set
 
@@ -54,6 +56,11 @@ class FakeMetrics : Metrics {
         messageBytesReceived += msg.payloadSize
     }
 
+    override fun notifyMessageReceived(msg: VesMessage) {
+        lastToCollectorTravelTime = Duration.between(TimeUtils.epochMicroToInstant(msg.header.lastEpochMicrosec),
+                Instant.now()).toNanos() / 1000.0
+    }
+
     override fun notifyMessageReadyForRouting(msg: VesMessage) {
         lastProcessingTimeWithoutRoutingMicros = Duration.between(msg.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0
     }
index 2f3470a..e0d99fc 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2020 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -30,11 +30,11 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics
 import io.micrometer.prometheus.PrometheusConfig
 import io.micrometer.prometheus.PrometheusMeterRegistry
 import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant
 import java.time.Duration
 import java.time.Instant
@@ -54,6 +54,10 @@ class MicrometerMetrics internal constructor(
     private val totalConnections = registry.counter(name(CONNECTIONS))
     private val disconnections = registry.counter(name(DISCONNECTIONS))
 
+    private val travelTimeToCollector = Timer.builder(name(MESSAGES, TO, COLLECTOR, TRAVEL, TIME))
+            .maximumExpectedValue(MAX_BUCKET_DURATION)
+            .publishPercentileHistogram(true)
+            .register(registry)
     private val processingTime = Timer.builder(name(MESSAGES, PROCESSING, TIME))
             .maximumExpectedValue(MAX_BUCKET_DURATION)
             .publishPercentileHistogram(true)
@@ -108,6 +112,12 @@ class MicrometerMetrics internal constructor(
         receivedMessagesPayloadBytes.increment(msg.payloadSize.toDouble())
     }
 
+    override fun notifyMessageReceived(msg: VesMessage) {
+        travelTimeToCollector.record(
+                Duration.between(epochMicroToInstant(msg.header.lastEpochMicrosec), msg.wtpFrame.receivedAt)
+        )
+    }
+
     override fun notifyMessageSent(msg: RoutedMessage) {
         val now = Instant.now()
         sentMessages.increment()
@@ -157,6 +167,9 @@ class MicrometerMetrics internal constructor(
         internal const val PAYLOAD = "payload"
         internal const val WITHOUT = "without"
         internal const val ROUTING = "routing"
+        internal const val TRAVEL = "travel"
+        internal const val TO = "to"
+        internal const val COLLECTOR = "collector"
         internal val MAX_BUCKET_DURATION = Duration.ofSeconds(300L)
         internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
     }
index a3471d4..efd353e 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2020 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -33,15 +33,15 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain
-import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
 import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
@@ -206,6 +206,23 @@ object MicrometerMetricsTest : Spek({
             }
         }
 
+        on("$PREFIX.messages.to.collector.travel.time") {
+            val counterName = "$PREFIX.messages.to.collector.travel.time"
+            val toCollectorTravelTimeMs = 100L
+
+            it("should update timer") {
+                val now = Instant.now()
+                val vesMessage = vesMessageReceivedAt(now, sentAt = now.minusMillis(toCollectorTravelTimeMs))
+                cut.notifyMessageReceived(vesMessage)
+
+                registry.verifyTimer(counterName) { timer ->
+                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isEqualTo(toCollectorTravelTimeMs.toDouble())
+                }
+
+                verifyCountersAndTimersAreUnchangedBut(counterName)
+            }
+        }
+
         on("$PREFIX.messages.processing.time.without.routing") {
             val counterName = "$PREFIX.messages.processing.time.without.routing"
             val processingTimeMs = 100L
@@ -217,11 +234,8 @@ object MicrometerMetricsTest : Spek({
                 registry.verifyTimer(counterName) { timer ->
                     assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
                 }
-                verifyCountersAndTimersAreUnchangedBut(
-                        counterName,
-                        "$PREFIX.messages.sent.topic",
-                        "$PREFIX.messages.sent",
-                        "$PREFIX.messages.latency")
+
+                verifyCountersAndTimersAreUnchangedBut(counterName)
             }
         }
 
@@ -384,6 +398,13 @@ object MicrometerMetricsTest : Spek({
     }
 })
 
+private fun vesMessageReceivedAt(receivedAt: Instant, sentAt: Instant): VesMessage {
+    val lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
+    val commonHeader = commonHeader(lastEpochMicrosec = lastEpochMicrosec)
+    return VesMessage(commonHeader,
+            wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt))
+}
+
 private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage {
     val commonHeader = commonHeader(domain)
     return VesMessage(commonHeader,