Add metric for processing without routing 37/97737/2
authorkjaniak <kornel.janiak@nokia.com>
Wed, 30 Oct 2019 13:19:27 +0000 (14:19 +0100)
committerkjaniak <kornel.janiak@nokia.com>
Wed, 30 Oct 2019 13:43:00 +0000 (14:43 +0100)
Performance tests need better check of processing time in HV-VES.

Change-Id: I0792c4ac014a7b8907ef314a3fd9981776dc0b35
Issue-ID: DCAEGEN2-1890
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt

index 28b2820..41993e6 100644 (file)
@@ -20,6 +20,7 @@
 package org.onap.dcae.collectors.veshv.boundary
 
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
@@ -42,6 +43,7 @@ interface SinkFactory : Closeable {
 interface Metrics {
     fun notifyBytesReceived(size: Int)
     fun notifyMessageReceived(msg: WireFrameMessage)
+    fun notifyMessageReadyForRouting(msg: VesMessage)
     fun notifyMessageSent(msg: RoutedMessage)
     fun notifyMessageDropped(cause: MessageDropCause)
     fun notifyClientDisconnected()
index ac7c391..f0d1465 100644 (file)
@@ -92,6 +92,7 @@ internal class HvVesCollector(
             }
 
     private fun route(flux: Flux<VesMessage>) = flux
+            .doOnNext(metrics::notifyMessageReadyForRouting)
             .flatMap(router::route)
             .doOnNext(this::updateSinkMetrics)
 
index a450b79..3b01d13 100644 (file)
@@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
 import java.time.Duration
 import java.time.Instant
 import kotlin.test.fail
@@ -38,6 +39,7 @@ class FakeMetrics : Metrics {
     var messageBytesReceived: Int = 0; private set
     var messagesDroppedCount: Int = 0; private set
     var lastProcessingTimeMicros: Double = -1.0; private set
+    var lastProcessingTimeWithoutRoutingMicros: Double = -1.0; private set
     var messagesSentCount: Int = 0; private set
     var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>(); private set
 
@@ -52,6 +54,10 @@ class FakeMetrics : Metrics {
         messageBytesReceived += msg.payloadSize
     }
 
+    override fun notifyMessageReadyForRouting(msg: VesMessage) {
+        lastProcessingTimeWithoutRoutingMicros = Duration.between(msg.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0
+    }
+
     override fun notifyMessageSent(msg: RoutedMessage) {
         messagesSentCount++
         messagesSentToTopic.compute(msg.targetTopic) { k, _ ->
index fa52ac2..9d417a2 100644 (file)
@@ -34,6 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant
 import java.time.Duration
 import java.time.Instant
@@ -46,7 +47,6 @@ import java.time.Instant
 class MicrometerMetrics internal constructor(
         private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
 ) : Metrics {
-
     private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES))
     private val receivedMessages = registry.counter(name(MESSAGES, RECEIVED))
     private val receivedMessagesPayloadBytes = registry.counter(name(MESSAGES, RECEIVED, PAYLOAD, BYTES))
@@ -58,6 +58,9 @@ class MicrometerMetrics internal constructor(
             .maximumExpectedValue(MAX_BUCKET_DURATION)
             .publishPercentileHistogram(true)
             .register(registry)
+    private val processingTimeWithoutRouting = Timer.builder(name(MESSAGES, PROCESSING, TIME, WITHOUT, ROUTING))
+            .publishPercentileHistogram(true)
+            .register(registry)
     private val totalLatency = Timer.builder(name(MESSAGES, LATENCY))
             .maximumExpectedValue(MAX_BUCKET_DURATION)
             .publishPercentileHistogram(true)
@@ -67,12 +70,10 @@ class MicrometerMetrics internal constructor(
     private val sentMessagesByTopic = { topic: String ->
         registry.counter(name(MESSAGES, SENT, TOPIC), TOPIC, topic)
     }.memoize<String, Counter>()
-
     private val droppedMessages = registry.counter(name(MESSAGES, DROPPED))
     private val messagesDroppedByCause = { cause: String ->
         registry.counter(name(MESSAGES, DROPPED, CAUSE), CAUSE, cause)
     }.memoize<String, Counter>()
-
     private val clientsRejected = registry.counter(name(CLIENTS, REJECTED))
     private val clientsRejectedByCause = { cause: String ->
         registry.counter(name(CLIENTS, REJECTED, CAUSE), CAUSE, cause)
@@ -97,6 +98,10 @@ class MicrometerMetrics internal constructor(
         receivedBytes.increment(size.toDouble())
     }
 
+    override fun notifyMessageReadyForRouting(msg: VesMessage) {
+        processingTimeWithoutRouting.record(Duration.between(msg.wtpFrame.receivedAt, Instant.now()))
+    }
+
     override fun notifyMessageReceived(msg: WireFrameMessage) {
         receivedMessages.increment()
         receivedMessagesPayloadBytes.increment(msg.payloadSize.toDouble())
@@ -150,6 +155,8 @@ class MicrometerMetrics internal constructor(
         internal const val LATENCY = "latency"
         internal const val PAYLOAD = "payload"
         internal val MAX_BUCKET_DURATION = Duration.ofSeconds(300L)
+        internal const val WITHOUT = "without"
+        internal const val ROUTING = "routing"
         internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
     }
 }
index 66f3a5f..a3471d4 100644 (file)
@@ -20,6 +20,7 @@
 package org.onap.dcae.collectors.veshv.main
 
 import arrow.core.Option
+import com.google.protobuf.ByteString
 import io.micrometer.core.instrument.Counter
 import io.micrometer.core.instrument.Meter
 import io.micrometer.core.instrument.Tags
@@ -39,7 +40,9 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EX
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
 import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
 import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
 import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
@@ -203,6 +206,25 @@ object MicrometerMetricsTest : Spek({
             }
         }
 
+        on("$PREFIX.messages.processing.time.without.routing") {
+            val counterName = "$PREFIX.messages.processing.time.without.routing"
+            val processingTimeMs = 100L
+
+            it("should update timer") {
+
+                cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs)))
+
+                registry.verifyTimer(counterName) { timer ->
+                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
+                }
+                verifyCountersAndTimersAreUnchangedBut(
+                        counterName,
+                        "$PREFIX.messages.sent.topic",
+                        "$PREFIX.messages.sent",
+                        "$PREFIX.messages.latency")
+            }
+        }
+
         on("$PREFIX.messages.latency") {
             val counterName = "$PREFIX.messages.latency"
             val latencyMs = 1666L
@@ -362,13 +384,19 @@ object MicrometerMetricsTest : Spek({
     }
 })
 
-fun routedMessage(topic: String, partition: Int = 0) =
+private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage {
+    val commonHeader = commonHeader(domain)
+    return VesMessage(commonHeader,
+            wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt))
+}
+
+private fun routedMessage(topic: String, partition: Int = 0) =
         vesEvent().run { toRoutedMessage(topic, partition) }
 
-fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
+private fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
         vesEvent().run { toRoutedMessage(topic, partition, receivedAt) }
 
-fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
+private fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
         vesEvent().run {
             val builder = toBuilder()
             builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
index ba60d1b..3013e90 100644 (file)
@@ -30,6 +30,8 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.ves.VesEventOuterClass
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
+import java.time.Instant
+import java.time.temporal.Temporal
 import java.util.UUID.randomUUID
 
 fun vesEvent(domain: VesEventDomain = PERF3GPP,
@@ -53,7 +55,7 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP,
                  vesEventListenerVersion: String = "7.0.2",
                  priority: Priority = Priority.NORMAL,
                  lastEpochMicrosec: Long = 100000005
-                 ): CommonEventHeader =
+): CommonEventHeader =
         CommonEventHeader.newBuilder()
                 .setVersion("sample-version")
                 .setDomain(domain.domainName)
@@ -86,14 +88,17 @@ fun wireProtocolFrameWithPayloadSize(size: Int): WireFrameMessage = WireFrameMes
         payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue
 )
 
-fun wireProtocolFrame(commonHeader: CommonEventHeader, eventFields: ByteString = ByteString.EMPTY): WireFrameMessage =
+fun wireProtocolFrame(commonHeader: CommonEventHeader,
+                      eventFields: ByteString = ByteString.EMPTY,
+                      receivedAt: Temporal = Instant.now()): WireFrameMessage =
         vesEventBytes(commonHeader, eventFields).let { payload ->
             WireFrameMessage(
                     payload = payload,
                     versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR,
                     versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR,
                     payloadSize = payload.size(),
-                    payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue
+                    payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                    receivedAt = receivedAt
             )
         }