Add metric for rejected clients count 20/74720/5
authorFilip Krzywka <filip.krzywka@nokia.com>
Mon, 17 Dec 2018 13:25:56 +0000 (14:25 +0100)
committerFilip Krzywka <filip.krzywka@nokia.com>
Tue, 18 Dec 2018 08:15:59 +0000 (09:15 +0100)
- renamed few counters to be more verbose about what they count
- removed not needed 'total' suffix in metrics name

Change-Id: I6be0201e5f39f1298706c536b12410413d49df19
Issue-ID: DCAEGEN2-1043
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt [moved from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt with 55% similarity]
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt

index 1334738..61d28c2 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.boundary
 
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -35,6 +36,7 @@ interface Metrics {
     fun notifyMessageReceived(msg: WireFrameMessage)
     fun notifyMessageSent(msg: RoutedMessage)
     fun notifyMessageDropped(cause: MessageDropCause)
+    fun notifyClientRejected(cause: ClientRejectionCause)
 }
 
 @FunctionalInterface
index 51f894d..5c3f339 100644 (file)
@@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -60,7 +61,9 @@ internal class VesHvCollector(
                     .transform(::decodeProtobufPayload)
                     .transform(::filterInvalidProtobufMessages)
                     .transform(::routeMessage)
-                    .onErrorResume { logger.handleReactiveStreamError(clientContext, it) }
+                    .onErrorResume {
+                        metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
+                        logger.handleReactiveStreamError(clientContext, it) }
                     .doFinally { releaseBuffersMemory() }
                     .then()
 
index 83a7cd8..8184540 100644 (file)
@@ -25,5 +25,5 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-class WireFrameException(error: WireFrameDecodingError)
+class WireFrameException(val error: WireFrameDecodingError)
     : Exception("${error::class.simpleName}: ${error.message}")
  */
 package org.onap.dcae.collectors.veshv.model
 
+import org.onap.dcae.collectors.veshv.domain.InvalidWireFrameMarker
+import org.onap.dcae.collectors.veshv.domain.PayloadSizeExceeded
+import org.onap.dcae.collectors.veshv.impl.wire.WireFrameException
+
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since December 2018
@@ -27,3 +31,23 @@ enum class MessageDropCause(val tag: String) {
     ROUTE_NOT_FOUND("routing"),
     INVALID_MESSAGE("invalid")
 }
+
+enum class ClientRejectionCause(val tag: String) {
+    INVALID_WIRE_FRAME_MARKER("invalid_marker"),
+    PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE("too_big_payload"),
+    UNEXPECTED_STREAM_ERROR("unexpected");
+
+    companion object {
+        fun fromThrowable(err: Throwable): ClientRejectionCause =
+                when (err) {
+                    is WireFrameException -> fromWireFrameException(err)
+                    else -> UNEXPECTED_STREAM_ERROR
+                }
+
+        private fun fromWireFrameException(err: WireFrameException) = when (err.error) {
+            is InvalidWireFrameMarker -> INVALID_WIRE_FRAME_MARKER
+            is PayloadSizeExceeded -> PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
+            else -> UNEXPECTED_STREAM_ERROR
+        }
+    }
+}
index 9f5c37e..572cc79 100644 (file)
@@ -23,18 +23,23 @@ import com.google.protobuf.ByteString
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
@@ -153,7 +158,7 @@ object MetricsSpecification : Spek({
                     .isEqualTo(1)
         }
 
-        it("should gather summed metrics for dropped messages"){
+        it("should gather summed metrics for dropped messages") {
             val sut = vesHvWithNoOpSink(basicConfiguration)
 
             sut.handleConnection(
@@ -168,4 +173,31 @@ object MetricsSpecification : Spek({
                     .isEqualTo(2)
         }
     }
+
+    describe("clients rejected metrics") {
+
+        given("rejection causes") {
+            mapOf(
+                    ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to
+                            messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1),
+                    ClientRejectionCause.INVALID_WIRE_FRAME_MARKER to garbageFrame()
+            ).forEach { cause, vesMessage ->
+                on("cause $cause") {
+                    it("should notify correct metrics") {
+                        val sut = vesHvWithNoOpSink()
+
+                        sut.handleConnection(vesMessage)
+
+                        val metrics = sut.metrics
+                        assertThat(metrics.clientRejectionCause.size)
+                                .describedAs("metrics were notified with only one rejection cause")
+                                .isOne()
+                        assertThat(metrics.clientRejectionCause.get(cause))
+                                .describedAs("metrics were notified only once with correct client rejection cause")
+                                .isOne()
+                    }
+                }
+            }
+        }
+    }
 })
index 660ce49..a27d167 100644 (file)
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import java.time.Duration
 import java.time.Instant
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import java.util.concurrent.ConcurrentHashMap
 import kotlin.test.fail
 
@@ -33,14 +34,15 @@ import kotlin.test.fail
  * @since June 2018
  */
 class FakeMetrics : Metrics {
-    var bytesReceived: Int = 0
-    var messageBytesReceived: Int = 0
-    var lastProcessingTimeMicros: Double = -1.0
-    var messagesSentCount: Int = 0
-    var messagesDroppedCount: Int = 0
 
-    private val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap()
+    var bytesReceived: Int = 0 ; private set
+    var messageBytesReceived: Int = 0 ; private set
+    var messagesDroppedCount: Int = 0 ; private set
+    var lastProcessingTimeMicros: Double = -1.0 ; private set
     private val messagesDroppedCause: MutableMap<MessageDropCause, Int> = ConcurrentHashMap()
+    var messagesSentCount: Int = 0 ; private set
+    val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap()
+    var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>() ; private set
 
     override fun notifyBytesReceived(size: Int) {
         bytesReceived += size
@@ -63,6 +65,10 @@ class FakeMetrics : Metrics {
         messagesDroppedCause.compute(cause) { k, _ -> messagesDroppedCause[k]?.inc() ?: 1 }
     }
 
+    override fun notifyClientRejected(cause: ClientRejectionCause) {
+        clientRejectionCause.compute(cause) { k, _ -> clientRejectionCause[k]?.inc() ?: 1 }
+    }
+
     fun messagesOnTopic(topic: String) =
             messagesSentToTopic[topic] ?: fail("No messages were sent to topic $topic")
 
index 259fa03..f060426 100644 (file)
@@ -31,6 +31,7 @@ import io.micrometer.prometheus.PrometheusMeterRegistry
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import java.time.Duration
 import java.time.Instant
@@ -47,18 +48,24 @@ class MicrometerMetrics internal constructor(
     private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES))
     private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
     private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
-    private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT, TOTAL))
-    private val droppedCountTotal = registry.counter(name(MESSAGES, DROPPED, COUNT, TOTAL))
 
-    private val sentCount = { topic: String ->
-        registry.counter(name(MESSAGES, SENT, COUNT, TOPIC), TOPIC, topic)
+    private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
+    private val sentToTopicCount = { topic: String ->
+        registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic)
     }.memoize<String, Counter>()
 
-    private val droppedCount = { cause: String ->
-        registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause)
+    private val droppedCount = registry.counter(name(MESSAGES, DROPPED, COUNT))
+    private val droppedCauseCount = { cause: String ->
+        registry.counter(name(MESSAGES, DROPPED, CAUSE, COUNT), CAUSE, cause)
     }.memoize<String, Counter>()
+
     private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
 
+    private val clientsRejectedCount = registry.counter(name(CLIENTS, REJECTED, COUNT))
+    private val clientsRejectedCauseCount = { cause: String ->
+        registry.counter(name(CLIENTS, REJECTED, CAUSE, COUNT), CAUSE, cause)
+    }.memoize<String, Counter>()
+
     init {
         registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
             (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
@@ -70,6 +77,7 @@ class MicrometerMetrics internal constructor(
         JvmThreadMetrics().bindTo(registry)
     }
 
+
     val metricsProvider = MicrometerPrometheusMetricsProvider(registry)
 
     override fun notifyBytesReceived(size: Int) {
@@ -83,13 +91,18 @@ class MicrometerMetrics internal constructor(
 
     override fun notifyMessageSent(msg: RoutedMessage) {
         sentCountTotal.increment()
-        sentCount(msg.topic).increment()
+        sentToTopicCount(msg.topic).increment()
         processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()))
     }
 
     override fun notifyMessageDropped(cause: MessageDropCause) {
-        droppedCountTotal.increment()
-        droppedCount(cause.tag).increment()
+        droppedCount.increment()
+        droppedCauseCount(cause.tag).increment()
+    }
+
+    override fun notifyClientRejected(cause: ClientRejectionCause) {
+        clientsRejectedCount.increment()
+        clientsRejectedCauseCount(cause.tag).increment()
     }
 
     companion object {
@@ -102,10 +115,11 @@ class MicrometerMetrics internal constructor(
         internal const val DATA = "data"
         internal const val SENT = "sent"
         internal const val PROCESSING = "processing"
+        internal const val CAUSE = "cause"
+        internal const val CLIENTS = "clients"
+        internal const val REJECTED = "rejected"
         internal const val TOPIC = "topic"
         internal const val DROPPED = "dropped"
-        internal const val CAUSE = "cause"
-        internal const val TOTAL = "total"
         internal const val TIME = "time"
         fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
     }
index cb5cfc7..2ecdb26 100644 (file)
@@ -36,6 +36,8 @@ import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
@@ -60,7 +62,7 @@ object MicrometerMetricsTest : Spek({
         cut = MicrometerMetrics(registry)
     }
 
-    fun registrySearch() = RequiredSearch.`in`(registry)
+    fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
 
     fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
             Try {
@@ -71,16 +73,16 @@ object MicrometerMetricsTest : Spek({
             )
 
     fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
-            verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier)
+            verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
 
     fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
-            verifyMeter(registrySearch().name(name), RequiredSearch::timer, verifier)
+            verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
 
     fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
             verifyMeter(search, RequiredSearch::counter, verifier)
 
     fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
-            verifyCounter(registrySearch().name(name), verifier)
+            verifyCounter(registrySearch(name), verifier)
 
     fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
         registry.meters
@@ -153,8 +155,8 @@ object MicrometerMetricsTest : Spek({
         val topicName1 = "PERF3GPP"
         val topicName2 = "CALLTRACE"
 
-        on("$PREFIX.messages.sent.count.total counter") {
-            val counterName = "$PREFIX.messages.sent.count.total"
+        on("$PREFIX.messages.sent.count counter") {
+            val counterName = "$PREFIX.messages.sent.count"
 
             it("should increment counter") {
                 cut.notifyMessageSent(routedMessage(topicName1))
@@ -162,22 +164,22 @@ object MicrometerMetricsTest : Spek({
                 verifyCounter(counterName) {
                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                 }
-                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.count.topic")
+                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count")
             }
         }
 
         on("$PREFIX.messages.sent.topic.count counter") {
-            val counterName = "$PREFIX.messages.sent.count.topic"
+            val counterName = "$PREFIX.messages.sent.topic.count"
             it("should handle counters for different topics") {
                 cut.notifyMessageSent(routedMessage(topicName1))
                 cut.notifyMessageSent(routedMessage(topicName2))
                 cut.notifyMessageSent(routedMessage(topicName2))
 
-                verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) {
+                verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                 }
 
-                verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) {
+                verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                 }
             }
@@ -196,16 +198,16 @@ object MicrometerMetricsTest : Spek({
                 }
                 verifyAllCountersAreUnchangedBut(
                         counterName,
-                        "$PREFIX.messages.sent.count.topic",
-                        "$PREFIX.messages.sent.count.total")
+                        "$PREFIX.messages.sent.topic.count",
+                        "$PREFIX.messages.sent.count")
             }
         }
     }
 
     describe("notifyMessageDropped") {
 
-        on("$PREFIX.messages.dropped.count.total counter") {
-            val counterName = "$PREFIX.messages.dropped.count.total"
+        on("$PREFIX.messages.dropped.count counter") {
+            val counterName = "$PREFIX.messages.dropped.count"
             it("should increment counter") {
                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                 cut.notifyMessageDropped(INVALID_MESSAGE)
@@ -213,22 +215,22 @@ object MicrometerMetricsTest : Spek({
                 verifyCounter(counterName) {
                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                 }
-                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.count.cause")
+                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause.count")
             }
         }
 
-        on("$PREFIX.messages.dropped.count.cause counter") {
-            val counterName = "$PREFIX.messages.dropped.count.cause"
+        on("$PREFIX.messages.dropped.cause.count counter") {
+            val counterName = "$PREFIX.messages.dropped.cause.count"
             it("should handle counters for different drop reasons") {
                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                 cut.notifyMessageDropped(INVALID_MESSAGE)
                 cut.notifyMessageDropped(INVALID_MESSAGE)
 
-                verifyCounter(registrySearch().name(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
+                verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                 }
 
-                verifyCounter(registrySearch().name(counterName).tag("cause", INVALID_MESSAGE.tag)) {
+                verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                 }
             }
@@ -267,6 +269,38 @@ object MicrometerMetricsTest : Spek({
         }
     }
 
+    describe("notifyClientRejected") {
+
+        on("$PREFIX.clients.rejected.count") {
+            val counterName = "$PREFIX.clients.rejected.count"
+            it("should increment counter for each possible reason") {
+                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
+                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
+
+                verifyCounter(counterName) {
+                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+                }
+                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause.count")
+            }
+        }
+
+        on("$PREFIX.clients.rejected.cause.count counter") {
+            val counterName = "$PREFIX.clients.rejected.cause.count"
+            it("should handle counters for different rejection reasons") {
+                cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
+                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
+                cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
+
+                verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
+                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
+                }
+
+                verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
+                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+                }
+            }
+        }
+    }
 })
 
 fun routedMessage(topic: String, partition: Int = 0) =
@@ -279,4 +313,4 @@ fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) =
         vesEvent().let {evt ->
             RoutedMessage(topic, partition,
                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
-        }
\ No newline at end of file
+        }