Enhance wire protocol 89/58689/1
authorfkrzywka <filip.krzywka@nokia.com>
Tue, 3 Jul 2018 08:14:38 +0000 (10:14 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 11:41:04 +0000 (13:41 +0200)
Handle new wire frame message type which should allow clients to
indicate that all data has been sent to collector

Change xNF Simulator to send end-of-transmission message
after sending all messages

Close ves-hv-collector stream after encountering EOT message

Remove duplicated file in project

Closes ONAP-391

Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16
Signed-off-by: fkrzywka <filip.krzywka@nokia.com>
Issue-ID: DCAEGEN2-601

16 files changed:
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt [moved from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt with 81% similarity]
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
hv-collector-xnf-simulator/sample-request.json [deleted file]
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt

index d615848..ff99717 100644 (file)
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.boundary
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
index 3246cf5..ceae78c 100644 (file)
@@ -25,13 +25,18 @@ import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import reactor.core.publisher.SynchronousSink
+import java.util.function.BiConsumer
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -50,9 +55,10 @@ internal class VesHvCollector(
                 dataStream
                         .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
                         .concatMap(wireDecoder::decode)
+                        .handle(completeStreamOnEOT)
                         .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
-                        .filter(WireFrame::isValid)
-                        .map(WireFrame::payload)
+                        .filter(PayloadWireFrameMessage::isValid)
+                        .map(PayloadWireFrameMessage::payload)
                         .map(protobufDecoder::decode)
                         .filter(validator::isValid)
                         .flatMap(this::findRoute)
@@ -76,11 +82,22 @@ internal class VesHvCollector(
         return Flux.empty()
     }
 
-    private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
-        wireChunkDecoder.release()
-    }
+    private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
 
     companion object {
         private val logger = Logger(VesHvCollector::class)
+
+        private val completeStreamOnEOT by lazy {
+            BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink ->
+                when (frame) {
+                    is EndOfTransmissionMessage -> {
+                        logger.info("Completing stream because of receiving EOT message")
+                        sink.complete()
+                    }
+                    is PayloadWireFrameMessage -> sink.next(frame)
+                    else -> sink.error(UnknownWireFrameTypeException(frame))
+                }
+            }
+        }
     }
 }
