Add monitoring support by means of micrometer.io 95/58595/2
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 13 Jun 2018 13:45:00 +0000 (15:45 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 07:11:16 +0000 (09:11 +0200)
Closes ONAP-345

Change-Id: I58c145b1d37a6b32fbe5b157723c152eb571a2dd
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

14 files changed:
hv-collector-core/pom.xml
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt [new file with mode: 0644]
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
hv-collector-main/pom.xml
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt [new file with mode: 0644]
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt [new file with mode: 0644]
pom.xml

index ed9f1ad..a372fb2 100644 (file)
@@ -73,7 +73,6 @@
       <version>${project.parent.version}</version>
       <scope>compile</scope>
     </dependency>
-
     <dependency>
       <groupId>org.jetbrains.kotlin</groupId>
       <artifactId>kotlin-reflect</artifactId>
index 53fd7c3..e4f0200 100644 (file)
@@ -25,7 +25,13 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
 import reactor.core.publisher.Flux
 
 interface Sink {
-    fun send(messages: Flux<RoutedMessage>): Flux<VesMessage>
+    fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
+}
+
+interface Metrics {
+    fun notifyBytesReceived(size: Int)
+    fun notifyMessageReceived(size: Int)
+    fun notifyMessageSent(topic: String)
 }
 
 @FunctionalInterface
index 73f4d09..8785180 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.factory
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.impl.MessageValidator
@@ -38,7 +39,9 @@ import java.util.concurrent.atomic.AtomicReference
  * @since May 2018
  */
 class CollectorFactory(val configuration: ConfigurationProvider,
-                       private val sinkProvider: SinkProvider) {
+                       private val sinkProvider: SinkProvider,
+                       private val metrics: Metrics) {
+
     fun createVesHvCollectorProvider(): CollectorProvider {
         val collector: AtomicReference<Collector> = AtomicReference()
         createVesHvCollector().subscribe(collector::set)
@@ -50,11 +53,12 @@ class CollectorFactory(val configuration: ConfigurationProvider,
 
     private fun createVesHvCollector(config: CollectorConfiguration): Collector {
         return VesHvCollector(
-                { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
-                VesDecoder(),
-                MessageValidator(),
-                Router(config.routing),
-                sinkProvider(config))
+                wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
+                protobufDecoder = VesDecoder(),
+                validator = MessageValidator(),
+                router = Router(config.routing),
+                sink = sinkProvider(config),
+                metrics = metrics)
     }
 
 }
index 965943f..222eaef 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.domain.WireFrame
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -40,18 +41,22 @@ internal class VesHvCollector(
         private val protobufDecoder: VesDecoder,
         private val validator: MessageValidator,
         private val router: Router,
-        private val sink: Sink) : Collector {
+        private val sink: Sink,
+        private val metrics: Metrics) : Collector {
 
     override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
             wireChunkDecoderSupplier(alloc).let { wireDecoder ->
                 dataStream
+                        .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
                         .concatMap(wireDecoder::decode)
+                        .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
                         .filter(WireFrame::isValid)
                         .map(WireFrame::payload)
                         .map(protobufDecoder::decode)
                         .filter(validator::isValid)
                         .flatMap(this::findRoute)
                         .compose(sink::send)
+                        .doOnNext { metrics.notifyMessageSent(it.topic) }
                         .doOnTerminate { releaseBuffersMemory(wireDecoder) }
                         .then()
             }
index b943e4e..a5c4104 100644 (file)
@@ -40,10 +40,9 @@ internal class LoggingSinkProvider : SinkProvider {
             private val totalMessages = AtomicLong()
             private val totalBytes = AtomicLong()
 
-            override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> =
+            override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
                     messages
                             .doOnNext(this::logMessage)
-                            .map { it.message }
 
             private fun logMessage(msg: RoutedMessage) {
                 val msgs = totalMessages.addAndGet(1)
index 6142fa3..0a548a5 100644 (file)
@@ -35,7 +35,7 @@ import reactor.kafka.sender.SenderResult
  */
 internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
 
-    override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> {
+    override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
         val records = messages.map(this::vesToKafkaRecord)
         return sender.send(records)
                 .doOnNext(::logException)
@@ -43,14 +43,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
                 .map { it.correlationMetadata() }
     }
 
-    private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, VesMessage> {
+    private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
         return SenderRecord.create(
                 msg.topic,
                 msg.partition,
                 System.currentTimeMillis(),
                 msg.message.header,
                 msg.message,
-                msg.message)
+                msg)
     }
 
     private fun logException(senderResult: SenderResult<out Any>) {
index c4e9874..5099ae4 100644 (file)
@@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.Exceptions
@@ -40,7 +41,8 @@ internal class Sut {
     val configurationProvider = FakeConfigurationProvider()
     val sink = FakeSink()
     val alloc = UnpooledByteBufAllocator.DEFAULT
-    private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink))
+    val metrics = FakeMetrics()
+    private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics)
     val collectorProvider = collectorFactory.createVesHvCollectorProvider()
 
     val collector: Collector
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
new file mode 100644 (file)
index 0000000..cfc44bc
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class FakeMetrics: Metrics {
+    override fun notifyBytesReceived(size: Int) {
+    }
+
+    override fun notifyMessageReceived(size: Int) {
+    }
+
+    override fun notifyMessageSent(topic: String) {
+    }
+
+}
\ No newline at end of file
index 5d592e4..b0dbd0f 100644 (file)
@@ -36,7 +36,7 @@ class FakeSink : Sink {
     val sentMessages: List<RoutedMessage>
         get() = sent.toList()
 
-    override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> {
-        return messages.doOnNext(sent::addLast).map { it.message }
+    override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+        return messages.doOnNext(sent::addLast)
     }
 }
index a5a35ba..dbec1de 100644 (file)
             <version>${project.parent.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-syntax</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <scope>runtime</scope>
             <classifier>${os.detected.classifier}</classifier>
         </dependency>
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-jmx</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.assertj</groupId>
             <artifactId>assertj-core</artifactId>
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt
new file mode 100644 (file)
index 0000000..8a8b6d3
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.syntax.function.memoize
+import io.micrometer.core.instrument.Clock
+import io.micrometer.core.instrument.Counter
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.jmx.JmxConfig
+import io.micrometer.jmx.JmxMeterRegistry
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class MicrometerMetrics(
+        private val registry: MeterRegistry = JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM)
+) : Metrics {
+
+    private val receivedBytes = registry.counter("data.received.bytes")
+    private val receivedMsgCount = registry.counter("messages.received.count")
+    private val receivedMsgBytes = registry.counter("messages.received.bytes")
+    private val sentCountTotal = registry.counter("messages.sent.count")
+
+    init {
+        registry.gauge("messages.processing.count", this) {
+            (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
+        }
+    }
+
+    private val sentCount = { topic: String ->
+        registry.counter("messages.sent.count", "topic", topic)
+    }.memoize<String, Counter>()
+
+
+    override fun notifyBytesReceived(size: Int) {
+        receivedBytes.increment(size.toDouble())
+    }
+
+    override fun notifyMessageReceived(size: Int) {
+        receivedMsgCount.increment()
+        receivedMsgBytes.increment(size.toDouble())
+    }
+
+    override fun notifyMessageSent(topic: String) {
+        sentCountTotal.increment()
+        sentCount(topic).increment()
+    }
+}
index b7d9702..1f2686b 100644 (file)
@@ -39,7 +39,8 @@ fun main(args: Array<String>) {
 
         val collectorProvider = CollectorFactory(
                 resolveConfigurationProvider(serverConfiguration),
-                AdapterFactory.kafkaSink()
+                AdapterFactory.kafkaSink(),
+                MicrometerMetrics()
         ).createVesHvCollectorProvider()
         ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block()
     } catch (ex: WrongArgumentException) {
diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
new file mode 100644 (file)
index 0000000..675647c
--- /dev/null
@@ -0,0 +1,191 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.core.Try
+import io.micrometer.core.instrument.Counter
+import io.micrometer.core.instrument.Gauge
+import io.micrometer.core.instrument.search.RequiredSearch
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.data.Percentage
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+object MicrometerMetricsTest : Spek({
+    val doublePrecision = Percentage.withPercentage(0.5)
+    lateinit var registry: SimpleMeterRegistry
+    lateinit var cut: MicrometerMetrics
+
+    beforeEachTest {
+        registry = SimpleMeterRegistry()
+        cut = MicrometerMetrics(registry)
+    }
+
+    fun registrySearch() = RequiredSearch.`in`(registry)
+
+    fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
+            Try {
+                map(search)
+            }.fold(
+                    { ex -> assertThat(ex).doesNotThrowAnyException() },
+                    verifier
+            )
+
+    fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
+            verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier)
+
+    fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
+            verifyMeter(search, RequiredSearch::counter, verifier)
+
+    fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
+            verifyCounter(registrySearch().name(name), verifier)
+
+    fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
+        registry.meters
+                .filter { it is Counter }
+                .filterNot { it.id.name in changedCounters }
+                .forEach { assertThat((it as Counter).count()).isCloseTo(0.0, doublePrecision) }
+    }
+
+    describe("notifyBytesReceived") {
+
+        on("data.received.bytes counter") {
+            val counterName = "data.received.bytes"
+
+            it("should increment counter") {
+                val bytes = 128
+                cut.notifyBytesReceived(bytes)
+
+                verifyCounter(counterName) { counter ->
+                    assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision)
+                }
+            }
+
+            it("should leave all other counters unchanged") {
+                cut.notifyBytesReceived(128)
+                verifyAllCountersAreUnchangedBut(counterName)
+            }
+        }
+    }
+
+    describe("notifyMessageReceived") {
+        on("messages.received.count counter") {
+            val counterName = "messages.received.count"
+
+            it("should increment counter") {
+                cut.notifyMessageReceived(777)
+
+                verifyCounter(counterName) { counter ->
+                    assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+                }
+            }
+        }
+
+        on("messages.received.bytes counter") {
+            val counterName = "messages.received.bytes"
+
+            it("should increment counter") {
+                val bytes = 888
+                cut.notifyMessageReceived(bytes)
+
+                verifyCounter(counterName) { counter ->
+                    assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision)
+                }
+            }
+        }
+
+        it("should leave all other counters unchanged") {
+            cut.notifyMessageReceived(128)
+            verifyAllCountersAreUnchangedBut("messages.received.count", "messages.received.bytes")
+        }
+    }
+
+    describe("notifyMessageSent") {
+        val topicName = "dmaap_topic_name"
+        val counterName = "messages.sent.count"
+
+        on("$counterName counter") {
+
+            it("should increment counter") {
+                cut.notifyMessageSent(topicName)
+
+                verifyCounter(counterName) { counter ->
+                    assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+                }
+            }
+        }
+
+        on("$counterName[topic=$topicName] counter") {
+
+            it("should increment counter") {
+                cut.notifyMessageSent(topicName)
+
+                verifyCounter(registrySearch().name(counterName).tag("topic", topicName)) { counter ->
+                    assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+                }
+            }
+        }
+
+        it("should leave all other counters unchanged") {
+            cut.notifyMessageSent(topicName)
+            verifyAllCountersAreUnchangedBut(counterName)
+        }
+    }
+
+    describe("processing gauge") {
+        it("should show difference between sent and received messages") {
+
+            on("positive difference") {
+                cut.notifyMessageReceived(128)
+                cut.notifyMessageReceived(256)
+                cut.notifyMessageReceived(256)
+                cut.notifyMessageSent("hvranmeas")
+                verifyGauge("messages.processing.count") { gauge ->
+                    assertThat(gauge.value()).isCloseTo(2.0, doublePrecision)
+                }
+            }
+
+            on("zero difference") {
+                cut.notifyMessageReceived(128)
+                cut.notifyMessageSent("hvranmeas")
+                verifyGauge("messages.processing.count") { gauge ->
+                    assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
+                }
+            }
+
+            on("negative difference") {
+                cut.notifyMessageReceived(128)
+                cut.notifyMessageSent("calltrace")
+                cut.notifyMessageSent("hvranmeas")
+                verifyGauge("messages.processing.count") { gauge ->
+                    assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
+                }
+            }
+        }
+    }
+
+})
diff --git a/pom.xml b/pom.xml
index 9e33ec5..f478df3 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                 <artifactId>kotlin-reflect</artifactId>
                 <version>${kotlin.version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.arrow-kt</groupId>
+                <artifactId>arrow-core</artifactId>
+                <version>0.7.2</version>
+            </dependency>
+            <dependency>
+                <groupId>io.arrow-kt</groupId>
+                <artifactId>arrow-syntax</artifactId>
+                <version>0.7.2</version>
+            </dependency>
             <dependency>
                 <groupId>ch.qos.logback</groupId>
                 <artifactId>logback-classic</artifactId>
                 <artifactId>ratpack-core</artifactId>
                 <version>1.5.4</version>
             </dependency>
+            <dependency>
+                <groupId>io.micrometer</groupId>
+                <artifactId>micrometer-registry-jmx</artifactId>
+                <version>1.0.5</version>
+            </dependency>
 
             <!-- Test dependencies -->