Add metrics for dropped messages 67/74667/9
authorJakub Dudycz <jakub.dudycz@nokia.com>
Fri, 14 Dec 2018 14:20:56 +0000 (15:20 +0100)
committerJakub Dudycz <jakub.dudycz@nokia.com>
Mon, 17 Dec 2018 12:18:13 +0000 (13:18 +0100)
Add counters for messages dropped due to validation or undefined routing
Slight refactoring

Change-Id: Ibe4e38445e81babc745d7a7d95356910845293ce
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1037

12 files changed:
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt [new file with mode: 0644]
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt

index b686b25..3f69c08 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.boundary
 
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import reactor.core.publisher.Flux
 
@@ -32,6 +33,7 @@ interface Metrics {
     fun notifyBytesReceived(size: Int)
     fun notifyMessageReceived(size: Int)
     fun notifyMessageSent(topic: String)
+    fun notifyMessageDropped(cause: MessageDropCause)
 }
 
 @FunctionalInterface
index ca1605e..b29432f 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
-import arrow.core.Either
 import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
@@ -29,9 +28,15 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MessageEither
 import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
 import reactor.core.publisher.Flux
@@ -66,7 +71,11 @@ internal class VesHvCollector(
             .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
 
     private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
-            .filterFailedWithLog(MessageValidator::validateFrameMessage)
+            .filterFailedWithLog {
+                MessageValidator
+                        .validateFrameMessage(it)
+                        .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+            }
 
     private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
             .map(WireFrameMessage::payload)
@@ -74,12 +83,17 @@ internal class VesHvCollector(
 
     private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
             .decode(rawPayload)
+            .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
             .filterFailedWithLog(logger, clientContext::fullMdc,
                     { "Ves event header decoded successfully" },
                     { "Failed to decode ves event header, reason: ${it.message}" })
 
     private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
-            .filterFailedWithLog(MessageValidator::validateProtobufMessage)
+            .filterFailedWithLog {
+                MessageValidator
+                        .validateProtobufMessage(it)
+                        .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+            }
 
     private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
             .flatMap(this::findRoute)
@@ -88,6 +102,7 @@ internal class VesHvCollector(
 
     private fun findRoute(msg: VesMessage) = router
             .findDestination(msg)
+            .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) }
             .filterEmptyWithLog(logger, clientContext::fullMdc,
                     { "Found route for message: ${it.topic}, partition: ${it.partition}" },
                     { "Could not find route for message" })
@@ -95,7 +110,7 @@ internal class VesHvCollector(
     private fun releaseBuffersMemory() = wireChunkDecoder.release()
             .also { logger.debug { "Released buffer memory after handling message stream" } }
 
-    fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
+    private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> =
             filterFailedWithLog(logger, clientContext::fullMdc, predicate)
 
     companion object {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/MessageDropCause.kt
new file mode 100644 (file)
index 0000000..af43ae6
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since December 2018
+ */
+enum class MessageDropCause(val tag: String) {
+    ROUTE_NOT_FOUND("routing"),
+    INVALID_MESSAGE("invalid")
+}
index 2feca41..dd8acf7 100644 (file)
@@ -25,18 +25,15 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.tests.fakes.NoOpSink
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
-import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
-import kotlin.test.fail
+import org.onap.dcae.collectors.veshv.tests.utils.*
 
 object MetricsSpecification : Spek({
     debugRx(false)
@@ -45,7 +42,7 @@ object MetricsSpecification : Spek({
         it("should sum up all bytes received") {
             val sut = vesHvWithNoOpSink()
             val vesWireFrameMessage = vesWireFrameMessage()
-            val invalidWireFrame = invalidWireFrame()
+            val invalidWireFrame = messageWithInvalidWireFrameHeader()
 
             val bytesSent = invalidWireFrame.readableBytes() +
                     vesWireFrameMessage.readableBytes()
@@ -93,18 +90,62 @@ object MetricsSpecification : Spek({
             )
 
             val metrics = sut.metrics
-            assertThat(metrics.messageSentCount)
-                    .describedAs("messageSentCount metric")
+            assertThat(metrics.messagesSentCount)
+                    .describedAs("messagesSentCount metric")
                     .isEqualTo(3)
-            assertThat(messagesOnTopic(metrics, PERF3GPP_TOPIC))
+            assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC))
                     .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
                     .isEqualTo(2)
-            assertThat(messagesOnTopic(metrics, MEASUREMENTS_FOR_VF_SCALING_TOPIC))
+            assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC))
                     .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric")
                     .isEqualTo(1)
         }
     }
-})
 
-private fun messagesOnTopic(metrics: FakeMetrics, topic: String) =
-        metrics.messagesSentToTopic.get(topic) ?: fail("No messages were sent to topic $topic")
\ No newline at end of file
+    describe("Messages dropped metrics") {
+        it("should gather metrics for invalid messages") {
+            val sut = vesHvWithNoOpSink(basicConfiguration)
+
+            sut.handleConnection(
+                    messageWithInvalidWireFrameHeader(),
+                    wireFrameMessageWithInvalidPayload(),
+                    vesWireFrameMessage(domain = PERF3GPP),
+                    messageWithInvalidListenerVersion()
+            )
+
+            val metrics = sut.metrics
+            assertThat(metrics.messagesDropped(INVALID_MESSAGE))
+                    .describedAs("messagesDroppedCause $INVALID_MESSAGE metric")
+                    .isEqualTo(3)
+        }
+
+        it("should gather metrics for route not found") {
+            val sut = vesHvWithNoOpSink(basicConfiguration)
+
+            sut.handleConnection(
+                    vesWireFrameMessage(domain = PERF3GPP),
+                    vesWireFrameMessage(domain = HEARTBEAT)
+            )
+
+            val metrics = sut.metrics
+            assertThat(metrics.messagesDropped(ROUTE_NOT_FOUND))
+                    .describedAs("messagesDroppedCause $ROUTE_NOT_FOUND metric")
+                    .isEqualTo(1)
+        }
+
+        it("should gather summed metrics for dropped messages"){
+            val sut = vesHvWithNoOpSink(basicConfiguration)
+
+            sut.handleConnection(
+                    vesWireFrameMessage(domain = PERF3GPP),
+                    vesWireFrameMessage(domain = HEARTBEAT),
+                    wireFrameMessageWithInvalidPayload()
+            )
+
+            val metrics = sut.metrics
+            assertThat(metrics.messagesDroppedCount)
+                    .describedAs("messagesDroppedCount metric")
+                    .isEqualTo(2)
+        }
+    }
+})
index ab59cc2..338c373 100644 (file)
@@ -37,11 +37,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
-import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize
-import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
-import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+import org.onap.dcae.collectors.veshv.tests.utils.*
 
 import reactor.core.publisher.Flux
 import java.time.Duration