index 0426ceb..e998576 100644 (file)
@@ -75,7 +75,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
         onReadIdle(timeout.toMillis()) {
             logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
             context().channel().close().addListener {
-
                 if (it.isSuccess)
                     logger.debug { "Client disconnected because of idle timeout" }
                 else
index 502505c..fbff769 100644 (file)
@@ -22,11 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.*
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.SynchronousSink
@@ -44,7 +40,7 @@ internal class WireChunkDecoder(
         streamBuffer.release()
     }
 
-    fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer {
+    fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
         logIncomingMessage(byteBuf)
         if (byteBuf.readableBytes() == 0) {
             byteBuf.release()
@@ -55,13 +51,13 @@ internal class WireChunkDecoder(
         }
     }
 
-    private fun generateFrames(): Flux<WireFrame> = Flux.generate { next ->
+    private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
         decoder.decodeFirst(streamBuffer)
                 .fold(onError(next), onSuccess(next))
                 .unsafeRunSync()
     }
 
-    private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+    private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
         when (err) {
             is InvalidWireFrame -> IO {
                 next.error(WireFrameException(err))
@@ -73,20 +69,29 @@ internal class WireChunkDecoder(
         }
     }
 
-    private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame ->
-        IO {
-            logDecodedWireMessage(frame)
-            next.next(frame)
+    private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
+        when (frame) {
+            is PayloadWireFrameMessage -> IO {
+                logDecodedWireMessage(frame)
+                next.next(frame)
+            }
+            is EndOfTransmissionMessage -> IO {
+                logEndOfTransmissionWireMessage()
+                next.next(frame)
+            }
         }
     }
 
-
     private fun logIncomingMessage(wire: ByteBuf) {
         logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
     }
 
-    private fun logDecodedWireMessage(wire: WireFrame) {
-        logger.trace { "Wire payload size: ${wire.payloadSize} B." }
+    private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) {
+        logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+    }
+
+    private fun logEndOfTransmissionWireMessage() {
+        logger.trace { "Received end-of-transmission message" }
     }
 
     private fun logEndOfData() {
index 33f7168..a9364ed 100644 (file)
@@ -27,10 +27,13 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import reactor.test.test
+import kotlin.test.fail
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
@@ -43,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({
 
     val encoder = WireFrameEncoder(alloc)
 
-    fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame))
+    fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame))
 
     fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
 
@@ -98,23 +101,23 @@ internal object WireChunkDecoderTest : Spek({
         }
 
         given("valid input") {
-            val input = WireFrame(samplePayload)
+            val input = PayloadWireFrameMessage(samplePayload)
 
             it("should yield decoded input frame") {
                 createInstance().decode(input).test()
-                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
                         .verifyComplete()
             }
         }
 
         given("valid input with part of next frame") {
             val input = Unpooled.buffer()
-                    .writeBytes(encoder.encode(WireFrame(samplePayload)))
-                    .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3))
+                    .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
+                    .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3))
 
             it("should yield decoded input frame") {
                 createInstance().decode(input).test()
-                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
                         .verifyComplete()
             }
 
@@ -123,14 +126,30 @@ internal object WireChunkDecoderTest : Spek({
             }
         }
 
+        given("end-of-transmission marker byte with garbage after it") {
+            val input = Unpooled.buffer()
+                    .writeByte(0xAA)
+                    .writeBytes(Unpooled.wrappedBuffer(samplePayload))
+
+            it("should yield decoded end-of-transmission frame and error") {
+                createInstance().decode(input).test()
+                        .expectNextMatches { it is EndOfTransmissionMessage }
+                        .verifyError(WireFrameException::class.java)
+            }
+
+            it("should leave memory unreleased") {
+                verifyMemoryNotReleased(input)
+            }
+        }
+
         given("valid input with garbage after it") {
             val input = Unpooled.buffer()
-                    .writeBytes(encoder.encode(WireFrame(samplePayload)))
+                    .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
                     .writeBytes(Unpooled.wrappedBuffer(samplePayload))
 
             it("should yield decoded input frame and error") {
                 createInstance().decode(input).test()
-                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
                         .verifyError(WireFrameException::class.java)
             }
 
@@ -140,16 +159,16 @@ internal object WireChunkDecoderTest : Spek({
         }
 
         given("two inputs containing two separate messages") {
-            val input1 = encoder.encode(WireFrame(samplePayload))
-            val input2 = encoder.encode(WireFrame(anotherPayload))
+            val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+            val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
 
             it("should yield decoded input frames") {
                 val cut = createInstance()
                 cut.decode(input1).test()
-                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
                         .verifyComplete()
                 cut.decode(input2).test()
-                        .expectNextMatches { it.payloadSize == anotherPayload.size }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
                         .verifyComplete()
             }
 
@@ -158,15 +177,57 @@ internal object WireChunkDecoderTest : Spek({
             }
         }
 
+        given("two payload messages followed by end-of-transmission marker byte") {
+            val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+            val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+
+            val input = Unpooled.buffer()
+                    .writeBytes(frame1)
+                    .writeBytes(frame2)
+                    .writeByte(0xAA)
+
+            it("should yield decoded input frames") {
+                val cut = createInstance()
+                cut.decode(input).test()
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+                        .expectNextMatches { it is EndOfTransmissionMessage }
+                        .verifyComplete()
+            }
+        }
+
+        given("two payload messages separated by end-of-transmission marker byte") {
+            val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+            val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+
+            val input = Unpooled.buffer()
+                    .writeBytes(frame1)
+                    .writeByte(0xAA)
+                    .writeBytes(frame2)
+
+            it("should yield decoded input frames") {
+                val cut = createInstance()
+                cut.decode(input).test()
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+                        .expectNextMatches { it is EndOfTransmissionMessage }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+                        .verifyComplete()
+            }
+
+            it("should release memory") {
+                verifyMemoryReleased(input)
+            }
+        }
+
         given("1st input containing 1st frame and 2nd input containing garbage") {
-            val input1 = encoder.encode(WireFrame(samplePayload))
+            val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
             val input2 = Unpooled.wrappedBuffer(anotherPayload)
 
             it("should yield decoded input frames") {
                 val cut = createInstance()
                 cut.decode(input1)
                         .test()
-                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
                         .verifyComplete()
                 cut.decode(input2).test()
                         .verifyError(WireFrameException::class.java)
@@ -183,8 +244,8 @@ internal object WireChunkDecoderTest : Spek({
 
 
         given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
-            val frame1 = encoder.encode(WireFrame(samplePayload))
-            val frame2 = encoder.encode(WireFrame(anotherPayload))
+            val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+            val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
 
             val input1 = Unpooled.buffer()
                     .writeBytes(frame1)
@@ -194,10 +255,10 @@ internal object WireChunkDecoderTest : Spek({
             it("should yield decoded input frames") {
                 val cut = createInstance()
                 cut.decode(input1).test()
-                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
                         .verifyComplete()
                 cut.decode(input2).test()
-                        .expectNextMatches { it.payloadSize == anotherPayload.size }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
                         .verifyComplete()
             }
 
@@ -207,8 +268,8 @@ internal object WireChunkDecoderTest : Spek({
         }
 
         given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
-            val frame1 = encoder.encode(WireFrame(samplePayload))
-            val frame2 = encoder.encode(WireFrame(anotherPayload))
+            val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+            val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
 
             val input1 = Unpooled.buffer()
                     .writeBytes(frame1, 5)
@@ -221,8 +282,8 @@ internal object WireChunkDecoderTest : Spek({
                 cut.decode(input1).test()
                         .verifyComplete()
                 cut.decode(input2).test()
-                        .expectNextMatches { it.payloadSize == samplePayload.size }
-                        .expectNextMatches { it.payloadSize == anotherPayload.size }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
                         .verifyComplete()
             }
 
@@ -232,3 +293,15 @@ internal object WireChunkDecoderTest : Spek({
         }
     }
 })
+
+
+private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage =
+        if (msg is PayloadWireFrameMessage) {
+            msg
+        } else {
+            fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}")
+        }
+
+private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+        this as? EndOfTransmissionMessage
+                ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
\ No newline at end of file
index 246fc7e..5e6e666 100644 (file)
@@ -37,22 +37,42 @@ object VesHvSpecification : Spek({
 
     describe("VES High Volume Collector") {
         it("should handle multiple HV RAN events") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val (sut, sink) = vesHvWithStoringSink()
             val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
 
             assertThat(messages)
                     .describedAs("should send all events")
                     .hasSize(2)
         }
+
+        it("should not handle messages received from client after end-of-transmission message") {
+            val (sut, sink) = vesHvWithStoringSink()
+            val validMessage = vesMessage(Domain.HVRANMEAS)
+            val anotherValidMessage = vesMessage(Domain.HVRANMEAS)
+            val endOfTransmissionMessage = endOfTransmissionMessage()
+
+            val handledEvents = sut.handleConnection(sink,
+                    validMessage,
+                    endOfTransmissionMessage,
+                    anotherValidMessage
+            )
+
+            assertThat(handledEvents).hasSize(1)
+            assertThat(validMessage.refCnt())
+                    .describedAs("first message should be released")
+                    .isEqualTo(0)
+            assertThat(endOfTransmissionMessage.refCnt())
+                    .describedAs("end-of-transmission message should be released")
+                    .isEqualTo(0)
+            assertThat(anotherValidMessage.refCnt())
+                    .describedAs("second (not handled) message should not be released")
+                    .isEqualTo(1)
+        }
     }
 
     describe("Memory management") {
         it("should release memory for each handled and dropped message") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val (sut, sink) = vesHvWithStoringSink()
             val validMessage = vesMessage(Domain.HVRANMEAS)
             val msgWithInvalidDomain = vesMessage(Domain.OTHER)
             val msgWithInvalidFrame = invalidWireFrame()
@@ -76,13 +96,30 @@ object VesHvSpecification : Spek({
             assertThat(msgWithTooBigPayload.refCnt())
                     .describedAs("message with payload exceeding 1MiB should be released")
                     .isEqualTo(expectedRefCnt)
+        }
 
+        it("should release memory for end-of-transmission message") {
+            val (sut, sink) = vesHvWithStoringSink()
+            val validMessage = vesMessage(Domain.HVRANMEAS)
+            val endOfTransmissionMessage = endOfTransmissionMessage()
+            val expectedRefCnt = 0
+
+            val handledEvents = sut.handleConnection(sink,
+                    validMessage,
+                    endOfTransmissionMessage
+            )
+
+            assertThat(handledEvents).hasSize(1)
+            assertThat(validMessage.refCnt())
+                    .describedAs("handled message should be released")
+                    .isEqualTo(expectedRefCnt)
+            assertThat(endOfTransmissionMessage.refCnt())
+                    .describedAs("end-of-transmission message should be released")
+                    .isEqualTo(expectedRefCnt)
         }
 
         it("should release memory for each message with invalid payload") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val (sut, sink) = vesHvWithStoringSink()
             val validMessage = vesMessage(Domain.HVRANMEAS)
             val msgWithInvalidPayload = invalidVesMessage()
             val expectedRefCnt = 0
@@ -101,9 +138,7 @@ object VesHvSpecification : Spek({
         }
 
         it("should release memory for each message with garbage frame") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val (sut, sink) = vesHvWithStoringSink()
             val validMessage = vesMessage(Domain.HVRANMEAS)
             val msgWithGarbageFrame = garbageFrame()
             val expectedRefCnt = 0
@@ -124,9 +159,7 @@ object VesHvSpecification : Spek({
 
     describe("message routing") {
         it("should direct message to a topic by means of routing configuration") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val (sut, sink) = vesHvWithStoringSink()
 
             val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
             assertThat(messages).describedAs("number of routed messages").hasSize(1)
@@ -137,8 +170,7 @@ object VesHvSpecification : Spek({
         }
 
         it("should be able to direct 2 messages from different domains to one topic") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
+            val (sut, sink) = vesHvWithStoringSink()
 
             sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
 
@@ -149,20 +181,18 @@ object VesHvSpecification : Spek({
 
             assertThat(messages).describedAs("number of routed messages").hasSize(3)
 
-            assertThat(messages.get(0).topic).describedAs("first message topic")
+            assertThat(messages[0].topic).describedAs("first message topic")
                     .isEqualTo(HVRANMEAS_TOPIC)
 
-            assertThat(messages.get(1).topic).describedAs("second message topic")
+            assertThat(messages[1].topic).describedAs("second message topic")
                     .isEqualTo(HVRANMEAS_TOPIC)
 
-            assertThat(messages.get(2).topic).describedAs("last message topic")
+            assertThat(messages[2].topic).describedAs("last message topic")
                     .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
         }
 
         it("should drop message if route was not found") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val (sut, sink) = vesHvWithStoringSink()
             val messages = sut.handleConnection(sink,
                     vesMessage(Domain.OTHER, "first"),
                     vesMessage(Domain.HVRANMEAS, "second"),
@@ -181,8 +211,7 @@ object VesHvSpecification : Spek({
         val defaultTimeout = Duration.ofSeconds(10)
 
         it("should update collector on configuration change") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
+            val (sut, _) = vesHvWithStoringSink()
 
             sut.configurationProvider.updateConfiguration(basicConfiguration)
             val firstCollector = sut.collector
@@ -195,8 +224,7 @@ object VesHvSpecification : Spek({
         }
 
         it("should start routing messages on configuration change") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
+            val (sut, sink) = vesHvWithStoringSink()
 
             sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
 
@@ -216,8 +244,7 @@ object VesHvSpecification : Spek({
         }
 
         it("should change domain routing on configuration change") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
+            val (sut, sink) = vesHvWithStoringSink()
 
             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
@@ -244,8 +271,7 @@ object VesHvSpecification : Spek({
         }
 
         it("should update routing for each client sending one message") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
+            val (sut, sink) = vesHvWithStoringSink()
 
             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
@@ -274,8 +300,7 @@ object VesHvSpecification : Spek({
 
 
         it("should not update routing for client sending continuous stream of messages") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
+            val (sut, sink) = vesHvWithStoringSink()
 
             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
@@ -311,9 +336,7 @@ object VesHvSpecification : Spek({
 
     describe("request validation") {
         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
-            val sink = StoringSink()
-            val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val (sut, sink) = vesHvWithStoringSink()
 
             val handledMessages = sut.handleConnection(sink,
                     vesMessage(Domain.HVRANMEAS, "first"),
@@ -326,3 +349,10 @@ object VesHvSpecification : Spek({
     }
 
 })
+
+private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+    val sink = StoringSink()
+    val sut = Sut(sink)
+    sut.configurationProvider.updateConfiguration(basicConfiguration)
+    return Pair(sut, sink)
+}
index e620e6b..64b4ba2 100644 (file)
@@ -23,9 +23,7 @@ import com.google.protobuf.ByteString
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.*
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
-import org.onap.ves.HVRanMeasFieldsV5
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
 import org.onap.ves.VesEventV5.VesEvent
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -33,15 +31,19 @@ import java.util.*
 
 val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
 
-fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
-    writeByte(0xFF) // always 0xFF
-    writeByte(0x01)   // version
-    writeByte(0x01)   // content type = GPB
+fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
+        allocator.buffer().run {
+            writeByte(0xFF) // always 0xFF
+            writeByte(0x01)   // version
+            writeByte(0x01)   // content type = GPB
 
-    val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
-    writeInt(gpb.limit())  // ves event size in bytes
-    writeBytes(gpb)  // ves event as GPB bytes
-}
+            val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
+            writeInt(gpb.limit())  // ves event size in bytes
+            writeBytes(gpb)  // ves event as GPB bytes
+        }
+
+fun endOfTransmissionMessage(): ByteBuf =
+        allocator.buffer().writeByte(0xAA)
 
 
 fun invalidVesMessage(): ByteBuf = allocator.buffer().run {
@@ -65,22 +67,25 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
     writeByte(0x01)   // content type = GPB
 }
 
-fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
-    writeByte(0xFF) // always 0xFF
-    writeByte(0x01)   // version
-    writeByte(0x01)   // content type = GPB
+fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
+        allocator.buffer().run {
+            writeByte(0xFF) // always 0xFF
+            writeByte(0x01)   // version
+            writeByte(0x01)   // content type = GPB
 
-    val gpb = vesEvent(
-            domain,
-            id,
-            ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
-    ).toByteString().asReadOnlyByteBuffer()
+            val gpb = vesEvent(
+                    domain,
+                    id,
+                    ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+            ).toByteString().asReadOnlyByteBuffer()
 
-    writeInt(gpb.limit())  // ves event size in bytes
-    writeBytes(gpb)  // ves event as GPB bytes
-}
+            writeInt(gpb.limit())  // ves event size in bytes
+            writeBytes(gpb)  // ves event as GPB bytes
+        }
 
-fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString(), hvRanMeasFields: ByteString = ByteString.EMPTY) =
+fun vesEvent(domain: Domain = Domain.HVRANMEAS,
+             id: String = UUID.randomUUID().toString(),
+             hvRanMeasFields: ByteString = ByteString.EMPTY) =
         VesEvent.newBuilder()
                 .setCommonEventHeader(
                         CommonEventHeader.getDefaultInstance().toBuilder()
@@ -19,6 +19,9 @@
  */
 package org.onap.dcae.collectors.veshv.domain
 
+
+sealed class WireFrameMessage
+
 /**
  * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
  *
@@ -49,10 +52,11 @@ package org.onap.dcae.collectors.veshv.domain
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-data class WireFrame(val payload: ByteData,
-                     val version: Short,
-                     val payloadTypeRaw: Short,
-                     val payloadSize: Int) {
+data class PayloadWireFrameMessage(val payload: ByteData,
+                                   val version: Short,
+                                   val payloadTypeRaw: Short,
+                                   val payloadSize: Int
+) : WireFrameMessage() {
 
     constructor(payload: ByteArray) : this(
             ByteData(payload),
@@ -66,11 +70,38 @@ data class WireFrame(val payload: ByteData,
                     && payload.size() == payloadSize
 
     companion object {
+        const val MARKER_BYTE: Short = 0xFF
+
         const val SUPPORTED_VERSION: Short = 1
 
         const val HEADER_SIZE =
                 3 * java.lang.Byte.BYTES +
                         1 * java.lang.Integer.BYTES
-        const val MARKER_BYTE: Short = 0xFF
+
+        const val MAX_PAYLOAD_SIZE = 1024 * 1024
     }
 }
+
+
+/**
+ * This message type should be used by client to indicate that he has finished sending data to collector.
+ *
+ * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
+ *
+ * ```
+ *     ┌─────┬───────────────────────┐
+ *     │octet│           0           │
+ *     ├─────┼──┬──┬──┬──┬──┬──┬──┬──┤
+ *     │ bit │ 0│  │  │  │  │  │  │  │
+ *     ├─────┼──┴──┴──┴──┴──┴──┴──┴──┤
+ *     │field│          0xAA         │
+ *     └─────┴───────────────────────┘
+ * ```
+ *
+ * @since July 2018
+ */
+
+object EndOfTransmissionMessage : WireFrameMessage() {
+    const val MARKER_BYTE: Short = 0xAA
+}
+
index 39841d6..ab82dc0 100644 (file)
@@ -24,6 +24,7 @@ import arrow.core.Left
 import arrow.core.Right
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -31,10 +32,10 @@ import io.netty.buffer.ByteBufAllocator
  */
 class WireFrameEncoder(val allocator: ByteBufAllocator) {
 
-    fun encode(frame: WireFrame): ByteBuf {
-        val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size())
+    fun encode(frame: PayloadWireFrameMessage): ByteBuf {
+        val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
 
-        bb.writeByte(WireFrame.MARKER_BYTE.toInt())
+        bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt())
         bb.writeByte(frame.version.toInt())
         bb.writeByte(frame.payloadTypeRaw.toInt())
         bb.writeInt(frame.payloadSize)
@@ -50,32 +51,54 @@ class WireFrameEncoder(val allocator: ByteBufAllocator) {
  */
 class WireFrameDecoder {
 
-    fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> =
+    fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
             when {
                 isEmpty(byteBuf) -> Left(EmptyWireFrame)
+                isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf)
                 headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
-                else -> parseFrame(byteBuf)
+                else -> parseWireFrame(byteBuf)
             }
 
-    private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE
-
     private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
 
-    private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> {
+    private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1
+
+    private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE
+
+    private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> {
         byteBuf.markReaderIndex()
+        val byte = byteBuf.readUnsignedByte()
 
-        val mark = byteBuf.readUnsignedByte()
-        if (mark != WireFrame.MARKER_BYTE) {
+        return if (byte == EndOfTransmissionMessage.MARKER_BYTE) {
+            Right(EndOfTransmissionMessage)
+        } else {
             byteBuf.resetReaderIndex()
-            return Left(InvalidWireFrameMarker(mark))
+            Left(MissingWireFrameHeaderBytes)
+        }
+    }
+
+    private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
+        byteBuf.markReaderIndex()
+
+        val mark = byteBuf.readUnsignedByte()
+        return when (mark) {
+            EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage)
+            PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
+            else -> {
+                byteBuf.resetReaderIndex()
+                Left(InvalidWireFrameMarker(mark))
+            }
         }
+    }
 
+    private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> {
         val version = byteBuf.readUnsignedByte()
         val payloadTypeRaw = byteBuf.readUnsignedByte()
 
         val payloadSize = byteBuf.readInt()
 
         if (payloadSize > MAX_PAYLOAD_SIZE) {
+            byteBuf.resetReaderIndex()
             return Left(PayloadSizeExceeded)
         }
 
@@ -86,10 +109,7 @@ class WireFrameDecoder {
 
         val payload = ByteData.readFrom(byteBuf, payloadSize)
 
-        return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize))
-    }
+        return Right(PayloadWireFrameMessage(payload, version, payloadTypeRaw, payloadSize))
 
-    companion object {
-        const val MAX_PAYLOAD_SIZE = 1024 * 1024
     }
 }
index 626bf32..d82bb25 100644 (file)
@@ -19,7 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.domain
 
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -33,9 +33,10 @@ sealed class WireFrameDecodingError(val message: String)
 
 sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
 
-class InvalidWireFrameMarker(actualMarker: Short)
-    : InvalidWireFrame(
-        "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
+class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
+        "Invalid start of frame. Expected 0x%02X, but was 0x%02X"
+                .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker)
+)
 
 object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
 
@@ -46,3 +47,9 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg)
 object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
 object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
 object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
+
+
+// Other
+
+class UnknownWireFrameTypeException(frame: WireFrameMessage)
+    : Throwable("Unexpected wire frame message type: ${frame.javaClass}")
index 4d6f071..a5242e0 100644 (file)
 package org.onap.dcae.collectors.veshv.domain
 
 import arrow.core.Either
-import arrow.core.identity
 import io.netty.buffer.Unpooled
 import io.netty.buffer.UnpooledByteBufAllocator
 import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.fail
 import org.assertj.core.api.ObjectAssert
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
 import java.nio.charset.Charset
 import kotlin.test.assertTrue
+import kotlin.test.fail
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -44,7 +43,7 @@ object WireFrameCodecsTest : Spek({
     val decoder = WireFrameDecoder()
 
     fun createSampleFrame() =
-            WireFrame(payloadAsString.toByteArray(Charset.defaultCharset()))
+            PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
 
     fun encodeSampleFrame() =
             createSampleFrame().let {
@@ -54,7 +53,7 @@ object WireFrameCodecsTest : Spek({
     describe("Wire Frame invariants") {
 
         given("input with unsupported version") {
-            val input = WireFrame(
+            val input = PayloadWireFrameMessage(
                     payload = ByteData.EMPTY,
                     version = 100,
                     payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -66,7 +65,7 @@ object WireFrameCodecsTest : Spek({
         }
 
         given("input with unsupported payload type") {
-            val input = WireFrame(
+            val input = PayloadWireFrameMessage(
                     payload = ByteData.EMPTY,
                     version = 1,
                     payloadTypeRaw = 0x69,
@@ -78,7 +77,7 @@ object WireFrameCodecsTest : Spek({
         }
 
         given("input with too small payload size") {
-            val input = WireFrame(
+            val input = PayloadWireFrameMessage(
                     payload = ByteData(byteArrayOf(1, 2, 3)),
                     version = 1,
                     payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -90,7 +89,7 @@ object WireFrameCodecsTest : Spek({
         }
 
         given("input with too big payload size") {
-            val input = WireFrame(
+            val input = PayloadWireFrameMessage(
                     payload = ByteData(byteArrayOf(1, 2, 3)),
                     version = 1,
                     payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -103,7 +102,7 @@ object WireFrameCodecsTest : Spek({
 
         given("valid input") {
             val payload = byteArrayOf(6, 9, 8, 6)
-            val input = WireFrame(
+            val input = PayloadWireFrameMessage(
                     payload = ByteData(payload),
                     version = 1,
                     payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -122,7 +121,7 @@ object WireFrameCodecsTest : Spek({
         describe("encode-decode methods' compatibility") {
             val frame = createSampleFrame()
             val encoded = encodeSampleFrame()
-            val decoded = decoder.decodeFirst(encoded).getOrFail()
+            val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail()
 
             it("should decode version") {
                 assertThat(decoded.version).isEqualTo(frame.version)
@@ -142,40 +141,52 @@ object WireFrameCodecsTest : Spek({
             }
         }
 
+
         describe("TCP framing") {
             // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
 
-            it("should decode message leaving rest unread") {
+            it("should return error when buffer is empty") {
+                val buff = Unpooled.buffer()
+
+                decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) }
+            }
+
+            it("should return end-of-transmission message when given end-of-transmission marker byte") {
                 val buff = Unpooled.buffer()
-                        .writeBytes(encodeSampleFrame())
                         .writeByte(0xAA)
-                val decoded = decoder.decodeFirst(buff).getOrFail()
 
-                assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
-                assertThat(buff.readableBytes()).isEqualTo(1)
+                assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
             }
 
-            it("should return error when not even header fits") {
+            it("should return error when given any single byte other than end-of-transmission marker byte") {
                 val buff = Unpooled.buffer()
-                        .writeByte(0xFF)
+                        .writeByte(0xEE)
 
                 decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+            }
 
+            it("should return error when payload message header does not fit") {
+                val buff = Unpooled.buffer()
+                        .writeByte(0xFF)
+                        .writeBytes("MOMOM".toByteArray())
+
+                decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
             }
 
-            it("should return error when first byte is not 0xFF but length looks ok") {
+            it("should return error when length looks ok but first byte is not 0xFF or 0xAA") {
                 val buff = Unpooled.buffer()
-                        .writeByte(0xAA)
+                        .writeByte(0x69)
                         .writeBytes("some garbage".toByteArray())
 
                 decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
             }
 
-            it("should return error when first byte is not 0xFF and length is to short") {
+            it("should return end-of-transmission message when length looks ok and first byte is 0xAA") {
                 val buff = Unpooled.buffer()
                         .writeByte(0xAA)
+                        .writeBytes("some garbage".toByteArray())
 
-                decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+                assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
             }
 
             it("should return error when payload doesn't fit") {
@@ -186,14 +197,23 @@ object WireFrameCodecsTest : Spek({
                 decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) }
             }
 
+            it("should decode payload message leaving rest unread") {
+                val buff = Unpooled.buffer()
+                        .writeBytes(encodeSampleFrame())
+                        .writeByte(0xAA)
+                val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail()
+
+                assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+                assertThat(buff.readableBytes()).isEqualTo(1)
+            }
         }
 
-        describe("payload size limit"){
+        describe("payload size limit") {
 
             it("should decode successfully when payload size is equal 1 MiB") {
 
                 val payload = ByteArray(MAX_PAYLOAD_SIZE)
-                val input = WireFrame(
+                val input = PayloadWireFrameMessage(
                         payload = ByteData(payload),
                         version = 1,
                         payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -206,7 +226,7 @@ object WireFrameCodecsTest : Spek({
             it("should return error when payload exceeds 1 MiB") {
 
                 val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
-                val input = WireFrame(
+                val input = PayloadWireFrameMessage(
                         payload = ByteData(payload),
                         version = 1,
                         payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -220,7 +240,7 @@ object WireFrameCodecsTest : Spek({
             it("should validate only first message") {
 
                 val payload = ByteArray(MAX_PAYLOAD_SIZE)
-                val input = WireFrame(
+                val input = PayloadWireFrameMessage(
                         payload = ByteData(payload),
                         version = 1,
                         payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -237,5 +257,21 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>)
     fold({ assertj(assertThat(it)) }, { fail("Error expected") })
 }
 
-private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame =
-        fold({ fail(it.message) }, ::identity) as WireFrame
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage =
+        fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() })
+
+private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage =
+        this as? PayloadWireFrameMessage
+                ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}")
+
+
+private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) {
+    decoded.getEndOfTransmissionMessageOrFail()
+}
+
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+        fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() })
+
+private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+        this as? EndOfTransmissionMessage
+                ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
diff --git a/hv-collector-xnf-simulator/sample-request.json b/hv-collector-xnf-simulator/sample-request.json
deleted file mode 100644 (file)
index ca8bd88..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-{ 
-      "commonEventHeader": {
-                "version": "sample-version",
-                "domain": 10,
-                "sequence": 1,
-                "priority": 1,
-                "eventId": "sample-event-id",
-                "eventName": "sample-event-name",
-                "eventType": "sample-event-type",
-                "startEpochMicrosec": 120034455,
-                "lastEpochMicrosec": 120034455,
-                "nfNamingCode": "sample-nf-naming-code",
-                "nfcNamingCode": "sample-nfc-naming-code",
-                "reportingEntityId": "sample-reporting-entity-id",
-                "reportingEntityName": "sample-reporting-entity-name",
-                "sourceId": "sample-source-id",
-                "sourceName": "sample-source-name"
-        },
-        "messagesAmount": 25000
-}
index f4c92fd..a6d6af8 100644 (file)
@@ -19,7 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf.api
 
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
 import reactor.core.publisher.Flux
 
@@ -28,5 +28,5 @@ import reactor.core.publisher.Flux
  * @since June 2018
  */
 interface MessageGenerator {
-    fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame>
+    fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage>
 }
index b67bc64..6346b64 100644 (file)
@@ -20,7 +20,7 @@
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
 import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import ratpack.exec.Promise
@@ -65,7 +65,7 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
                 }
     }
 
-    private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> {
+    private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> {
         return ctx.request.body
                 .map { Json.createReader(it.inputStream).readObject() }
                 .map { extractMessageParameters(it) }
index 0d28bad..baff967 100644 (file)
@@ -20,7 +20,7 @@
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
 import com.google.protobuf.ByteString
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
 import org.onap.ves.VesEventV5.VesEvent
@@ -35,7 +35,7 @@ import javax.json.JsonObject
  */
 internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
 
-    override fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> =
+    override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> =
             Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let {
                 if (messageParameters.amount < 0)
                     it.repeat()
@@ -62,8 +62,8 @@ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerat
             .build()
 
 
-    private fun createMessage(commonHeader: CommonEventHeader): WireFrame =
-            WireFrame(vesMessageBytes(commonHeader))
+    private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage =
+            PayloadWireFrameMessage(vesMessageBytes(commonHeader))
 
 
     private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray =
index 6487888..2f9e0b5 100644 (file)
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
 import arrow.effects.IO
-import io.netty.buffer.Unpooled
 import io.netty.handler.ssl.ClientAuth
 import io.netty.handler.ssl.SslContext
 import io.netty.handler.ssl.SslContextBuilder
 import io.netty.handler.ssl.SslProvider
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -52,11 +52,11 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
             }
             .build()
 
-    fun sendIo(messages: Flux<WireFrame>) = IO<Unit> {
+    fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> {
         sendRx(messages).block()
     }
 
-    fun sendRx(messages: Flux<WireFrame>): Mono<Void> {
+    fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
         val complete = ReplayProcessor.create<Void>(1)
         client
                 .newHandler { _, output -> handler(complete, messages, output) }
@@ -71,34 +71,21 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
         return complete.then()
     }
 
-    private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound):
+    private fun handler(complete: ReplayProcessor<Void>,
+                        messages: Flux<PayloadWireFrameMessage>,
+                        nettyOutbound: NettyOutbound):
             Publisher<Void> {
-        val encoder = WireFrameEncoder(nettyOutbound.alloc())
-        val context = nettyOutbound.context()
-
-        context.onClose {
-            logger.info { "Connection to ${context.address()} has been closed" }
-        }
-
-        // TODO: Close channel after all messages have been sent
-        // The code bellow doesn't work because it closes the channel earlier and not all are consumed...
-//        complete.subscribe {
-//            context.channel().disconnect().addListener {
-//                if (it.isSuccess)
-//                    logger.info { "Connection closed" }
-//                else
-//                    logger.warn("Failed to close the connection", it.cause())
-//            }
-//        }
-
+        val allocator = nettyOutbound.alloc()
+        val encoder = WireFrameEncoder(allocator)
         val frames = messages
                 .map(encoder::encode)
                 .window(MAX_BATCH_SIZE)
 
         return nettyOutbound
+                .logConnectionClosed()
                 .options { it.flushOnBoundary() }
                 .sendGroups(frames)
-                .send(Mono.just(Unpooled.EMPTY_BUFFER))
+                .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
                 .then {
                     logger.info("Messages have been sent")
                     complete.onComplete()
@@ -114,8 +101,16 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
                     .clientAuth(ClientAuth.REQUIRE)
                     .build()
 
+    private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
+        context().onClose {
+            logger.info { "Connection to ${context().address()} has been closed" }
+        }
+        return this
+    }
+
     companion object {
         private const val MAX_BATCH_SIZE = 128
+        private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
         private val logger = Logger(XnfSimulator::class)
     }
 }