Metric: Processing time 55/74655/8
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 14 Dec 2018 11:05:47 +0000 (12:05 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Mon, 17 Dec 2018 14:06:29 +0000 (15:06 +0100)
Add processing time metric measured as difference between "sent to DMaaP" and "WTP decoded" events.

Change-Id: I73bb665145019fcca5ae36e2199ed0e1cc088fdf
Issue-ID: DCAEGEN2-1036
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
20 files changed:
development/docker-compose.yml
development/prometheus.yml [new file with mode: 0644]
development/start-simulation.sh [new file with mode: 0755]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt

index 2b903a8..a64c62d 100644 (file)
@@ -111,3 +111,13 @@ services:
               "--kafka-topics", "HV_VES_PERF3GPP"]
     depends_on:
     - message-router-kafka
+
+  #
+  # Monitoring
+  #
+  prometheus:
+    image: prom/prometheus
+    ports:
+      - "9090:9090"
+    volumes:
+      - ./prometheus.yml:/etc/prometheus/prometheus.yml
diff --git a/development/prometheus.yml b/development/prometheus.yml
new file mode 100644 (file)
index 0000000..201c8f9
--- /dev/null
@@ -0,0 +1,14 @@
+global:
+  scrape_interval: 5s
+  external_labels:
+    monitor: 'my-monitor'
+
+scrape_configs:
+  - job_name: 'prometheus'
+    static_configs:
+      - targets: ['localhost:9090']
+
+  - job_name: 'ves-hv-collector'
+    metrics_path: '/monitoring/prometheus'
+    static_configs:
+      - targets: ['ves-hv-collector:6060']
diff --git a/development/start-simulation.sh b/development/start-simulation.sh
new file mode 100755 (executable)
index 0000000..70e4aae
--- /dev/null
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+# TODO: Merge this file with bin/xnf-simulation.sh
+
+currentTimeMicros=$((`date +%s%N`/1000))
+
+curl --header 'Content-Type: application/json' --request POST \
+    --data '[
+             {
+               "commonEventHeader": {
+                 "version": "sample-version",
+                 "domain": "perf3gpp",
+                 "sequence": 1,
+                 "priority": 1,
+                 "eventId": "sample-event-id",
+                 "eventName": "sample-event-name",
+                 "eventType": "sample-event-type",
+                 "startEpochMicrosec": 1545049703000000,
+                 "lastEpochMicrosec":  '$currentTimeMicros',
+                 "nfNamingCode": "sample-nf-naming-code",
+                 "nfcNamingCode": "sample-nfc-naming-code",
+                 "reportingEntityId": "sample-reporting-entity-id",
+                 "reportingEntityName": "sample-reporting-entity-name",
+                 "sourceId": "sample-source-id",
+                 "sourceName": "sample-source-name",
+                 "vesEventListenerVersion": "7.2"
+               },
+               "messageType": "VALID",
+               "messagesAmount": 1
+             }
+           ]' \
+    http://localhost:6062/simulator/async
index 3f69c08..1334738 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.boundary
 
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
@@ -31,8 +32,8 @@ interface Sink {
 
 interface Metrics {
     fun notifyBytesReceived(size: Int)
-    fun notifyMessageReceived(size: Int)
-    fun notifyMessageSent(topic: String)
+    fun notifyMessageReceived(msg: WireFrameMessage)
+    fun notifyMessageSent(msg: RoutedMessage)
     fun notifyMessageDropped(cause: MessageDropCause)
 }
 
index c670e1d..ee499e1 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.Try
 import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.ves.VesEventOuterClass.VesEvent
 
@@ -30,9 +31,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent
  */
 internal class VesDecoder {
 
-    fun decode(bytes: ByteData): Try<VesMessage> =
+    fun decode(frame: WireFrameMessage): Try<VesMessage> =
             Try {
-                val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
-                VesMessage(decodedHeader, bytes)
+                val decodedHeader = VesEvent.parseFrom(frame.payload.unsafeAsArray()).commonEventHeader
+                VesMessage(decodedHeader, frame)
             }
 }
index b29432f..51f894d 100644 (file)
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -68,7 +67,7 @@ internal class VesHvCollector(
     private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux
             .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
             .concatMap(wireChunkDecoder::decode)
-            .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
+            .doOnNext(metrics::notifyMessageReceived)
 
     private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
             .filterFailedWithLog {
@@ -78,15 +77,14 @@ internal class VesHvCollector(
             }
 
     private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
-            .map(WireFrameMessage::payload)
-            .flatMap(::decodePayload)
-
-    private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
-            .decode(rawPayload)
-            .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
-            .filterFailedWithLog(logger, clientContext::fullMdc,
-                    { "Ves event header decoded successfully" },
-                    { "Failed to decode ves event header, reason: ${it.message}" })
+            .flatMap { frame ->
+                protobufDecoder
+                        .decode(frame)
+                        .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+                        .filterFailedWithLog(logger, clientContext::fullMdc,
+                                { "Ves event header decoded successfully" },
+                                { "Failed to decode ves event header, reason: ${it.message}" })
+            }
 
     private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
             .filterFailedWithLog {
@@ -98,7 +96,7 @@ internal class VesHvCollector(
     private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
             .flatMap(this::findRoute)
             .compose(sink::send)
-            .doOnNext { metrics.notifyMessageSent(it.topic) }
+            .doOnNext(metrics::notifyMessageSent)
 
     private fun findRoute(msg: VesMessage) = router
             .findDestination(msg)
index ec8593a..14966d9 100644 (file)
@@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider {
 
             private fun logMessage(msg: RoutedMessage) {
                 val msgs = totalMessages.addAndGet(1)
-                val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
+                val bytes = totalBytes.addAndGet(msg.message.wtpFrame.payloadSize.toLong())
                 val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
                 if (msgs % INFO_LOGGING_FREQ == 0L)
                     logger.info(ctx, logMessageSupplier)
index 7a6ac7c..c92518a 100644 (file)
@@ -28,10 +28,12 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
  */
 class VesMessageSerializer : Serializer<VesMessage> {
     override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+        // not needed
     }
 
-    override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray()
+    override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.wtpFrame?.payload?.unsafeAsArray()
 
     override fun close() {
+        // not needed
     }
 }
index 1965d78..d364019 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.model
 
-import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData)
+data class VesMessage(val header: CommonEventHeader, val wtpFrame: WireFrameMessage)
index f784daa..7d136ef 100644 (file)
@@ -29,7 +29,8 @@ import org.jetbrains.spek.api.dsl.*
 import org.onap.dcae.collectors.veshv.domain.*
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
 import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
 import kotlin.test.assertTrue
 import kotlin.test.fail
@@ -43,7 +44,7 @@ internal object MessageValidatorTest : Spek({
             val commonHeader = commonHeader()
 
             it("should accept message with fully initialized message header") {
-                val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader))
+                val vesMessage = VesMessage(commonHeader, wireProtocolFrame(commonHeader))
                 with(cut) {
                     assertThat(validateProtobufMessage(vesMessage).isRight())
                         .describedAs("message validation result").isTrue()
@@ -53,7 +54,7 @@ internal object MessageValidatorTest : Spek({
             VesEventDomain.values().forEach { domain ->
                 it("should accept message with $domain domain") {
                     val header = commonHeader(domain)
-                    val vesMessage = VesMessage(header, vesEventBytes(header))
+                    val vesMessage = VesMessage(header, wireProtocolFrame(header))
                     with(cut) {
                         assertThat(validateProtobufMessage(vesMessage).isRight())
                             .describedAs("message validation result").isTrue()
@@ -63,7 +64,7 @@ internal object MessageValidatorTest : Spek({
         }
 
         on("ves hv message bytes") {
-            val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY)
+            val vesMessage = VesMessage(getDefaultInstance(), emptyWireProtocolFrame())
             it("should not accept message with default header") {
 
                 with(cut) {
@@ -100,7 +101,7 @@ internal object MessageValidatorTest : Spek({
             ).forEach { value, expectedResult ->
                 on("ves hv message including header with priority $value") {
                     val commonEventHeader = commonHeader(priority = value)
-                    val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
+                    val vesMessage = VesMessage(commonEventHeader, wireProtocolFrame(commonEventHeader))
 
                     it("should resolve validation result") {
                         with(cut) {
@@ -121,7 +122,7 @@ internal object MessageValidatorTest : Spek({
                 .setEventId("Sample event Id")
                 .setSourceName("Sample Source")
                 .build()
-            val rawMessageBytes = vesEventBytes(commonHeader)
+            val rawMessageBytes = wireProtocolFrame(commonHeader)
 
             it("should not accept not fully initialized message header") {
                 val vesMessage = VesMessage(commonHeader, rawMessageBytes)
@@ -148,7 +149,7 @@ internal object MessageValidatorTest : Spek({
 
         on("ves hv message including header.vesEventListenerVersion with non-string major part") {
             val commonHeader = commonHeader(vesEventListenerVersion = "sample-version")
-            val rawMessageBytes = vesEventBytes(commonHeader)
+            val rawMessageBytes = wireProtocolFrame(commonHeader)
 
 
             it("should not accept message header") {
@@ -169,7 +170,7 @@ internal object MessageValidatorTest : Spek({
 
         on("ves hv message including header.vesEventListenerVersion with major part != 7") {
             val commonHeader = commonHeader(vesEventListenerVersion = "1.2.3")
-            val rawMessageBytes = vesEventBytes(commonHeader)
+            val rawMessageBytes = wireProtocolFrame(commonHeader)
 
             it("should not accept message header") {
                 val vesMessage = VesMessage(commonHeader, rawMessageBytes)
@@ -190,7 +191,7 @@ internal object MessageValidatorTest : Spek({
 
         on("ves hv message including header.vesEventListenerVersion with minor part not starting with a digit") {
             val commonHeader = commonHeader(vesEventListenerVersion = "7.test")
-            val rawMessageBytes = vesEventBytes(commonHeader)
+            val rawMessageBytes = wireProtocolFrame(commonHeader)
 
             it("should not accept message header") {
                 val vesMessage = VesMessage(commonHeader, rawMessageBytes)
@@ -237,7 +238,7 @@ internal object MessageValidatorTest : Spek({
                 with(cut) {
                     on("valid message as input") {
                         val commonHeader = commonHeader()
-                        val rawMessageBytes = vesEventBytes(commonHeader)
+                        val rawMessageBytes = wireProtocolFrame(commonHeader)
                         val vesMessage = VesMessage(commonHeader, rawMessageBytes)
 
                         it("should be right") {
@@ -247,7 +248,7 @@ internal object MessageValidatorTest : Spek({
                 }
                 on("invalid message as input") {
                     val commonHeader = newBuilder().build()
-                    val rawMessageBytes = vesEventBytes(commonHeader)
+                    val rawMessageBytes = wireProtocolFrame(commonHeader)
                     val vesMessage = VesMessage(commonHeader, rawMessageBytes)
 
                     it("should be left") {
index e419016..90b850c 100644 (file)
@@ -21,13 +21,11 @@ package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.None
 import arrow.core.Some
-import io.netty.buffer.ByteBufAllocator
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
@@ -36,6 +34,7 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
 
 
 /**
@@ -61,7 +60,7 @@ object RouterTest : Spek({
         val cut = Router(config, ClientContext())
 
         on("message with existing route (rtpm)") {
-            val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY)
+            val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
             val result = cut.findDestination(message)
 
             it("should have route available") {
@@ -82,7 +81,7 @@ object RouterTest : Spek({
         }
 
         on("message with existing route (trace)") {
-            val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY)
+            val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
             val result = cut.findDestination(message)
 
             it("should have route available") {
@@ -103,7 +102,7 @@ object RouterTest : Spek({
         }
 
         on("message with unknown route") {
-            val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY)
+            val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
             val result = cut.findDestination(message)
 
             it("should not have route available") {
index 605e7a6..74f33a7 100644 (file)
@@ -29,7 +29,8 @@ import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
 import java.nio.charset.Charset
 import kotlin.test.assertTrue
 import kotlin.test.fail
@@ -42,16 +43,16 @@ internal object VesDecoderTest : Spek({
 
         on("ves hv message bytes") {
             val commonHeader = commonHeader(HEARTBEAT)
-            val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
+            val wtpFrame = wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
 
             it("should decode only header and pass it on along with raw message") {
                 val expectedMessage = VesMessage(
                         commonHeader,
-                        rawMessageBytes
+                        wtpFrame
                 )
 
                 assertTrue {
-                    cut.decode(rawMessageBytes).exists {
+                    cut.decode(wtpFrame).exists {
                         it == expectedMessage
                     }
                 }
@@ -60,9 +61,10 @@ internal object VesDecoderTest : Spek({
 
         on("invalid ves hv message bytes") {
             val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset()))
+            val wtpFrame = emptyWireProtocolFrame().copy(payload = rawMessageBytes, payloadSize = rawMessageBytes.size())
 
             it("should throw error") {
-                assertFailedWithError(cut.decode(rawMessageBytes))
+                assertFailedWithError(cut.decode(wtpFrame))
             }
         }
     }
index dd8acf7..9f5c37e 100644 (file)
@@ -33,7 +33,12 @@ import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TO
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.*
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
+import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+import java.time.Duration
 
 object MetricsSpecification : Spek({
     debugRx(false)
@@ -102,6 +107,21 @@ object MetricsSpecification : Spek({
         }
     }
 
+    describe("Processing time") {
+        it("should gather processing time metric") {
+            val delay = Duration.ofMillis(10)
+            val sut = vesHvWithDelayingSink(delay)
+
+            sut.handleConnection(vesWireFrameMessage(PERF3GPP))
+
+
+            val metrics = sut.metrics
+            assertThat(metrics.lastProcessingTimeMicros)
+                    .describedAs("processingTime metric")
+                    .isGreaterThanOrEqualTo(delay.toNanos().toDouble() / 1000.0)
+        }
+    }
+
     describe("Messages dropped metrics") {
         it("should gather metrics for invalid messages") {
             val sut = vesHvWithNoOpSink(basicConfiguration)
index 0c1b589..7ebbfba 100644 (file)
@@ -77,3 +77,8 @@ fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConf
         Sut(NoOpSink()).apply {
             configurationProvider.updateConfiguration(collectorConfiguration)
         }
+
+fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+        Sut(ProcessingSink { it.delayElements(delay) }).apply {
+            configurationProvider.updateConfiguration(collectorConfiguration)
+        }
index 9ddb711..660ce49 100644 (file)
 package org.onap.dcae.collectors.veshv.tests.fakes
 
 import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import java.time.Duration
+import java.time.Instant
 import java.util.concurrent.ConcurrentHashMap
 import kotlin.test.fail
 
@@ -31,6 +35,7 @@ import kotlin.test.fail
 class FakeMetrics : Metrics {
     var bytesReceived: Int = 0
     var messageBytesReceived: Int = 0
+    var lastProcessingTimeMicros: Double = -1.0
     var messagesSentCount: Int = 0
     var messagesDroppedCount: Int = 0
 
@@ -41,13 +46,16 @@ class FakeMetrics : Metrics {
         bytesReceived += size
     }
 
-    override fun notifyMessageReceived(size: Int) {
-        messageBytesReceived += size
+    override fun notifyMessageReceived(msg: WireFrameMessage) {
+        messageBytesReceived += msg.payloadSize
     }
 
-    override fun notifyMessageSent(topic: String) {
+    override fun notifyMessageSent(msg: RoutedMessage) {
         messagesSentCount++
-        messagesSentToTopic.compute(topic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 }
+        messagesSentToTopic.compute(msg.topic) { k, _ ->
+            messagesSentToTopic[k]?.inc() ?: 1
+        }
+        lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0
     }
 
     override fun notifyMessageDropped(cause: MessageDropCause) {
@@ -61,4 +69,4 @@ class FakeMetrics : Metrics {
     fun messagesDropped(cause: MessageDropCause) =
             messagesDroppedCause[cause]
                     ?: fail("No messages were dropped due to cause: ${cause.name}")
-}
\ No newline at end of file
+}
index 865dd51..2f731f5 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.tests.fakes
 
+import arrow.core.identity
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.reactivestreams.Publisher
 import reactor.core.publisher.Flux
 import java.util.*
 import java.util.concurrent.ConcurrentLinkedDeque
 import java.util.concurrent.atomic.AtomicLong
+import java.util.function.Function
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -58,6 +61,9 @@ class CountingSink : Sink {
     }
 }
 
-class NoOpSink : Sink {
-    override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages
+
+open class ProcessingSink(val transformer: (Flux<RoutedMessage>) -> Publisher<RoutedMessage>) : Sink {
+    override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages.transform(transformer)
 }
+
+class NoOpSink : ProcessingSink(::identity)
index 1257c6b..d1fdb10 100644 (file)
@@ -22,6 +22,8 @@ package org.onap.dcae.collectors.veshv.domain
 import arrow.core.Either
 import arrow.core.Either.Companion.left
 import arrow.core.Either.Companion.right
+import java.time.Instant
+import java.time.temporal.Temporal
 
 
 /**
@@ -58,7 +60,8 @@ data class WireFrameMessage(val payload: ByteData,
                             val versionMajor: Short,
                             val versionMinor: Short,
                             val payloadType: Int,
-                            val payloadSize: Int
+                            val payloadSize: Int,
+                            val receivedAt: Temporal = Instant.now()
 ) {
     constructor(payload: ByteArray) : this(
             ByteData(payload),
index 18678ff..259fa03 100644 (file)
@@ -29,7 +29,11 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics
 import io.micrometer.prometheus.PrometheusConfig
 import io.micrometer.prometheus.PrometheusMeterRegistry
 import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import java.time.Duration
+import java.time.Instant
 
 
 /**
@@ -53,6 +57,7 @@ class MicrometerMetrics internal constructor(
     private val droppedCount = { cause: String ->
         registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause)
     }.memoize<String, Counter>()
+    private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
 
     init {
         registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
@@ -71,14 +76,15 @@ class MicrometerMetrics internal constructor(
         receivedBytes.increment(size.toDouble())
     }
 
-    override fun notifyMessageReceived(size: Int) {
+    override fun notifyMessageReceived(msg: WireFrameMessage) {
         receivedMsgCount.increment()
-        receivedMsgBytes.increment(size.toDouble())
+        receivedMsgBytes.increment(msg.payloadSize.toDouble())
     }
 
-    override fun notifyMessageSent(topic: String) {
+    override fun notifyMessageSent(msg: RoutedMessage) {
         sentCountTotal.increment()
-        sentCount(topic).increment()
+        sentCount(msg.topic).increment()
+        processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()))
     }
 
     override fun notifyMessageDropped(cause: MessageDropCause) {
@@ -100,7 +106,7 @@ class MicrometerMetrics internal constructor(
         internal const val DROPPED = "dropped"
         internal const val CAUSE = "cause"
         internal const val TOTAL = "total"
-
+        internal const val TIME = "time"
         fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
     }
 }
index e2dc2f8..cb5cfc7 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main
 import arrow.core.Try
 import io.micrometer.core.instrument.Counter
 import io.micrometer.core.instrument.Gauge
+import io.micrometer.core.instrument.Timer
 import io.micrometer.core.instrument.search.RequiredSearch
 import io.micrometer.prometheus.PrometheusConfig
 import io.micrometer.prometheus.PrometheusMeterRegistry
@@ -35,6 +36,15 @@ import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
+import java.time.Instant
+import java.time.temporal.Temporal
+import java.util.concurrent.TimeUnit
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -63,6 +73,9 @@ object MicrometerMetricsTest : Spek({
     fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
             verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier)
 
+    fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
+            verifyMeter(registrySearch().name(name), RequiredSearch::timer, verifier)
+
     fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
             verifyMeter(search, RequiredSearch::counter, verifier)
 
@@ -71,6 +84,7 @@ object MicrometerMetricsTest : Spek({
 
     fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
         registry.meters
+                .filter { it.id.name.startsWith(PREFIX) }
                 .filter { it is Counter }
                 .map { it as Counter }
                 .filterNot { it.id.name in changedCounters }
@@ -105,7 +119,7 @@ object MicrometerMetricsTest : Spek({
             val counterName = "$PREFIX.messages.received.count"
 
             it("should increment counter") {
-                cut.notifyMessageReceived(777)
+                cut.notifyMessageReceived(emptyWireProtocolFrame())
 
                 verifyCounter(counterName) {
                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
@@ -118,7 +132,7 @@ object MicrometerMetricsTest : Spek({
 
             it("should increment counter") {
                 val bytes = 888
-                cut.notifyMessageReceived(bytes)
+                cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
 
                 verifyCounter(counterName) {
                     assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
@@ -127,7 +141,7 @@ object MicrometerMetricsTest : Spek({
         }
 
         it("should leave all other counters unchanged") {
-            cut.notifyMessageReceived(128)
+            cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
             verifyAllCountersAreUnchangedBut(
                     "$PREFIX.messages.received.count",
                     "$PREFIX.messages.received.bytes"
@@ -143,7 +157,7 @@ object MicrometerMetricsTest : Spek({
             val counterName = "$PREFIX.messages.sent.count.total"
 
             it("should increment counter") {
-                cut.notifyMessageSent(topicName1)
+                cut.notifyMessageSent(routedMessage(topicName1))
 
                 verifyCounter(counterName) {
                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
@@ -155,9 +169,9 @@ object MicrometerMetricsTest : Spek({
         on("$PREFIX.messages.sent.topic.count counter") {
             val counterName = "$PREFIX.messages.sent.count.topic"
             it("should handle counters for different topics") {
-                cut.notifyMessageSent(topicName1)
-                cut.notifyMessageSent(topicName2)
-                cut.notifyMessageSent(topicName2)
+                cut.notifyMessageSent(routedMessage(topicName1))
+                cut.notifyMessageSent(routedMessage(topicName2))
+                cut.notifyMessageSent(routedMessage(topicName2))
 
                 verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) {
                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
@@ -168,6 +182,24 @@ object MicrometerMetricsTest : Spek({
                 }
             }
         }
+
+        on("$PREFIX.messages.processing.time") {
+            val counterName = "$PREFIX.messages.processing.time"
+            val processingTimeMs = 100L
+
+            it("should update timer") {
+
+                cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs)))
+
+                verifyTimer(counterName) { timer ->
+                    assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
+                }
+                verifyAllCountersAreUnchangedBut(
+                        counterName,
+                        "$PREFIX.messages.sent.count.topic",
+                        "$PREFIX.messages.sent.count.total")
+            }
+        }
     }
 
     describe("notifyMessageDropped") {
@@ -207,27 +239,27 @@ object MicrometerMetricsTest : Spek({
         it("should show difference between sent and received messages") {
 
             on("positive difference") {
-                cut.notifyMessageReceived(128)
-                cut.notifyMessageReceived(256)
-                cut.notifyMessageReceived(256)
-                cut.notifyMessageSent("perf3gpp")
+                cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
+                cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+                cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+                cut.notifyMessageSent(routedMessage("perf3gpp"))
                 verifyGauge("messages.processing.count") {
                     assertThat(it.value()).isCloseTo(2.0, doublePrecision)
                 }
             }
 
             on("zero difference") {
-                cut.notifyMessageReceived(128)
-                cut.notifyMessageSent("perf3gpp")
+                cut.notifyMessageReceived(emptyWireProtocolFrame())
+                cut.notifyMessageSent(routedMessage("perf3gpp"))
                 verifyGauge("messages.processing.count") {
                     assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                 }
             }
 
             on("negative difference") {
-                cut.notifyMessageReceived(128)
-                cut.notifyMessageSent("fault")
-                cut.notifyMessageSent("perf3gpp")
+                cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
+                cut.notifyMessageSent(routedMessage("fault"))
+                cut.notifyMessageSent(routedMessage("perf3gpp"))
                 verifyGauge("messages.processing.count") {
                     assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                 }
@@ -236,3 +268,15 @@ object MicrometerMetricsTest : Spek({
     }
 
 })
+
+fun routedMessage(topic: String, partition: Int = 0) =
+        vesEvent().let {evt ->
+            RoutedMessage(topic, partition,
+                    VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
+        }
+
+fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) =
+        vesEvent().let {evt ->
+            RoutedMessage(topic, partition,
+                    VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
+        }
\ No newline at end of file
index cf30d2c..a845689 100644 (file)
@@ -23,8 +23,10 @@ package org.onap.dcae.collectors.veshv.tests.utils
 import com.google.protobuf.ByteString
 import com.google.protobuf.MessageLite
 import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.PayloadContentType
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.ves.VesEventOuterClass
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
@@ -72,7 +74,38 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP,
                 .setVesEventListenerVersion(vesEventListenerVersion)
                 .build()
 
-fun vesEventBytes(commonHeader: CommonEventHeader, byteString: ByteString = ByteString.EMPTY): ByteData =
-        vesEvent(commonHeader, byteString).toByteData()
+fun emptyWireProtocolFrame(): WireFrameMessage = wireProtocolFrameWithPayloadSize(0)
+
+
+fun wireProtocolFrameWithPayloadSize(size: Int): WireFrameMessage = WireFrameMessage(
+        payload = ByteData(ByteArray(size)),
+        versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR,
+        versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR,
+        payloadSize = size,
+        payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue
+)
+
+fun wireProtocolFrame(commonHeader: CommonEventHeader, eventFields: ByteString = ByteString.EMPTY): WireFrameMessage =
+        vesEventBytes(commonHeader, eventFields).let { payload ->
+            WireFrameMessage(
+                    payload = payload,
+                    versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR,
+                    versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR,
+                    payloadSize = payload.size(),
+                    payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue
+            )
+        }
+
+fun wireProtocolFrame(evt: VesEventOuterClass.VesEvent) =
+        WireFrameMessage(
+                payload = ByteData(evt.toByteArray()),
+                payloadSize = evt.serializedSize,
+                payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR,
+                versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR
+        )
+
+fun vesEventBytes(commonHeader: CommonEventHeader, eventFields: ByteString = ByteString.EMPTY): ByteData =
+        vesEvent(commonHeader, eventFields).toByteData()
 
 fun MessageLite.toByteData(): ByteData = ByteData(toByteArray())
\ No newline at end of file