2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.main
 
  23 import io.micrometer.core.instrument.Counter
 
  24 import io.micrometer.core.instrument.Gauge
 
  25 import io.micrometer.core.instrument.Meter
 
  26 import io.micrometer.core.instrument.Timer
 
  27 import io.micrometer.core.instrument.search.RequiredSearch
 
  28 import io.micrometer.prometheus.PrometheusConfig
 
  29 import io.micrometer.prometheus.PrometheusMeterRegistry
 
  30 import org.assertj.core.api.Assertions.assertThat
 
  31 import org.assertj.core.data.Percentage
 
  32 import org.jetbrains.spek.api.Spek
 
  33 import org.jetbrains.spek.api.dsl.describe
 
  34 import org.jetbrains.spek.api.dsl.it
 
  35 import org.jetbrains.spek.api.dsl.on
 
  36 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
 
  37 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
 
  38 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
 
  39 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
 
  40 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 
  41 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 
  42 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 
  43 import org.onap.dcae.collectors.veshv.model.VesMessage
 
  44 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
 
  45 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
 
  46 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
 
  47 import java.time.Instant
 
  48 import java.time.temporal.Temporal
 
  49 import java.util.concurrent.TimeUnit
 
  50 import kotlin.reflect.KClass
 
  53  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 
  56 object MicrometerMetricsTest : Spek({
 
  57     val doublePrecision = Percentage.withPercentage(0.5)
 
  58     lateinit var registry: PrometheusMeterRegistry
 
  59     lateinit var cut: MicrometerMetrics
 
  62         registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
 
  63         cut = MicrometerMetrics(registry)
 
  66     fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName)
 
  68     fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
 
  72                     { ex -> assertThat(ex).doesNotThrowAnyException() },
 
  76     fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
 
  77             verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier)
 
  79     fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
 
  80             verifyMeter(registrySearch(name), RequiredSearch::timer, verifier)
 
  82     fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
 
  83             verifyMeter(search, RequiredSearch::counter, verifier)
 
  85     fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
 
  86             verifyCounter(registrySearch(name), verifier)
 
  89     fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
 
  90         fun <T : Meter> verifyAllMetersAreUnchangedBut(
 
  92                 changedCounters: Collection<String>,
 
  93                 valueOf: (T) -> Double) {
 
  95                     .filter { it.id.name.startsWith(PREFIX) }
 
  96                     .filter { clazz.isInstance(it) }
 
  98                     .filterNot { it.id.name in changedCounters }
 
 100                         assertThat(valueOf(it))
 
 101                                 .describedAs(it.id.toString())
 
 102                                 .isCloseTo(0.0, doublePrecision)
 
 106         setOf(*changedMeters).let { changedMetersCollection ->
 
 107             verifyAllMetersAreUnchangedBut(Counter::class, changedMetersCollection) { it.count() }
 
 108             verifyAllMetersAreUnchangedBut(Timer::class, changedMetersCollection) { it.count().toDouble() }
 
 113     describe("notifyBytesReceived") {
 
 114         on("$PREFIX.data.received.bytes counter") {
 
 115             val counterName = "$PREFIX.data.received.bytes"
 
 117             it("should increment counter") {
 
 119                 cut.notifyBytesReceived(bytes)
 
 121                 verifyCounter(counterName) {
 
 122                     assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
 
 126             it("should leave all other counters unchanged") {
 
 127                 cut.notifyBytesReceived(128)
 
 128                 verifyCountersAndTimersAreUnchangedBut(counterName)
 
 133     describe("notifyMessageReceived") {
 
 134         on("$PREFIX.messages.received counter") {
 
 135             val counterName = "$PREFIX.messages.received"
 
 137             it("should increment counter") {
 
 138                 cut.notifyMessageReceived(emptyWireProtocolFrame())
 
 140                 verifyCounter(counterName) {
 
 141                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
 
 146         on("$PREFIX.messages.received.payload.bytes counter") {
 
 147             val counterName = "$PREFIX.messages.received.payload.bytes"
 
 149             it("should increment counter") {
 
 151                 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
 
 153                 verifyCounter(counterName) {
 
 154                     assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
 
 159         it("should leave all other counters unchanged") {
 
 160             cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
 
 161             verifyCountersAndTimersAreUnchangedBut(
 
 162                     "$PREFIX.messages.received",
 
 163                     "$PREFIX.messages.received.payload.bytes"
 
 168     describe("notifyMessageSent") {
 
 169         val topicName1 = "PERF3GPP"
 
 170         val topicName2 = "CALLTRACE"
 
 172         on("$PREFIX.messages.sent counter") {
 
 173             val counterName = "$PREFIX.messages.sent"
 
 175             it("should increment counter") {
 
 176                 cut.notifyMessageSent(routedMessage(topicName1))
 
 178                 verifyCounter(counterName) {
 
 179                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
 
 181                 verifyCountersAndTimersAreUnchangedBut(
 
 183                         "$PREFIX.messages.sent.topic",
 
 184                         "$PREFIX.messages.processing.time",
 
 185                         "$PREFIX.messages.latency")
 
 189         on("$PREFIX.messages.sent.topic counter") {
 
 190             val counterName = "$PREFIX.messages.sent.topic"
 
 192             it("should handle counters for different topics") {
 
 193                 cut.notifyMessageSent(routedMessage(topicName1))
 
 194                 cut.notifyMessageSent(routedMessage(topicName2))
 
 195                 cut.notifyMessageSent(routedMessage(topicName2))
 
 197                 verifyCounter(registrySearch(counterName).tag("topic", topicName1)) {
 
 198                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
 
 201                 verifyCounter(registrySearch(counterName).tag("topic", topicName2)) {
 
 202                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
 
 207         on("$PREFIX.messages.processing.time") {
 
 208             val counterName = "$PREFIX.messages.processing.time"
 
 209             val processingTimeMs = 100L
 
 211             it("should update timer") {
 
 213                 cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs)))
 
 215                 verifyTimer(counterName) { timer ->
 
 216                     assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
 
 218                 verifyCountersAndTimersAreUnchangedBut(
 
 220                         "$PREFIX.messages.sent.topic",
 
 221                         "$PREFIX.messages.sent",
 
 222                         "$PREFIX.messages.latency")
 
 226         on("$PREFIX.messages.latency") {
 
 227             val counterName = "$PREFIX.messages.latency"
 
 228             val latencyMs = 1666L
 
 230             it("should update timer") {
 
 232                 cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs)))
 
 234                 verifyTimer(counterName) { timer ->
 
 235                     assertThat(timer.mean(TimeUnit.MILLISECONDS))
 
 236                             .isGreaterThanOrEqualTo(latencyMs.toDouble())
 
 237                             .isLessThanOrEqualTo(latencyMs + 10000.0)
 
 240                 verifyCountersAndTimersAreUnchangedBut(
 
 242                         "$PREFIX.messages.sent.topic",
 
 243                         "$PREFIX.messages.sent",
 
 244                         "$PREFIX.messages.processing.time")
 
 249     describe("notifyMessageDropped") {
 
 250         on("$PREFIX.messages.dropped counter") {
 
 251             val counterName = "$PREFIX.messages.dropped"
 
 253             it("should increment counter") {
 
 254                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
 
 255                 cut.notifyMessageDropped(INVALID_MESSAGE)
 
 257                 verifyCounter(counterName) {
 
 258                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
 
 260                 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause")
 
 264         on("$PREFIX.messages.dropped.cause counter") {
 
 265             val counterName = "$PREFIX.messages.dropped.cause"
 
 267             it("should handle counters for different drop reasons") {
 
 268                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
 
 269                 cut.notifyMessageDropped(INVALID_MESSAGE)
 
 270                 cut.notifyMessageDropped(INVALID_MESSAGE)
 
 272                 verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) {
 
 273                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
 
 276                 verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) {
 
 277                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
 
 283     describe("notifyClientConnected") {
 
 284         on("$PREFIX.connections counter") {
 
 285             val counterName = "$PREFIX.connections"
 
 287             it("should increment counter") {
 
 288                 cut.notifyClientConnected()
 
 289                 cut.notifyClientConnected()
 
 291                 verifyCounter(counterName) {
 
 292                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
 
 294                 verifyCountersAndTimersAreUnchangedBut(counterName)
 
 300     describe("notifyClientDisconnected") {
 
 301         on("$PREFIX.disconnections counter") {
 
 302             val counterName = "$PREFIX.disconnections"
 
 304             it("should increment counter") {
 
 305                 cut.notifyClientDisconnected()
 
 306                 cut.notifyClientDisconnected()
 
 308                 verifyCounter(counterName) {
 
 309                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
 
 311                 verifyCountersAndTimersAreUnchangedBut(counterName)
 
 317     describe("notifyClientRejected") {
 
 319         on("$PREFIX.clients.rejected") {
 
 320             val counterName = "$PREFIX.clients.rejected"
 
 321             it("should increment counter for each possible reason") {
 
 322                 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
 
 323                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
 
 325                 verifyCounter(counterName) {
 
 326                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
 
 328                 verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause")
 
 332         on("$PREFIX.clients.rejected.cause counter") {
 
 333             val counterName = "$PREFIX.clients.rejected.cause"
 
 334             it("should handle counters for different rejection reasons") {
 
 335                 cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER)
 
 336                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
 
 337                 cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE)
 
 339                 verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) {
 
 340                     assertThat(it.count()).isCloseTo(1.0, doublePrecision)
 
 343                 verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) {
 
 344                     assertThat(it.count()).isCloseTo(2.0, doublePrecision)
 
 350     describe("$PREFIX.connections.active gauge") {
 
 351         val gaugeName = "$PREFIX.connections.active"
 
 353         on("connection traffic") {
 
 354             it("should calculate positive difference between connected and disconnected clients") {
 
 355                 cut.notifyClientConnected()
 
 356                 cut.notifyClientConnected()
 
 357                 cut.notifyClientConnected()
 
 358                 cut.notifyClientDisconnected()
 
 360                 verifyGauge(gaugeName) {
 
 361                     assertThat(it.value()).isCloseTo(2.0, doublePrecision)
 
 365             it("should calculate no difference between connected and disconnected clients") {
 
 366                 cut.notifyClientDisconnected()
 
 367                 cut.notifyClientDisconnected()
 
 369                 verifyGauge(gaugeName) {
 
 370                     assertThat(it.value()).isCloseTo(0.0, doublePrecision)
 
 374             it("should calculate negative difference between connected and disconnected clients") {
 
 375                 cut.notifyClientDisconnected()
 
 377                 verifyGauge(gaugeName) {
 
 378                     assertThat(it.value()).isCloseTo(0.0, doublePrecision)
 
 385 fun routedMessage(topic: String, partition: Int = 0) =
 
 386         vesEvent().let { evt ->
 
 387             RoutedMessage(topic, partition,
 
 388                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
 
 391 fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
 
 392         vesEvent().let { evt ->
 
 393             RoutedMessage(topic, partition,
 
 394                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
 
 397 fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
 
 398         vesEvent().let { evt ->
 
 399             val builder = evt.toBuilder()
 
 400             builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
 
 403             RoutedMessage(topic, partition,
 
 404                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))