import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.core.publisher.SynchronousSink
+import java.util.function.BiConsumer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
dataStream
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(wireDecoder::decode)
+ .handle(completeStreamOnEOT)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
- .filter(WireFrame::isValid)
- .map(WireFrame::payload)
+ .filter(PayloadWireFrameMessage::isValid)
+ .map(PayloadWireFrameMessage::payload)
.map(protobufDecoder::decode)
.filter(validator::isValid)
.flatMap(this::findRoute)
return Flux.empty()
}
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
- wireChunkDecoder.release()
- }
+ private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
companion object {
private val logger = Logger(VesHvCollector::class)
+
+ private val completeStreamOnEOT by lazy {
+ BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink ->
+ when (frame) {
+ is EndOfTransmissionMessage -> {
+ logger.info("Completing stream because of receiving EOT message")
+ sink.complete()
+ }
+ is PayloadWireFrameMessage -> sink.next(frame)
+ else -> sink.error(UnknownWireFrameTypeException(frame))
+ }
+ }
+ }
}
}
onReadIdle(timeout.toMillis()) {
logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
context().channel().close().addListener {
-
if (it.isSuccess)
logger.debug { "Client disconnected because of idle timeout" }
else
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.*
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
streamBuffer.release()
}
- fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer {
+ fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
logIncomingMessage(byteBuf)
if (byteBuf.readableBytes() == 0) {
byteBuf.release()
}
}
- private fun generateFrames(): Flux<WireFrame> = Flux.generate { next ->
+ private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
decoder.decodeFirst(streamBuffer)
.fold(onError(next), onSuccess(next))
.unsafeRunSync()
}
- private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+ private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
when (err) {
is InvalidWireFrame -> IO {
next.error(WireFrameException(err))
}
}
- private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame ->
- IO {
- logDecodedWireMessage(frame)
- next.next(frame)
+ private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
+ when (frame) {
+ is PayloadWireFrameMessage -> IO {
+ logDecodedWireMessage(frame)
+ next.next(frame)
+ }
+ is EndOfTransmissionMessage -> IO {
+ logEndOfTransmissionWireMessage()
+ next.next(frame)
+ }
}
}
-
private fun logIncomingMessage(wire: ByteBuf) {
logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
}
- private fun logDecodedWireMessage(wire: WireFrame) {
- logger.trace { "Wire payload size: ${wire.payloadSize} B." }
+ private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) {
+ logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+ }
+
+ private fun logEndOfTransmissionWireMessage() {
+ logger.trace { "Received end-of-transmission message" }
}
private fun logEndOfData() {
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import reactor.test.test
+import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
val encoder = WireFrameEncoder(alloc)
- fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame))
+ fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame))
fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
}
given("valid input") {
- val input = WireFrame(samplePayload)
+ val input = PayloadWireFrameMessage(samplePayload)
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
}
}
given("valid input with part of next frame") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(WireFrame(samplePayload)))
- .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3))
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
}
}
}
+ given("end-of-transmission marker byte with garbage after it") {
+ val input = Unpooled.buffer()
+ .writeByte(0xAA)
+ .writeBytes(Unpooled.wrappedBuffer(samplePayload))
+
+ it("should yield decoded end-of-transmission frame and error") {
+ createInstance().decode(input).test()
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .verifyError(WireFrameException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
given("valid input with garbage after it") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(WireFrame(samplePayload)))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
.writeBytes(Unpooled.wrappedBuffer(samplePayload))
it("should yield decoded input frame and error") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyError(WireFrameException::class.java)
}
}
given("two inputs containing two separate messages") {
- val input1 = encoder.encode(WireFrame(samplePayload))
- val input2 = encoder.encode(WireFrame(anotherPayload))
+ val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
}
}
+ given("two payload messages followed by end-of-transmission marker byte") {
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+
+ val input = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2)
+ .writeByte(0xAA)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input).test()
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .verifyComplete()
+ }
+ }
+
+ given("two payload messages separated by end-of-transmission marker byte") {
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+
+ val input = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeByte(0xAA)
+ .writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input).test()
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input)
+ }
+ }
+
given("1st input containing 1st frame and 2nd input containing garbage") {
- val input1 = encoder.encode(WireFrame(samplePayload))
+ val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
val input2 = Unpooled.wrappedBuffer(anotherPayload)
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1)
.test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
.verifyError(WireFrameException::class.java)
given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
- val frame1 = encoder.encode(WireFrame(samplePayload))
- val frame2 = encoder.encode(WireFrame(anotherPayload))
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1)
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
}
given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
- val frame1 = encoder.encode(WireFrame(samplePayload))
- val frame2 = encoder.encode(WireFrame(anotherPayload))
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1, 5)
cut.decode(input1).test()
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
}
}
})
+
+
+private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage =
+ if (msg is PayloadWireFrameMessage) {
+ msg
+ } else {
+ fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}")
+ }
+
+private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ this as? EndOfTransmissionMessage
+ ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
\ No newline at end of file
describe("VES High Volume Collector") {
it("should handle multiple HV RAN events") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
assertThat(messages)
.describedAs("should send all events")
.hasSize(2)
}
+
+ it("should not handle messages received from client after end-of-transmission message") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val anotherValidMessage = vesMessage(Domain.HVRANMEAS)
+ val endOfTransmissionMessage = endOfTransmissionMessage()
+
+ val handledEvents = sut.handleConnection(sink,
+ validMessage,
+ endOfTransmissionMessage,
+ anotherValidMessage
+ )
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(validMessage.refCnt())
+ .describedAs("first message should be released")
+ .isEqualTo(0)
+ assertThat(endOfTransmissionMessage.refCnt())
+ .describedAs("end-of-transmission message should be released")
+ .isEqualTo(0)
+ assertThat(anotherValidMessage.refCnt())
+ .describedAs("second (not handled) message should not be released")
+ .isEqualTo(1)
+ }
}
describe("Memory management") {
it("should release memory for each handled and dropped message") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidDomain = vesMessage(Domain.OTHER)
val msgWithInvalidFrame = invalidWireFrame()
assertThat(msgWithTooBigPayload.refCnt())
.describedAs("message with payload exceeding 1MiB should be released")
.isEqualTo(expectedRefCnt)
+ }
+ it("should release memory for end-of-transmission message") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val endOfTransmissionMessage = endOfTransmissionMessage()
+ val expectedRefCnt = 0
+
+ val handledEvents = sut.handleConnection(sink,
+ validMessage,
+ endOfTransmissionMessage
+ )
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
+ assertThat(endOfTransmissionMessage.refCnt())
+ .describedAs("end-of-transmission message should be released")
+ .isEqualTo(expectedRefCnt)
}
it("should release memory for each message with invalid payload") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidPayload = invalidVesMessage()
val expectedRefCnt = 0
}
it("should release memory for each message with garbage frame") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithGarbageFrame = garbageFrame()
val expectedRefCnt = 0
describe("message routing") {
it("should direct message to a topic by means of routing configuration") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
}
it("should be able to direct 2 messages from different domains to one topic") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
assertThat(messages).describedAs("number of routed messages").hasSize(3)
- assertThat(messages.get(0).topic).describedAs("first message topic")
+ assertThat(messages[0].topic).describedAs("first message topic")
.isEqualTo(HVRANMEAS_TOPIC)
- assertThat(messages.get(1).topic).describedAs("second message topic")
+ assertThat(messages[1].topic).describedAs("second message topic")
.isEqualTo(HVRANMEAS_TOPIC)
- assertThat(messages.get(2).topic).describedAs("last message topic")
+ assertThat(messages[2].topic).describedAs("last message topic")
.isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
}
it("should drop message if route was not found") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
vesMessage(Domain.OTHER, "first"),
vesMessage(Domain.HVRANMEAS, "second"),
val defaultTimeout = Duration.ofSeconds(10)
it("should update collector on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
val firstCollector = sut.collector
}
it("should start routing messages on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
}
it("should change domain routing on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
}
it("should update routing for each client sending one message") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
it("should not update routing for client sending continuous stream of messages") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
describe("request validation") {
it("should reject message with payload greater than 1 MiB and all subsequent messages") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val handledMessages = sut.handleConnection(sink,
vesMessage(Domain.HVRANMEAS, "first"),
}
})
+
+private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ return Pair(sut, sink)
+}
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.*
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
-import org.onap.ves.HVRanMeasFieldsV5
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
-fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
- writeByte(0xFF) // always 0xFF
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
+fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
+ allocator.buffer().run {
+ writeByte(0xFF) // always 0xFF
+ writeByte(0x01) // version
+ writeByte(0x01) // content type = GPB
- val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
- writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
-}
+ val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
+ writeInt(gpb.limit()) // ves event size in bytes
+ writeBytes(gpb) // ves event as GPB bytes
+ }
+
+fun endOfTransmissionMessage(): ByteBuf =
+ allocator.buffer().writeByte(0xAA)
fun invalidVesMessage(): ByteBuf = allocator.buffer().run {
writeByte(0x01) // content type = GPB
}
-fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
- writeByte(0xFF) // always 0xFF
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
+fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
+ allocator.buffer().run {
+ writeByte(0xFF) // always 0xFF
+ writeByte(0x01) // version
+ writeByte(0x01) // content type = GPB
- val gpb = vesEvent(
- domain,
- id,
- ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
- ).toByteString().asReadOnlyByteBuffer()
+ val gpb = vesEvent(
+ domain,
+ id,
+ ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+ ).toByteString().asReadOnlyByteBuffer()
- writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
-}
+ writeInt(gpb.limit()) // ves event size in bytes
+ writeBytes(gpb) // ves event as GPB bytes
+ }
-fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString(), hvRanMeasFields: ByteString = ByteString.EMPTY) =
+fun vesEvent(domain: Domain = Domain.HVRANMEAS,
+ id: String = UUID.randomUUID().toString(),
+ hvRanMeasFields: ByteString = ByteString.EMPTY) =
VesEvent.newBuilder()
.setCommonEventHeader(
CommonEventHeader.getDefaultInstance().toBuilder()
*/
package org.onap.dcae.collectors.veshv.domain
+
+sealed class WireFrameMessage
+
/**
* Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
*
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class WireFrame(val payload: ByteData,
- val version: Short,
- val payloadTypeRaw: Short,
- val payloadSize: Int) {
+data class PayloadWireFrameMessage(val payload: ByteData,
+ val version: Short,
+ val payloadTypeRaw: Short,
+ val payloadSize: Int
+) : WireFrameMessage() {
constructor(payload: ByteArray) : this(
ByteData(payload),
&& payload.size() == payloadSize
companion object {
+ const val MARKER_BYTE: Short = 0xFF
+
const val SUPPORTED_VERSION: Short = 1
const val HEADER_SIZE =
3 * java.lang.Byte.BYTES +
1 * java.lang.Integer.BYTES
- const val MARKER_BYTE: Short = 0xFF
+
+ const val MAX_PAYLOAD_SIZE = 1024 * 1024
}
}
+
+
+/**
+ * This message type should be used by client to indicate that he has finished sending data to collector.
+ *
+ * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
+ *
+ * ```
+ * ┌─────┬───────────────────────┐
+ * │octet│ 0 │
+ * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┤
+ * │ bit │ 0│ │ │ │ │ │ │ │
+ * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┤
+ * │field│ 0xAA │
+ * └─────┴───────────────────────┘
+ * ```
+ *
+ * @since July 2018
+ */
+
+object EndOfTransmissionMessage : WireFrameMessage() {
+ const val MARKER_BYTE: Short = 0xAA
+}
+
import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
*/
class WireFrameEncoder(val allocator: ByteBufAllocator) {
- fun encode(frame: WireFrame): ByteBuf {
- val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size())
+ fun encode(frame: PayloadWireFrameMessage): ByteBuf {
+ val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
- bb.writeByte(WireFrame.MARKER_BYTE.toInt())
+ bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt())
bb.writeByte(frame.version.toInt())
bb.writeByte(frame.payloadTypeRaw.toInt())
bb.writeInt(frame.payloadSize)
*/
class WireFrameDecoder {
- fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> =
+ fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
when {
isEmpty(byteBuf) -> Left(EmptyWireFrame)
+ isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf)
headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
- else -> parseFrame(byteBuf)
+ else -> parseWireFrame(byteBuf)
}
- private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE
-
private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
- private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> {
+ private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1
+
+ private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE
+
+ private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> {
byteBuf.markReaderIndex()
+ val byte = byteBuf.readUnsignedByte()
- val mark = byteBuf.readUnsignedByte()
- if (mark != WireFrame.MARKER_BYTE) {
+ return if (byte == EndOfTransmissionMessage.MARKER_BYTE) {
+ Right(EndOfTransmissionMessage)
+ } else {
byteBuf.resetReaderIndex()
- return Left(InvalidWireFrameMarker(mark))
+ Left(MissingWireFrameHeaderBytes)
+ }
+ }
+
+ private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
+ byteBuf.markReaderIndex()
+
+ val mark = byteBuf.readUnsignedByte()
+ return when (mark) {
+ EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage)
+ PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
+ else -> {
+ byteBuf.resetReaderIndex()
+ Left(InvalidWireFrameMarker(mark))
+ }
}
+ }
+ private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> {
val version = byteBuf.readUnsignedByte()
val payloadTypeRaw = byteBuf.readUnsignedByte()
val payloadSize = byteBuf.readInt()
if (payloadSize > MAX_PAYLOAD_SIZE) {
+ byteBuf.resetReaderIndex()
return Left(PayloadSizeExceeded)
}
val payload = ByteData.readFrom(byteBuf, payloadSize)
- return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize))
- }
+ return Right(PayloadWireFrameMessage(payload, version, payloadTypeRaw, payloadSize))
- companion object {
- const val MAX_PAYLOAD_SIZE = 1024 * 1024
}
}
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
-class InvalidWireFrameMarker(actualMarker: Short)
- : InvalidWireFrame(
- "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
+class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
+ "Invalid start of frame. Expected 0x%02X, but was 0x%02X"
+ .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker)
+)
object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
+
+
+// Other
+
+class UnknownWireFrameTypeException(frame: WireFrameMessage)
+ : Throwable("Unexpected wire frame message type: ${frame.javaClass}")
package org.onap.dcae.collectors.veshv.domain
import arrow.core.Either
-import arrow.core.identity
import io.netty.buffer.Unpooled
import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.fail
import org.assertj.core.api.ObjectAssert
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import java.nio.charset.Charset
import kotlin.test.assertTrue
+import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
val decoder = WireFrameDecoder()
fun createSampleFrame() =
- WireFrame(payloadAsString.toByteArray(Charset.defaultCharset()))
+ PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
fun encodeSampleFrame() =
createSampleFrame().let {
describe("Wire Frame invariants") {
given("input with unsupported version") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData.EMPTY,
version = 100,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
}
given("input with unsupported payload type") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData.EMPTY,
version = 1,
payloadTypeRaw = 0x69,
}
given("input with too small payload size") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
}
given("input with too big payload size") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
given("valid input") {
val payload = byteArrayOf(6, 9, 8, 6)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
describe("encode-decode methods' compatibility") {
val frame = createSampleFrame()
val encoded = encodeSampleFrame()
- val decoded = decoder.decodeFirst(encoded).getOrFail()
+ val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail()
it("should decode version") {
assertThat(decoded.version).isEqualTo(frame.version)
}
}
+
describe("TCP framing") {
// see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
- it("should decode message leaving rest unread") {
+ it("should return error when buffer is empty") {
+ val buff = Unpooled.buffer()
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) }
+ }
+
+ it("should return end-of-transmission message when given end-of-transmission marker byte") {
val buff = Unpooled.buffer()
- .writeBytes(encodeSampleFrame())
.writeByte(0xAA)
- val decoded = decoder.decodeFirst(buff).getOrFail()
- assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
- assertThat(buff.readableBytes()).isEqualTo(1)
+ assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
}
- it("should return error when not even header fits") {
+ it("should return error when given any single byte other than end-of-transmission marker byte") {
val buff = Unpooled.buffer()
- .writeByte(0xFF)
+ .writeByte(0xEE)
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ }
+ it("should return error when payload message header does not fit") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xFF)
+ .writeBytes("MOMOM".toByteArray())
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
}
- it("should return error when first byte is not 0xFF but length looks ok") {
+ it("should return error when length looks ok but first byte is not 0xFF or 0xAA") {
val buff = Unpooled.buffer()
- .writeByte(0xAA)
+ .writeByte(0x69)
.writeBytes("some garbage".toByteArray())
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
}
- it("should return error when first byte is not 0xFF and length is to short") {
+ it("should return end-of-transmission message when length looks ok and first byte is 0xAA") {
val buff = Unpooled.buffer()
.writeByte(0xAA)
+ .writeBytes("some garbage".toByteArray())
- decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
}
it("should return error when payload doesn't fit") {
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) }
}
+ it("should decode payload message leaving rest unread") {
+ val buff = Unpooled.buffer()
+ .writeBytes(encodeSampleFrame())
+ .writeByte(0xAA)
+ val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail()
+
+ assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+ assertThat(buff.readableBytes()).isEqualTo(1)
+ }
}
- describe("payload size limit"){
+ describe("payload size limit") {
it("should decode successfully when payload size is equal 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
it("should return error when payload exceeds 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
it("should validate only first message") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
fold({ assertj(assertThat(it)) }, { fail("Error expected") })
}
-private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame =
- fold({ fail(it.message) }, ::identity) as WireFrame
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage =
+ fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() })
+
+private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage =
+ this as? PayloadWireFrameMessage
+ ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}")
+
+
+private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) {
+ decoded.getEndOfTransmissionMessageOrFail()
+}
+
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() })
+
+private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ this as? EndOfTransmissionMessage
+ ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
+++ /dev/null
-{
- "commonEventHeader": {
- "version": "sample-version",
- "domain": 10,
- "sequence": 1,
- "priority": 1,
- "eventId": "sample-event-id",
- "eventName": "sample-event-name",
- "eventType": "sample-event-type",
- "startEpochMicrosec": 120034455,
- "lastEpochMicrosec": 120034455,
- "nfNamingCode": "sample-nf-naming-code",
- "nfcNamingCode": "sample-nfc-naming-code",
- "reportingEntityId": "sample-reporting-entity-id",
- "reportingEntityName": "sample-reporting-entity-name",
- "sourceId": "sample-source-id",
- "sourceName": "sample-source-name"
- },
- "messagesAmount": 25000
-}
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.api
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import reactor.core.publisher.Flux
* @since June 2018
*/
interface MessageGenerator {
- fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame>
+ fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage>
}
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import ratpack.exec.Promise
}
}
- private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> {
+ private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> {
return ctx.request.body
.map { Json.createReader(it.inputStream).readObject() }
.map { extractMessageParameters(it) }
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import com.google.protobuf.ByteString
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.ves.VesEventV5.VesEvent
*/
internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
- override fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> =
+ override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> =
Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let {
if (messageParameters.amount < 0)
it.repeat()
.build()
- private fun createMessage(commonHeader: CommonEventHeader): WireFrame =
- WireFrame(vesMessageBytes(commonHeader))
+ private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage =
+ PayloadWireFrameMessage(vesMessageBytes(commonHeader))
private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray =
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.effects.IO
-import io.netty.buffer.Unpooled
import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslProvider
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
}
.build()
- fun sendIo(messages: Flux<WireFrame>) = IO<Unit> {
+ fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> {
sendRx(messages).block()
}
- fun sendRx(messages: Flux<WireFrame>): Mono<Void> {
+ fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
val complete = ReplayProcessor.create<Void>(1)
client
.newHandler { _, output -> handler(complete, messages, output) }
return complete.then()
}
- private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound):
+ private fun handler(complete: ReplayProcessor<Void>,
+ messages: Flux<PayloadWireFrameMessage>,
+ nettyOutbound: NettyOutbound):
Publisher<Void> {
- val encoder = WireFrameEncoder(nettyOutbound.alloc())
- val context = nettyOutbound.context()
-
- context.onClose {
- logger.info { "Connection to ${context.address()} has been closed" }
- }
-
- // TODO: Close channel after all messages have been sent
- // The code bellow doesn't work because it closes the channel earlier and not all are consumed...
-// complete.subscribe {
-// context.channel().disconnect().addListener {
-// if (it.isSuccess)
-// logger.info { "Connection closed" }
-// else
-// logger.warn("Failed to close the connection", it.cause())
-// }
-// }
-
+ val allocator = nettyOutbound.alloc()
+ val encoder = WireFrameEncoder(allocator)
val frames = messages
.map(encoder::encode)
.window(MAX_BATCH_SIZE)
return nettyOutbound
+ .logConnectionClosed()
.options { it.flushOnBoundary() }
.sendGroups(frames)
- .send(Mono.just(Unpooled.EMPTY_BUFFER))
+ .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
.then {
logger.info("Messages have been sent")
complete.onComplete()
.clientAuth(ClientAuth.REQUIRE)
.build()
+ private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
+ context().onClose {
+ logger.info { "Connection to ${context().address()} has been closed" }
+ }
+ return this
+ }
+
companion object {
private const val MAX_BATCH_SIZE = 128
+ private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
private val logger = Logger(XnfSimulator::class)
}
}