@@ -71,8 +67,8 @@ object VesHvSpecification : Spek({
         it("should release memory for each handled and dropped message") {
             val (sut, sink) = vesHvWithStoringSink()
             val validMessage = vesWireFrameMessage(PERF3GPP)
-            val msgWithInvalidFrame = invalidWireFrame()
-            val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
+            val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
+            val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
             val expectedRefCnt = 0
 
             val handledEvents = sut.handleConnection(
@@ -329,7 +325,7 @@ object VesHvSpecification : Spek({
 
             val handledMessages = sut.handleConnection(sink,
                     vesWireFrameMessage(PERF3GPP, "first"),
-                    vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
+                    messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
                     vesWireFrameMessage(PERF3GPP))
 
             assertThat(handledMessages).hasSize(1)
index dd09805..9ddb711 100644 (file)
@@ -20,8 +20,9 @@
 package org.onap.dcae.collectors.veshv.tests.fakes
 
 import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicInteger
+import kotlin.test.fail
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -29,11 +30,12 @@ import java.util.concurrent.atomic.AtomicInteger
  */
 class FakeMetrics : Metrics {
     var bytesReceived: Int = 0
-
     var messageBytesReceived: Int = 0
+    var messagesSentCount: Int = 0
+    var messagesDroppedCount: Int = 0
 
-    var messageSentCount: Int = 0
-    val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap()
+    private val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap()
+    private val messagesDroppedCause: MutableMap<MessageDropCause, Int> = ConcurrentHashMap()
 
     override fun notifyBytesReceived(size: Int) {
         bytesReceived += size
@@ -44,7 +46,19 @@ class FakeMetrics : Metrics {
     }
 
     override fun notifyMessageSent(topic: String) {
-        messageSentCount++
-        messagesSentToTopic.compute(topic, { k, v -> messagesSentToTopic.get(k)?.inc() ?: 1 })
+        messagesSentCount++
+        messagesSentToTopic.compute(topic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 }
+    }
+
+    override fun notifyMessageDropped(cause: MessageDropCause) {
+        messagesDroppedCount++
+        messagesDroppedCause.compute(cause) { k, _ -> messagesDroppedCause[k]?.inc() ?: 1 }
     }
+
+    fun messagesOnTopic(topic: String) =
+            messagesSentToTopic[topic] ?: fail("No messages were sent to topic $topic")
+
+    fun messagesDropped(cause: MessageDropCause) =
+            messagesDroppedCause[cause]
+                    ?: fail("No messages were dropped due to cause: ${cause.name}")
 }
\ No newline at end of file
index cf90359..18678ff 100644 (file)
@@ -29,6 +29,7 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics
 import io.micrometer.prometheus.PrometheusConfig
 import io.micrometer.prometheus.PrometheusMeterRegistry
 import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
 
 
 /**
@@ -42,7 +43,16 @@ class MicrometerMetrics internal constructor(
     private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES))
     private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
     private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
-    private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
+    private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT, TOTAL))
+    private val droppedCountTotal = registry.counter(name(MESSAGES, DROPPED, COUNT, TOTAL))
+
+    private val sentCount = { topic: String ->
+        registry.counter(name(MESSAGES, SENT, COUNT, TOPIC), TOPIC, topic)
+    }.memoize<String, Counter>()
+
+    private val droppedCount = { cause: String ->
+        registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause)
+    }.memoize<String, Counter>()
 
     init {
         registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
@@ -55,10 +65,6 @@ class MicrometerMetrics internal constructor(
         JvmThreadMetrics().bindTo(registry)
     }
 
-    private val sentCount = { topic: String ->
-        registry.counter("hvves.messages.sent.topic.count", "topic", topic)
-    }.memoize<String, Counter>()
-
     val metricsProvider = MicrometerPrometheusMetricsProvider(registry)
 
     override fun notifyBytesReceived(size: Int) {
@@ -75,6 +81,11 @@ class MicrometerMetrics internal constructor(
         sentCount(topic).increment()
     }
 
+    override fun notifyMessageDropped(cause: MessageDropCause) {
+        droppedCountTotal.increment()
+        droppedCount(cause.tag).increment()
+    }
+
     companion object {
         val INSTANCE = MicrometerMetrics()
         internal const val PREFIX = "hvves"
@@ -85,6 +96,11 @@ class MicrometerMetrics internal constructor(
         internal const val DATA = "data"
         internal const val SENT = "sent"
         internal const val PROCESSING = "processing"
+        internal const val TOPIC = "topic"
+        internal const val DROPPED = "dropped"
+        internal const val CAUSE = "cause"
+        internal const val TOTAL = "total"
+
         fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
     }
 }
index 66326dd..e2dc2f8 100644 (file)
@@ -23,7 +23,6 @@ import arrow.core.Try
 import io.micrometer.core.instrument.Counter
 import io.micrometer.core.instrument.Gauge
 import io.micrometer.core.instrument.search.RequiredSearch
-import io.micrometer.core.instrument.simple.SimpleMeterRegistry
 import io.micrometer.prometheus.PrometheusConfig
 import io.micrometer.prometheus.PrometheusMeterRegistry
 import org.assertj.core.api.Assertions.assertThat
@@ -32,9 +31,10 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -88,8 +88,8 @@ object MicrometerMetricsTest : Spek({
                 val bytes = 128
                 cut.notifyBytesReceived(bytes)
 
-                verifyCounter(counterName) { counter ->
-                    assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision)
+                verifyCounter(counterName) {
+                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                 }
             }
 
@@ -107,8 +107,8 @@ object MicrometerMetricsTest : Spek({
             it("should increment counter") {
                 cut.notifyMessageReceived(777)
 
-                verifyCounter(counterName) { counter ->
-                    assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+                verifyCounter(counterName) {
+                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                 }
             }
         }
@@ -120,15 +120,18 @@ object MicrometerMetricsTest : Spek({
                 val bytes = 888
                 cut.notifyMessageReceived(bytes)
 
-                verifyCounter(counterName) { counter ->
-                    assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision)
+                verifyCounter(counterName) {
+                    assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
                 }
             }
         }
 
         it("should leave all other counters unchanged") {
             cut.notifyMessageReceived(128)
-            verifyAllCountersAreUnchangedBut("$PREFIX.messages.received.count", "$PREFIX.messages.received.bytes")
+            verifyAllCountersAreUnchangedBut(
+                    "$PREFIX.messages.received.count",
+                    "$PREFIX.messages.received.bytes"
+            )
         }
     }
 
@@ -136,32 +139,65 @@ object MicrometerMetricsTest : Spek({
         val topicName1 = "PERF3GPP"
         val topicName2 = "CALLTRACE"
 
-        on("$PREFIX.messages.sent.count counter") {
-            val counterName = "$PREFIX.messages.sent.count"
+        on("$PREFIX.messages.sent.count.total counter") {
+            val counterName = "$PREFIX.messages.sent.count.total"
 
             it("should increment counter") {
                 cut.notifyMessageSent(topicName1)
 
-                verifyCounter(counterName) { counter ->
-                    assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+                verifyCounter(counterName) {
+                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                 }
-                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count")
+                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.count.topic")
             }
         }
 
         on("$PREFIX.messages.sent.topic.count counter") {
-            val counterName = "$PREFIX.messages.sent.topic.count"
+            val counterName = "$PREFIX.messages.sent.count.topic"
             it("should handle counters for different topics") {
                 cut.notifyMessageSent(topicName1)
                 cut.notifyMessageSent(topicName2)
                 cut.notifyMessageSent(topicName2)
 
-                verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) { counter ->
-                    assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+                verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) {
+                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
+                }
+
+                verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) {
+                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+                }
+            }
+        }
+    }
+
+    describe("notifyMessageDropped") {
+
+        on("$PREFIX.messages.dropped.count.total counter") {
+            val counterName = "$PREFIX.messages.dropped.count.total"
+            it("should increment counter") {
+                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
+                cut.notifyMessageDropped(INVALID_MESSAGE)
+
+                verifyCounter(counterName) {
+                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+                }
+                verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.count.cause")
+            }
+        }
+
+        on("$PREFIX.messages.dropped.count.cause counter") {
+            val counterName = "$PREFIX.messages.dropped.count.cause"
+            it("should handle counters for different drop reasons") {
+                cut.notifyMessageDropped(ROUTE_NOT_FOUND)
+                cut.notifyMessageDropped(INVALID_MESSAGE)
+                cut.notifyMessageDropped(INVALID_MESSAGE)
+
+                verifyCounter(registrySearch().name(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
+                    assertThat(it.count()).isCloseTo(1.0, doublePrecision)
                 }
 
-                verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) { counter ->
-                    assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
+                verifyCounter(registrySearch().name(counterName).tag("cause", INVALID_MESSAGE.tag)) {
+                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                 }
             }
         }
@@ -175,16 +211,16 @@ object MicrometerMetricsTest : Spek({
                 cut.notifyMessageReceived(256)
                 cut.notifyMessageReceived(256)
                 cut.notifyMessageSent("perf3gpp")
-                verifyGauge("messages.processing.count") { gauge ->
-                    assertThat(gauge.value()).isCloseTo(2.0, doublePrecision)
+                verifyGauge("messages.processing.count") {
+                    assertThat(it.value()).isCloseTo(2.0, doublePrecision)
                 }
             }
 
             on("zero difference") {
                 cut.notifyMessageReceived(128)
                 cut.notifyMessageSent("perf3gpp")
-                verifyGauge("messages.processing.count") { gauge ->
-                    assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
+                verifyGauge("messages.processing.count") {
+                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                 }
             }
 
@@ -192,8 +228,8 @@ object MicrometerMetricsTest : Spek({
                 cut.notifyMessageReceived(128)
                 cut.notifyMessageSent("fault")
                 cut.notifyMessageSent("perf3gpp")
-                verifyGauge("messages.processing.count") { gauge ->
-                    assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
+                verifyGauge("messages.processing.count") {
+                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
                 }
             }
         }
index 01e1166..90c4aa1 100644 (file)
@@ -23,61 +23,67 @@ import com.google.protobuf.ByteString
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
 import org.onap.ves.VesEventOuterClass.VesEvent
-
 import java.util.UUID.randomUUID
 
 
 val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
 
-private fun ByteBuf.writeValidWireFrameHeaders() {
+private fun validWireFrame() = allocator.buffer().run {
     writeByte(0xAA)          // always 0xAA
     writeByte(0x01)          // major version
     writeByte(0x00)          // minor version
     writeZero(RESERVED_BYTE_COUNT)  // reserved
-    writeShort(0x0001)       // content type = GPB
+    writeShort(0x0001)  // content type = GPB
+}
+
+private fun invalidWireFrame() = allocator.buffer().run {
+    writeByte(0xAA)          // always 0xAA
+    writeByte(0x00)          // invalid major version
+    writeByte(0x00)          // minor version
+    writeZero(RESERVED_BYTE_COUNT)  // reserved
+    writeShort(0x0001)  // content type = GPB
+}
+
+fun garbageFrame(): ByteBuf = allocator.buffer().run {
+    writeBytes("the meaning of life is &@)(*_!".toByteArray())
 }
 
 fun vesWireFrameMessage(domain: VesEventDomain = OTHER,
                         id: String = randomUUID().toString(),
-                        eventFields: ByteString = ByteString.EMPTY): ByteBuf =
-        vesWireFrameMessage(vesEvent(domain, id, eventFields))
-
-fun vesWireFrameMessage(vesEvent: VesEvent) =
-        allocator.buffer().run {
-            writeValidWireFrameHeaders()
+                        eventFields: ByteString = ByteString.EMPTY,
+                        vesEventListenerVersion: String = "7.0.2"): ByteBuf =
+        vesWireFrameMessage(vesEvent(domain, id, eventFields, vesEventListenerVersion))
 
+fun vesWireFrameMessage(vesEvent: VesEvent): ByteBuf =
+        validWireFrame().run {
             val gpb = vesEvent.toByteString().asReadOnlyByteBuffer()
             writeInt(gpb.limit())  // ves event size in bytes
-            writeBytes(gpb)  // ves event as GPB bytes
+            writeBytes(gpb)   // ves event as GPB bytes
         }
 
-fun wireFrameMessageWithInvalidPayload(): ByteBuf = allocator.buffer().run {
-    writeValidWireFrameHeaders()
-
-    val invalidGpb = "some random data".toByteArray(Charsets.UTF_8)
-    writeInt(invalidGpb.size)  // ves event size in bytes
-    writeBytes(invalidGpb)
-}
-
-fun garbageFrame(): ByteBuf = allocator.buffer().run {
-    writeBytes("the meaning of life is &@)(*_!".toByteArray())
-}
+fun messageWithInvalidWireFrameHeader(vesEvent: VesEvent = vesEvent()): ByteBuf =
+        invalidWireFrame().run {
+            val gpb = vesEvent.toByteString().asReadOnlyByteBuffer()
+            writeInt(gpb.limit())           // ves event size in bytes
+            writeBytes(gpb)            // ves event as GPB bytes
+        }
 
-fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
-    writeByte(0xAA)
-    writeByte(0x01)   // version major
-    writeByte(0x01)   // version minor
-}
+fun wireFrameMessageWithInvalidPayload(): ByteBuf =
+        validWireFrame().run {
+            val invalidGpb = "some random data".toByteArray(Charsets.UTF_8)
+            writeInt(invalidGpb.size)  // ves event size in bytes
+            writeBytes(invalidGpb)
+        }
 
-fun vesMessageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf =
+fun messageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf =
         vesWireFrameMessage(
                 domain = domain,
                 eventFields = ByteString.copyFrom(ByteArray(payloadSizeBytes))
         )
 
-
+fun messageWithInvalidListenerVersion() = vesWireFrameMessage(vesEventListenerVersion = "invalid")
\ No newline at end of file
index 569f1a9..cf30d2c 100644 (file)
@@ -32,8 +32,12 @@ import java.util.UUID.randomUUID
 
 fun vesEvent(domain: VesEventDomain = PERF3GPP,
              id: String = randomUUID().toString(),
-             eventFields: ByteString = ByteString.EMPTY
-): VesEventOuterClass.VesEvent = vesEvent(commonHeader(domain, id), eventFields)
+             eventFields: ByteString = ByteString.EMPTY,
+             vesEventListenerVersion: String = "7.0.2"
+): VesEventOuterClass.VesEvent = vesEvent(
+        commonHeader(domain, id, vesEventListenerVersion),
+        eventFields
+)
 
 fun vesEvent(commonEventHeader: CommonEventHeader,
              eventFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent =
@@ -44,8 +48,9 @@ fun vesEvent(commonEventHeader: CommonEventHeader,
 
 fun commonHeader(domain: VesEventDomain = PERF3GPP,
                  id: String = randomUUID().toString(),
-                 priority: Priority = Priority.NORMAL,
-                 vesEventListenerVersion: String = "7.0.2"): CommonEventHeader =
+                 vesEventListenerVersion: String = "7.0.2",
+                 priority: Priority = Priority.NORMAL
+): CommonEventHeader =
         CommonEventHeader.newBuilder()
                 .setVersion("sample-version")
                 .setDomain(domain.domainName)
index 7381592..cb1c622 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.utils.arrow
 
 import arrow.core.Either
 import arrow.core.Option
+import arrow.core.Try
 import arrow.core.identity
 import arrow.syntax.collections.firstOption
 import java.util.concurrent.atomic.AtomicReference
@@ -45,3 +46,10 @@ fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: (
                 .map { it() }
                 .filter { it != null }
                 .firstOption()
+
+
+fun <A, B> Either<A, B>.doOnLeft(action: () -> Unit): Either<A, B> = apply { if (isLeft()) action() }
+
+fun <A> Option<A>.doOnEmpty(action: () -> Unit): Option<A> = apply { if (isEmpty()) action() }
+
+fun <A> Try<A>.doOnFailure(action: () -> Unit): Try<A> = apply { if (isFailure()) action() }
index 95590d9..e7aca55 100644 (file)
@@ -25,6 +25,8 @@ import arrow.core.Try
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
+typealias MessageEither = Either<() -> String, () -> String>
+
 fun <T> Logger.handleReactiveStreamError(
         context: MappedDiagnosticContext,
         ex: Throwable,
@@ -60,7 +62,7 @@ fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
 
 fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
                                     context: MappedDiagnosticContext,
-                                    predicate: (T) -> Either<() -> String, () -> String>) =
+                                    predicate: (T) -> MessageEither): Flux<T> =
         flatMap { t ->
             predicate(t).fold({
                 logger.warn(context, it)
@@ -69,4 +71,4 @@ fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
                 logger.trace(context, it)
                 Mono.just<T>(t)
             })
-        }
+        }
\ No newline at end of file