From 0ba97c7eac5a821c813bfa8ac31b1063956d3824 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Wed, 13 Jun 2018 15:45:00 +0200 Subject: [PATCH] Add monitoring support by means of micrometer.io Closes ONAP-345 Change-Id: I58c145b1d37a6b32fbe5b157723c152eb571a2dd Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- hv-collector-core/pom.xml | 1 - .../dcae/collectors/veshv/boundary/adapters.kt | 8 +- .../collectors/veshv/factory/CollectorFactory.kt | 16 +- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 7 +- .../veshv/impl/adapters/LoggingSinkProvider.kt | 3 +- .../veshv/impl/adapters/kafka/KafkaSink.kt | 6 +- .../dcae/collectors/veshv/tests/component/Sut.kt | 4 +- .../dcae/collectors/veshv/tests/fakes/metrics.kt | 38 ++++ .../onap/dcae/collectors/veshv/tests/fakes/sink.kt | 4 +- hv-collector-main/pom.xml | 12 ++ .../collectors/veshv/main/MicrometerMetrics.kt | 67 ++++++++ .../org/onap/dcae/collectors/veshv/main/main.kt | 3 +- .../collectors/veshv/main/MicrometerMetricsTest.kt | 191 +++++++++++++++++++++ pom.xml | 15 ++ 14 files changed, 357 insertions(+), 18 deletions(-) create mode 100644 hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt create mode 100644 hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt create mode 100644 hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml index ed9f1ad5..a372fb22 100644 --- a/hv-collector-core/pom.xml +++ b/hv-collector-core/pom.xml @@ -73,7 +73,6 @@ ${project.parent.version} compile - org.jetbrains.kotlin kotlin-reflect diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 53fd7c3a..e4f02000 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -25,7 +25,13 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import reactor.core.publisher.Flux interface Sink { - fun send(messages: Flux): Flux + fun send(messages: Flux): Flux +} + +interface Metrics { + fun notifyBytesReceived(size: Int) + fun notifyMessageReceived(size: Int) + fun notifyMessageSent(topic: String) } @FunctionalInterface diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 73f4d09d..8785180b 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.factory import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.impl.MessageValidator @@ -38,7 +39,9 @@ import java.util.concurrent.atomic.AtomicReference * @since May 2018 */ class CollectorFactory(val configuration: ConfigurationProvider, - private val sinkProvider: SinkProvider) { + private val sinkProvider: SinkProvider, + private val metrics: Metrics) { + fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference = AtomicReference() createVesHvCollector().subscribe(collector::set) @@ -50,11 +53,12 @@ class CollectorFactory(val configuration: ConfigurationProvider, private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( - { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, - VesDecoder(), - MessageValidator(), - Router(config.routing), - sinkProvider(config)) + wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, + protobufDecoder = VesDecoder(), + validator = MessageValidator(), + router = Router(config.routing), + sink = sinkProvider(config), + metrics = metrics) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 965943f6..222eaefa 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -40,18 +41,22 @@ internal class VesHvCollector( private val protobufDecoder: VesDecoder, private val validator: MessageValidator, private val router: Router, - private val sink: Sink) : Collector { + private val sink: Sink, + private val metrics: Metrics) : Collector { override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux): Mono = wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream + .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(wireDecoder::decode) + .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } .filter(WireFrame::isValid) .map(WireFrame::payload) .map(protobufDecoder::decode) .filter(validator::isValid) .flatMap(this::findRoute) .compose(sink::send) + .doOnNext { metrics.notifyMessageSent(it.topic) } .doOnTerminate { releaseBuffersMemory(wireDecoder) } .then() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index b943e4e5..a5c41046 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -40,10 +40,9 @@ internal class LoggingSinkProvider : SinkProvider { private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() - override fun send(messages: Flux): Flux = + override fun send(messages: Flux): Flux = messages .doOnNext(this::logMessage) - .map { it.message } private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index 6142fa3c..0a548a52 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -35,7 +35,7 @@ import reactor.kafka.sender.SenderResult */ internal class KafkaSink(private val sender: KafkaSender) : Sink { - override fun send(messages: Flux): Flux { + override fun send(messages: Flux): Flux { val records = messages.map(this::vesToKafkaRecord) return sender.send(records) .doOnNext(::logException) @@ -43,14 +43,14 @@ internal class KafkaSink(private val sender: KafkaSender { + private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord { return SenderRecord.create( msg.topic, msg.partition, System.currentTimeMillis(), msg.message.header, msg.message, - msg.message) + msg) } private fun logException(senderResult: SenderResult) { diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index c4e9874f..5099ae4c 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.Exceptions @@ -40,7 +41,8 @@ internal class Sut { val configurationProvider = FakeConfigurationProvider() val sink = FakeSink() val alloc = UnpooledByteBufAllocator.DEFAULT - private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink)) + val metrics = FakeMetrics() + private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics) val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt new file mode 100644 index 00000000..cfc44bcd --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.boundary.Metrics + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +class FakeMetrics: Metrics { + override fun notifyBytesReceived(size: Int) { + } + + override fun notifyMessageReceived(size: Int) { + } + + override fun notifyMessageSent(topic: String) { + } + +} \ No newline at end of file diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 5d592e42..b0dbd0f5 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -36,7 +36,7 @@ class FakeSink : Sink { val sentMessages: List get() = sent.toList() - override fun send(messages: Flux): Flux { - return messages.doOnNext(sent::addLast).map { it.message } + override fun send(messages: Flux): Flux { + return messages.doOnNext(sent::addLast) } } diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml index a5a35ba3..dbec1def 100644 --- a/hv-collector-main/pom.xml +++ b/hv-collector-main/pom.xml @@ -90,6 +90,14 @@ ${project.parent.version} + + io.arrow-kt + arrow-core + + + io.arrow-kt + arrow-syntax + org.slf4j slf4j-api @@ -108,6 +116,10 @@ runtime ${os.detected.classifier} + + io.micrometer + micrometer-registry-jmx + org.assertj assertj-core diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt new file mode 100644 index 00000000..8a8b6d39 --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt @@ -0,0 +1,67 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.syntax.function.memoize +import io.micrometer.core.instrument.Clock +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.jmx.JmxConfig +import io.micrometer.jmx.JmxMeterRegistry +import org.onap.dcae.collectors.veshv.boundary.Metrics + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +class MicrometerMetrics( + private val registry: MeterRegistry = JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM) +) : Metrics { + + private val receivedBytes = registry.counter("data.received.bytes") + private val receivedMsgCount = registry.counter("messages.received.count") + private val receivedMsgBytes = registry.counter("messages.received.bytes") + private val sentCountTotal = registry.counter("messages.sent.count") + + init { + registry.gauge("messages.processing.count", this) { + (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0) + } + } + + private val sentCount = { topic: String -> + registry.counter("messages.sent.count", "topic", topic) + }.memoize() + + + override fun notifyBytesReceived(size: Int) { + receivedBytes.increment(size.toDouble()) + } + + override fun notifyMessageReceived(size: Int) { + receivedMsgCount.increment() + receivedMsgBytes.increment(size.toDouble()) + } + + override fun notifyMessageSent(topic: String) { + sentCountTotal.increment() + sentCount(topic).increment() + } +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index b7d97028..1f2686ba 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -39,7 +39,8 @@ fun main(args: Array) { val collectorProvider = CollectorFactory( resolveConfigurationProvider(serverConfiguration), - AdapterFactory.kafkaSink() + AdapterFactory.kafkaSink(), + MicrometerMetrics() ).createVesHvCollectorProvider() ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block() } catch (ex: WrongArgumentException) { diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt new file mode 100644 index 00000000..675647c4 --- /dev/null +++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -0,0 +1,191 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.core.Try +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.search.RequiredSearch +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.data.Percentage +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +object MicrometerMetricsTest : Spek({ + val doublePrecision = Percentage.withPercentage(0.5) + lateinit var registry: SimpleMeterRegistry + lateinit var cut: MicrometerMetrics + + beforeEachTest { + registry = SimpleMeterRegistry() + cut = MicrometerMetrics(registry) + } + + fun registrySearch() = RequiredSearch.`in`(registry) + + fun verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) = + Try { + map(search) + }.fold( + { ex -> assertThat(ex).doesNotThrowAnyException() }, + verifier + ) + + fun verifyGauge(name: String, verifier: (Gauge) -> T) = + verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier) + + fun verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = + verifyMeter(search, RequiredSearch::counter, verifier) + + fun verifyCounter(name: String, verifier: (Counter) -> T) = + verifyCounter(registrySearch().name(name), verifier) + + fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) { + registry.meters + .filter { it is Counter } + .filterNot { it.id.name in changedCounters } + .forEach { assertThat((it as Counter).count()).isCloseTo(0.0, doublePrecision) } + } + + describe("notifyBytesReceived") { + + on("data.received.bytes counter") { + val counterName = "data.received.bytes" + + it("should increment counter") { + val bytes = 128 + cut.notifyBytesReceived(bytes) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision) + } + } + + it("should leave all other counters unchanged") { + cut.notifyBytesReceived(128) + verifyAllCountersAreUnchangedBut(counterName) + } + } + } + + describe("notifyMessageReceived") { + on("messages.received.count counter") { + val counterName = "messages.received.count" + + it("should increment counter") { + cut.notifyMessageReceived(777) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + } + } + } + + on("messages.received.bytes counter") { + val counterName = "messages.received.bytes" + + it("should increment counter") { + val bytes = 888 + cut.notifyMessageReceived(bytes) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision) + } + } + } + + it("should leave all other counters unchanged") { + cut.notifyMessageReceived(128) + verifyAllCountersAreUnchangedBut("messages.received.count", "messages.received.bytes") + } + } + + describe("notifyMessageSent") { + val topicName = "dmaap_topic_name" + val counterName = "messages.sent.count" + + on("$counterName counter") { + + it("should increment counter") { + cut.notifyMessageSent(topicName) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + } + } + } + + on("$counterName[topic=$topicName] counter") { + + it("should increment counter") { + cut.notifyMessageSent(topicName) + + verifyCounter(registrySearch().name(counterName).tag("topic", topicName)) { counter -> + assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + } + } + } + + it("should leave all other counters unchanged") { + cut.notifyMessageSent(topicName) + verifyAllCountersAreUnchangedBut(counterName) + } + } + + describe("processing gauge") { + it("should show difference between sent and received messages") { + + on("positive difference") { + cut.notifyMessageReceived(128) + cut.notifyMessageReceived(256) + cut.notifyMessageReceived(256) + cut.notifyMessageSent("hvranmeas") + verifyGauge("messages.processing.count") { gauge -> + assertThat(gauge.value()).isCloseTo(2.0, doublePrecision) + } + } + + on("zero difference") { + cut.notifyMessageReceived(128) + cut.notifyMessageSent("hvranmeas") + verifyGauge("messages.processing.count") { gauge -> + assertThat(gauge.value()).isCloseTo(0.0, doublePrecision) + } + } + + on("negative difference") { + cut.notifyMessageReceived(128) + cut.notifyMessageSent("calltrace") + cut.notifyMessageSent("hvranmeas") + verifyGauge("messages.processing.count") { gauge -> + assertThat(gauge.value()).isCloseTo(0.0, doublePrecision) + } + } + } + } + +}) diff --git a/pom.xml b/pom.xml index 9e33ec56..f478df3c 100644 --- a/pom.xml +++ b/pom.xml @@ -528,6 +528,16 @@ kotlin-reflect ${kotlin.version} + + io.arrow-kt + arrow-core + 0.7.2 + + + io.arrow-kt + arrow-syntax + 0.7.2 + ch.qos.logback logback-classic @@ -582,6 +592,11 @@ ratpack-core 1.5.4 + + io.micrometer + micrometer-registry-jmx + 1.0.5 + -- 2.16.6