* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2020 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain
-import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
}
}
+ on("$PREFIX.messages.to.collector.travel.time") {
+ val counterName = "$PREFIX.messages.to.collector.travel.time"
+ val toCollectorTravelTimeMs = 100L
+
+ it("should update timer") {
+ val now = Instant.now()
+ val vesMessage = vesMessageReceivedAt(now, sentAt = now.minusMillis(toCollectorTravelTimeMs))
+ cut.notifyMessageReceived(vesMessage)
+
+ registry.verifyTimer(counterName) { timer ->
+ assertThat(timer.mean(TimeUnit.MILLISECONDS)).isEqualTo(toCollectorTravelTimeMs.toDouble())
+ }
+
+ verifyCountersAndTimersAreUnchangedBut(counterName)
+ }
+ }
+
on("$PREFIX.messages.processing.time.without.routing") {
val counterName = "$PREFIX.messages.processing.time.without.routing"
val processingTimeMs = 100L
registry.verifyTimer(counterName) { timer ->
assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
}
- verifyCountersAndTimersAreUnchangedBut(
- counterName,
- "$PREFIX.messages.sent.topic",
- "$PREFIX.messages.sent",
- "$PREFIX.messages.latency")
+
+ verifyCountersAndTimersAreUnchangedBut(counterName)
}
}
}
})
+private fun vesMessageReceivedAt(receivedAt: Instant, sentAt: Instant): VesMessage {
+ val lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
+ val commonHeader = commonHeader(lastEpochMicrosec = lastEpochMicrosec)
+ return VesMessage(commonHeader,
+ wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt))
+}
+
private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage {
val commonHeader = commonHeader(domain)
return VesMessage(commonHeader,