Remove end-of-transmission message from protocol 67/68367/2
authorFilip Krzywka <filip.krzywka@nokia.com>
Fri, 21 Sep 2018 08:14:03 +0000 (10:14 +0200)
committerFilip Krzywka <filip.krzywka@nokia.com>
Mon, 24 Sep 2018 06:22:29 +0000 (08:22 +0200)
Also update protobuf files definitions to latest version.

Change-Id: I0cd5d2d8deec5c787e2d3948d3d905fa672f9fea
Issue-ID: DCAEGEN2-775
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
22 files changed:
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
hv-collector-domain/src/main/proto/event/VesEvent.proto
hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto
hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto
hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt

index f608a2b..8970e03 100644 (file)
@@ -25,9 +25,6 @@ import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -35,8 +32,6 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
-import reactor.core.publisher.SynchronousSink
-import java.util.function.BiConsumer
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -53,7 +48,7 @@ internal class VesHvCollector(
             wireChunkDecoderSupplier(alloc).let { wireDecoder ->
                 dataStream
                         .transform { decodeWireFrame(it, wireDecoder) }
-                        .filter(PayloadWireFrameMessage::isValid)
+                        .filter(WireFrameMessage::isValid)
                         .transform(::decodePayload)
                         .filter(VesMessage::isValid)
                         .transform(::routeMessage)
@@ -62,14 +57,13 @@ internal class VesHvCollector(
                         .then()
             }
 
-    private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<PayloadWireFrameMessage> = flux
+    private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
             .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
             .concatMap(decoder::decode)
-            .handle(completeStreamOnEOT)
             .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
 
-    private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux
-            .map(PayloadWireFrameMessage::payload)
+    private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
+            .map(WireFrameMessage::payload)
             .map(protobufDecoder::decode)
             .flatMap { omitWhenNone(it) }
 
@@ -95,18 +89,5 @@ internal class VesHvCollector(
 
     companion object {
         private val logger = Logger(VesHvCollector::class)
-
-        private val completeStreamOnEOT by lazy {
-            BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink ->
-                when (frame) {
-                    is EndOfTransmissionMessage -> {
-                        logger.info("Completing stream because of receiving EOT message")
-                        sink.complete()
-                    }
-                    is PayloadWireFrameMessage -> sink.next(frame)
-                    else -> sink.error(UnknownWireFrameTypeException(frame))
-                }
-            }
-        }
     }
 }
index 80f62d1..0775c65 100644 (file)
@@ -27,8 +27,6 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
 import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.SynchronousSink
@@ -76,15 +74,9 @@ internal class WireChunkDecoder(
     }
 
     private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
-        when (frame) {
-            is PayloadWireFrameMessage -> IO {
-                logDecodedWireMessage(frame)
-                next.next(frame)
-            }
-            is EndOfTransmissionMessage -> IO {
-                logEndOfTransmissionWireMessage()
-                next.next(frame)
-            }
+        IO {
+            logDecodedWireMessage(frame)
+            next.next(frame)
         }
     }
 
@@ -92,14 +84,10 @@ internal class WireChunkDecoder(
         logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
     }
 
-    private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) {
+    private fun logDecodedWireMessage(wire: WireFrameMessage) {
         logger.trace { "Wire payload size: ${wire.payloadSize} B" }
     }
 
-    private fun logEndOfTransmissionWireMessage() {
-        logger.trace { "Received end-of-transmission message" }
-    }
-
     private fun logEndOfData() {
         logger.trace { "End of data in current TCP buffer" }
     }
index a9364ed..d214ffc 100644 (file)
@@ -27,13 +27,10 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import reactor.test.test
-import kotlin.test.fail
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
@@ -46,7 +43,7 @@ internal object WireChunkDecoderTest : Spek({
 
     val encoder = WireFrameEncoder(alloc)
 
-    fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame))
+    fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
 
     fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
 
@@ -101,23 +98,23 @@ internal object WireChunkDecoderTest : Spek({
         }
 
         given("valid input") {
-            val input = PayloadWireFrameMessage(samplePayload)
+            val input = WireFrameMessage(samplePayload)
 
             it("should yield decoded input frame") {
                 createInstance().decode(input).test()
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyComplete()
             }
         }
 
         given("valid input with part of next frame") {
             val input = Unpooled.buffer()
-                    .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
-                    .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3))
+                    .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
+                    .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3))
 
             it("should yield decoded input frame") {
                 createInstance().decode(input).test()
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyComplete()
             }
 
@@ -126,30 +123,14 @@ internal object WireChunkDecoderTest : Spek({
             }
         }
 
-        given("end-of-transmission marker byte with garbage after it") {
-            val input = Unpooled.buffer()
-                    .writeByte(0xAA)
-                    .writeBytes(Unpooled.wrappedBuffer(samplePayload))
-
-            it("should yield decoded end-of-transmission frame and error") {
-                createInstance().decode(input).test()
-                        .expectNextMatches { it is EndOfTransmissionMessage }
-                        .verifyError(WireFrameException::class.java)
-            }
-
-            it("should leave memory unreleased") {
-                verifyMemoryNotReleased(input)
-            }
-        }
-
         given("valid input with garbage after it") {
             val input = Unpooled.buffer()
-                    .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
+                    .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
                     .writeBytes(Unpooled.wrappedBuffer(samplePayload))
 
             it("should yield decoded input frame and error") {
                 createInstance().decode(input).test()
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyError(WireFrameException::class.java)
             }
 
@@ -159,16 +140,16 @@ internal object WireChunkDecoderTest : Spek({
         }
 
         given("two inputs containing two separate messages") {
-            val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
-            val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+            val input1 = encoder.encode(WireFrameMessage(samplePayload))
+            val input2 = encoder.encode(WireFrameMessage(anotherPayload))
 
             it("should yield decoded input frames") {
                 val cut = createInstance()
                 cut.decode(input1).test()
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyComplete()
                 cut.decode(input2).test()
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+                        .expectNextMatches { it.payloadSize == anotherPayload.size }
                         .verifyComplete()
             }
 
@@ -177,57 +158,15 @@ internal object WireChunkDecoderTest : Spek({
             }
         }
 
-        given("two payload messages followed by end-of-transmission marker byte") {
-            val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
-            val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
-
-            val input = Unpooled.buffer()
-                    .writeBytes(frame1)
-                    .writeBytes(frame2)
-                    .writeByte(0xAA)
-
-            it("should yield decoded input frames") {
-                val cut = createInstance()
-                cut.decode(input).test()
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
-                        .expectNextMatches { it is EndOfTransmissionMessage }
-                        .verifyComplete()
-            }
-        }
-
-        given("two payload messages separated by end-of-transmission marker byte") {
-            val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
-            val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
-
-            val input = Unpooled.buffer()
-                    .writeBytes(frame1)
-                    .writeByte(0xAA)
-                    .writeBytes(frame2)
-
-            it("should yield decoded input frames") {
-                val cut = createInstance()
-                cut.decode(input).test()
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
-                        .expectNextMatches { it is EndOfTransmissionMessage }
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
-                        .verifyComplete()
-            }
-
-            it("should release memory") {
-                verifyMemoryReleased(input)
-            }
-        }
-
         given("1st input containing 1st frame and 2nd input containing garbage") {
-            val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+            val input1 = encoder.encode(WireFrameMessage(samplePayload))
             val input2 = Unpooled.wrappedBuffer(anotherPayload)
 
             it("should yield decoded input frames") {
                 val cut = createInstance()
                 cut.decode(input1)
                         .test()
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyComplete()
                 cut.decode(input2).test()
                         .verifyError(WireFrameException::class.java)
@@ -244,8 +183,8 @@ internal object WireChunkDecoderTest : Spek({
 
 
         given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
-            val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
-            val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+            val frame1 = encoder.encode(WireFrameMessage(samplePayload))
+            val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
 
             val input1 = Unpooled.buffer()
                     .writeBytes(frame1)
@@ -255,10 +194,10 @@ internal object WireChunkDecoderTest : Spek({
             it("should yield decoded input frames") {
                 val cut = createInstance()
                 cut.decode(input1).test()
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyComplete()
                 cut.decode(input2).test()
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+                        .expectNextMatches { it.payloadSize == anotherPayload.size }
                         .verifyComplete()
             }
 
@@ -268,8 +207,8 @@ internal object WireChunkDecoderTest : Spek({
         }
 
         given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
-            val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
-            val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+            val frame1 = encoder.encode(WireFrameMessage(samplePayload))
+            val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
 
             val input1 = Unpooled.buffer()
                     .writeBytes(frame1, 5)
@@ -282,8 +221,8 @@ internal object WireChunkDecoderTest : Spek({
                 cut.decode(input1).test()
                         .verifyComplete()
                 cut.decode(input2).test()
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
-                        .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .expectNextMatches { it.payloadSize == anotherPayload.size }
                         .verifyComplete()
             }
 
@@ -292,16 +231,4 @@ internal object WireChunkDecoderTest : Spek({
             }
         }
     }
-})
-
-
-private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage =
-        if (msg is PayloadWireFrameMessage) {
-            msg
-        } else {
-            fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}")
-        }
-
-private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
-        this as? EndOfTransmissionMessage
-                ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
\ No newline at end of file
+})
\ No newline at end of file
index 60e10ee..3eba9b6 100644 (file)
@@ -37,7 +37,6 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage
 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
 import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
 import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
@@ -66,30 +65,6 @@ object VesHvSpecification : Spek({
                     .describedAs("should send all events")
                     .hasSize(2)
         }
-
-        it("should not handle messages received from client after end-of-transmission message") {
-            val (sut, sink) = vesHvWithStoringSink()
-            val validMessage = vesWireFrameMessage(HVMEAS)
-            val anotherValidMessage = vesWireFrameMessage(HVMEAS)
-            val endOfTransmissionMessage = endOfTransmissionWireMessage()
-
-            val handledEvents = sut.handleConnection(sink,
-                    validMessage,
-                    endOfTransmissionMessage,
-                    anotherValidMessage
-            )
-
-            assertThat(handledEvents).hasSize(1)
-            assertThat(validMessage.refCnt())
-                    .describedAs("first message should be released")
-                    .isEqualTo(0)
-            assertThat(endOfTransmissionMessage.refCnt())
-                    .describedAs("end-of-transmission message should be released")
-                    .isEqualTo(0)
-            assertThat(anotherValidMessage.refCnt())
-                    .describedAs("second (not handled) message should not be released")
-                    .isEqualTo(1)
-        }
     }
 
     describe("Memory management") {
@@ -116,26 +91,6 @@ object VesHvSpecification : Spek({
                     .isEqualTo(expectedRefCnt)
         }
 
-        it("should release memory for end-of-transmission message") {
-            val (sut, sink) = vesHvWithStoringSink()
-            val validMessage = vesWireFrameMessage(HVMEAS)
-            val endOfTransmissionMessage = endOfTransmissionWireMessage()
-            val expectedRefCnt = 0
-
-            val handledEvents = sut.handleConnection(sink,
-                    validMessage,
-                    endOfTransmissionMessage
-            )
-
-            assertThat(handledEvents).hasSize(1)
-            assertThat(validMessage.refCnt())
-                    .describedAs("handled message should be released")
-                    .isEqualTo(expectedRefCnt)
-            assertThat(endOfTransmissionMessage.refCnt())
-                    .describedAs("end-of-transmission message should be released")
-                    .isEqualTo(expectedRefCnt)
-        }
-
         it("should release memory for each message with invalid payload") {
             val (sut, sink) = vesHvWithStoringSink()
             val validMessage = vesWireFrameMessage(HVMEAS)
index 51f94cc..38de537 100644 (file)
@@ -24,7 +24,7 @@ import arrow.effects.fix
 import arrow.effects.monadError
 import arrow.typeclasses.bindingCatch
 import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.utils.arrow.asIo
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
@@ -77,7 +77,7 @@ class MessageStreamValidation(
 
     private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> =
             messageGenerator.createMessageFlux(parameters)
-                    .map(PayloadWireFrameMessage::payload)
+                    .map(WireFrameMessage::payload)
                     .map(ByteData::unsafeAsArray)
                     .map(VesEventOuterClass.VesEvent::parseFrom)
                     .collectList()
index 017360b..34ec8f5 100644 (file)
@@ -179,6 +179,6 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P
     return VesEvent.newBuilder()
             .setCommonEventHeader(CommonEventHeader.newBuilder()
                     .setEventId(eventId))
-            .setHvMeasFields(ByteString.copyFrom(payload.toByteArray()))
+            .setEventFields(ByteString.copyFrom(payload.toByteArray()))
             .build()
 }
index beef26b..05fdd80 100644 (file)
@@ -31,7 +31,7 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 import org.mockito.ArgumentMatchers.anyList
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
@@ -89,7 +89,7 @@ internal class MessageStreamValidationTest : Spek({
                 // given
                 val jsonAsStream = sampleJsonAsStream()
                 val event = vesEvent()
-                val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
+                val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
                 val receivedMessageBytes = event.toByteArray()
 
                 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
@@ -107,7 +107,7 @@ internal class MessageStreamValidationTest : Spek({
                 val jsonAsStream = sampleJsonAsStream()
                 val generatedEvent = vesEvent(payload = "payload A")
                 val receivedEvent = vesEvent(payload = "payload B")
-                val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+                val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
                 val receivedMessageBytes = receivedEvent.toByteArray()
 
                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
@@ -125,7 +125,7 @@ internal class MessageStreamValidationTest : Spek({
                 val jsonAsStream = sampleJsonAsStream()
                 val generatedEvent = vesEvent()
                 val receivedEvent = vesEvent(eventId = "bbb")
-                val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+                val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
                 val receivedMessageBytes = receivedEvent.toByteArray()
 
                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
@@ -144,7 +144,7 @@ internal class MessageStreamValidationTest : Spek({
                 // given
                 val jsonAsStream = sampleJsonAsStream()
                 val event = vesEvent()
-                val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
+                val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
                 val receivedMessageBytes = event.toByteArray()
 
                 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
@@ -162,7 +162,7 @@ internal class MessageStreamValidationTest : Spek({
                 val jsonAsStream = sampleJsonAsStream()
                 val generatedEvent = vesEvent(payload = "payload A")
                 val receivedEvent = vesEvent(payload = "payload B")
-                val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+                val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
                 val receivedMessageBytes = receivedEvent.toByteArray()
 
                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
@@ -180,7 +180,7 @@ internal class MessageStreamValidationTest : Spek({
                 val jsonAsStream = sampleJsonAsStream()
                 val generatedEvent = vesEvent()
                 val receivedEvent = vesEvent("bbb")
-                val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+                val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
                 val receivedMessageBytes = receivedEvent.toByteArray()
 
                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
@@ -205,7 +205,7 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P
     return VesEvent.newBuilder()
             .setCommonEventHeader(CommonEventHeader.newBuilder()
                     .setEventId(eventId))
-            .setHvMeasFields(ByteString.copyFrom(payload.toByteArray()))
+            .setEventFields(ByteString.copyFrom(payload.toByteArray()))
             .build()
 }
 
index c61ab26..4f867f1 100644 (file)
@@ -24,8 +24,8 @@ import arrow.core.Left
 import arrow.core.Right
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -33,19 +33,19 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.R
  */
 class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
 
-    fun encode(frame: PayloadWireFrameMessage): ByteBuf {
-        val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
-
-        bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt())
-        bb.writeByte(frame.versionMajor.toInt())
-        bb.writeByte(frame.versionMinor.toInt())
-        bb.writeZero(RESERVED_BYTE_COUNT)
-        bb.writeByte(frame.payloadTypeRaw.toInt())
-        bb.writeInt(frame.payloadSize)
-        frame.payload.writeTo(bb)
-
-        return bb
-    }
+    fun encode(frame: WireFrameMessage): ByteBuf = allocator
+            .buffer(WireFrameMessage.HEADER_SIZE + frame.payload.size())
+            .run {
+                writeByte(WireFrameMessage.MARKER_BYTE.toInt())
+                writeByte(frame.versionMajor.toInt())
+                writeByte(frame.versionMinor.toInt())
+                writeZero(RESERVED_BYTE_COUNT)
+                writeByte(frame.payloadType.toInt())
+                writeInt(frame.payloadSize)
+            }
+            .also {
+                frame.payload.writeTo(it)
+            }
 }
 
 /**
@@ -57,36 +57,20 @@ class WireFrameDecoder {
     fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
             when {
                 isEmpty(byteBuf) -> Left(EmptyWireFrame)
-                isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf)
                 headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
                 else -> parseWireFrame(byteBuf)
             }
 
     private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
 
-    private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1
-
-    private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE
-
-    private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> {
-        byteBuf.markReaderIndex()
-        val byte = byteBuf.readUnsignedByte()
-
-        return if (byte == EndOfTransmissionMessage.MARKER_BYTE) {
-            Right(EndOfTransmissionMessage)
-        } else {
-            byteBuf.resetReaderIndex()
-            Left(MissingWireFrameHeaderBytes)
-        }
-    }
+    private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrameMessage.HEADER_SIZE
 
     private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
         byteBuf.markReaderIndex()
 
         val mark = byteBuf.readUnsignedByte()
         return when (mark) {
-            EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage)
-            PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
+            WireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
             else -> {
                 byteBuf.resetReaderIndex()
                 Left(InvalidWireFrameMarker(mark))
@@ -94,7 +78,7 @@ class WireFrameDecoder {
         }
     }
 
-    private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> {
+    private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
         val versionMajor = byteBuf.readUnsignedByte()
         val versionMinor = byteBuf.readUnsignedByte()
         byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved
@@ -113,7 +97,7 @@ class WireFrameDecoder {
 
         val payload = ByteData.readFrom(byteBuf, payloadSize)
 
-        return Right(PayloadWireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
+        return Right(WireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
 
     }
 }
index d82bb25..dfadc5b 100644 (file)
@@ -19,7 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.domain
 
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,7 +35,7 @@ sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
 
 class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
         "Invalid start of frame. Expected 0x%02X, but was 0x%02X"
-                .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker)
+                .format(WireFrameMessage.MARKER_BYTE, actualMarker)
 )
 
 object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
@@ -47,9 +47,3 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg)
 object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
 object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
 object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
-
-
-// Other
-
-class UnknownWireFrameTypeException(frame: WireFrameMessage)
-    : Throwable("Unexpected wire frame message type: ${frame.javaClass}")
index 642179e..06ca938 100644 (file)
 package org.onap.dcae.collectors.veshv.domain
 
 
-sealed class WireFrameMessage
-
 /**
- * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
+ * Wire frame structure is presented bellow using ASN.1 notation. All fields are in network byte order (big-endian).
  *
  * ```
  * -- Precedes every HV-VES message
@@ -31,21 +29,22 @@ sealed class WireFrameMessage
  *   magic           INTEGER (0..255),         â€“ always 0xFF, identifies extended header usage
  *   versionMajor    INTEGER (0..255),         â€“ major interface v, forward incompatible with previous major v
  *   versionMinor    INTEGER (0..255),         â€“ minor interface v, forward compatible with previous minor v
- *   reserved        BIT STRING (SIZE (16)),   â€“ reserved for future use
- *   messageType     INTEGER (0..255),         â€“ message payload type: 0x00=undefined, 0x01=protobuf
- *   messageLength   INTEGER (0..4294967295)   â€“ message payload length
+ *   reserved        OCTET STRING (SIZE (3)),  â€“ reserved for future use
+ *   payloadId       INTEGER (0..255),         â€“ message payload type: 0x00=undefined, 0x01=protobuf
+ *   payloadLength   INTEGER (0..4294967295)   â€“ message payload length
+ *   payload         OCTET STRING              â€“ length as per payloadLength
  * }
  * ```
  *
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-data class PayloadWireFrameMessage(val payload: ByteData,
-                                   val versionMajor: Short,
-                                   val versionMinor: Short,
-                                   val payloadTypeRaw: Short,
-                                   val payloadSize: Int
-) : WireFrameMessage() {
+data class WireFrameMessage(val payload: ByteData,
+                            val versionMajor: Short,
+                            val versionMinor: Short,
+                            val payloadType: Short,
+                            val payloadSize: Int
+) {
     constructor(payload: ByteArray) : this(
             ByteData(payload),
             SUPPORTED_VERSION_MAJOR,
@@ -55,7 +54,7 @@ data class PayloadWireFrameMessage(val payload: ByteData,
 
     fun isValid(): Boolean =
             versionMajor == SUPPORTED_VERSION_MAJOR
-                    && PayloadContentType.isValidHexValue(payloadTypeRaw)
+                    && PayloadContentType.isValidHexValue(payloadType)
                     && payload.size() == payloadSize
 
     companion object {
@@ -74,24 +73,3 @@ data class PayloadWireFrameMessage(val payload: ByteData,
         const val MAX_PAYLOAD_SIZE = 1024 * 1024
     }
 }
-
-
-/**
- * This message type should be used by client to indicate that he has finished sending data to collector.
- *
- * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
- *
- * ```
- * -- Sent by the HV-VES data provider, prior to closing the connection to the HV-VES destination
- * Eot ::= SEQUENCE {
- *   magic           INTEGER (0..255),           â€“ always 0xAA
- * }
- * ```
- *
- * @since July 2018
- */
-
-object EndOfTransmissionMessage : WireFrameMessage() {
-    const val MARKER_BYTE: Short = 0xAA
-}
-
index 54a6d14..0f9e5e1 100644 (file)
@@ -21,54 +21,55 @@ syntax = "proto3";
 package org.onap.ves;
 
 message VesEvent {
-    CommonEventHeader commonEventHeader = 1; // required
+    CommonEventHeader commonEventHeader=1;  // required
 
-    oneof eventFields // required, payload
-    {
-        // each new high-volume domain can add an entry for its own GPB message
-        // the field can be opaque (bytes) to allow decoding the payload in a separate step
-        bytes hvMeasFields = 2; // for domain==HVMEAS, GPB message: HVMeasFields
-    }
+    bytes eventFields=2;                    // required, payload
+    // this field contains a domain-specific GPB message
+    // the field being opaque (bytes), the decoding of the payload occurs in a separate step
+    // the name of the GPB message for domain XYZ is XYZFields
+    // e.g. for domain==HVMEAS, the GPB message is HVMEASFields
 }
 
 // VES CommonEventHeader adapted to GPB (Google Protocol Buffers)
-// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain
+// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain.
 
-message CommonEventHeader {
-    string version = 1;               // required, "version of the gpb common event header"
-    string domain = 2;                // required, "the eventing domain associated with the event", allowed values:
-                                      // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING,
-                                      // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS
+message CommonEventHeader
+{
+    string version = 1;                     // required, "version of the gpb common event header"
+    string domain = 2;                      // required, "the eventing domain associated with the event", allowed values:
+    // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING,
+    // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS
 
-    uint32 sequence = 3;              // required, "ordering of events communicated by an event source instance or 0 if not needed"
+    uint32 sequence = 3;                    // required, "ordering of events communicated by an event source instance or 0 if not needed"
 
-    enum Priority {
+    enum Priority
+    {
         PRIORITY_NOT_PROVIDED = 0;
         HIGH = 1;
         MEDIUM = 2;
         NORMAL = 3;
         LOW = 4;
     }
-    Priority priority = 4;            // required, "processing priority"
+    Priority priority = 4;                  // required, "processing priority"
 
-    string eventId = 5;               // required, "event key that is unique to the event source"
-    string eventName = 6;             // required, "unique event name"
-    string eventType = 7;             // "for example - guest05,  platform"
+    string eventId = 5;                     // required, "event key that is unique to the event source"
+    string eventName = 6;                   // required, "unique event name"
+    string eventType = 7;                   // "for example - guest05,  platform"
 
-    uint64 lastEpochMicrosec = 8;     // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
-    uint64 startEpochMicrosec = 9;    // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+    uint64 lastEpochMicrosec = 8;           // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+    uint64 startEpochMicrosec = 9;          // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
 
-    string nfNamingCode = 10;         // "4 character network function type, aligned with vnf naming standards"
-    string nfcNamingCode = 11;        // "3 character network function component type, aligned with vfc naming standards"
-    string nfVendorName = 12;         // " Vendor Name providing the nf "
+    string nfNamingCode = 10;               // "4 character network function type, aligned with vnf naming standards"
+    string nfcNamingCode = 11;              // "3 character network function component type, aligned with vfc naming standards"
+    string nfVendorName = 12;               // " Vendor Name providing the nf "
 
-    bytes reportingEntityId = 13;     // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
-    string reportingEntityName = 14;  // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry"
-    bytes sourceId = 15;              // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
-    string sourceName = 16;           // required, "name of the entity experiencing the event issued use A&AI entry"
-    string timeZoneOffset = 17;       // "Offset to GMT to indicate local time zone for the device"
-    string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener"
+    bytes reportingEntityId = 13;           // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
+    string reportingEntityName = 14;        // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry"
+    bytes sourceId = 15;                    // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
+    string sourceName = 16;                 // required, "name of the entity experiencing the event issued use A&AI entry"
+    string timeZoneOffset = 17;             // "Offset to GMT to indicate local time zone for the device"
+    string vesEventListenerVersion = 18;    // required, "Version of the VesEvent Listener"
 
-    reserved "InternalHeaderFields";  // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
+    reserved "InternalHeaderFields";        // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
     reserved 100;
 }
index 9a8582d..94b4010 100644 (file)
@@ -24,20 +24,14 @@ import "MeasDataCollection.proto";          // for 3GPP PM format
 message HVMeasFields\r
 {\r
     string hvMeasFieldsVersion = 1;\r
-    measDataCollection.MeasDataCollection measDataCollection = 2;\r
-    // From 3GPP TS 28.550\r
+    MeasDataCollection measDataCollection = 2;\r
+    // Based on 3GPP TS 28.550\r
     // Informative: mapping between similar header fields (format may be different)\r
-    // 3GPP MeasStreamHeader   ONAP/VES CommonEventHeader\r
+    // 3GPP MeasHeader         ONAP/VES CommonEventHeader\r
     // senderName              sourceName\r
     // senderType              nfNamingCode + nfcNamingCode\r
     // vendorName              nfVendorName\r
     // collectionBeginTime     startEpochMicrosec\r
     // timestamp               lastEpochMicrosec\r
-    repeated HashMap eventAddlFlds = 3;     // optional per-event data\r
+    map<string, string> eventAddlFlds = 3;     // optional per-event data (name/value HashMap)\r
 }\r
-\r
-message HashMap\r
-{\r
-    string name = 1;\r
-    string value = 2;\r
-}
\ No newline at end of file
index 472dcc4..31f4dfb 100644 (file)
  * ============LICENSE_END=========================================================\r
  */\r
 syntax = "proto3";\r
-package measDataCollection;\r
+package org.onap.ves;\r
 \r
-// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V1.2.2 (2018-08).\r
+// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V2.0.0 (2018-09).\r
 // Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file.\r
-// Note (2018-08): work is in progress for 3GPP TS 28.550 to specify PM streaming format. Changes will be made, if needed, to align with final version.\r
+// Note (2018-09): work is in progress for 3GPP TS 28.550. Changes will be made, if needed, to align with final version.\r
 // Differences/additions to 3GPP TS 28.550 are marked with "%%".\r
 \r
-message MeasDataCollection                  // top-level message \r
+message MeasDataCollection                  // top-level message\r
 {\r
-    MeasHeader measHeader = 1;\r
-    repeated MeasData measData = 2;         // %%: use a single instance for RTPM\r
-    MeasFooter measFooter = 3;\r
-}\r
-\r
-message MeasHeader\r
-{\r
-    string streamFormatVersion = 1;\r
+    // %% Combined messageFileHeader, measData (single instance), messageFileFooter (not needed: timestamp = collectionBeginTime + granularityPeriod).\r
+    string formatVersion = 1;\r
     string senderName = 2;\r
     string senderType = 3;\r
     string vendorName = 4;\r
     string collectionBeginTime = 5;         // in ASN.1 GeneralizedTime format (subset of ISO 8601 basic format)\r
+    uint32 granularityPeriod = 6;           // duration in seconds, %% moved from MeasInfo (single reporting period per event)\r
+    string measuredEntityUserName = 7;      // network function user definable name ("userLabel") defined for the measured entity in 3GPP TS 28.622\r
+    string measuredEntityDn = 8;            // DN as per 3GPP TS 32.300\r
+    string measuredEntitySoftwareVersion = 9;\r
+    repeated string measObjInstIdList = 10; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432\r
+    repeated MeasInfo measInfo = 11;\r
 }\r
 \r
-message MeasData\r
-{\r
-    string measuredEntityId = 1;            // DN as per 3GPP TS 32.300\r
-    string measuredEntityUserName = 2;      // network function User Name\r
-    string measuredEntitySoftwareVersion = 3;\r
-    uint32 granularityPeriod = 4;           // in seconds, %% moved from MeasInfo (single reporting period per event)\r
-    repeated string measObjInstIdList = 5;  // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432\r
-    repeated MeasInfo measInfo = 6; \r
-}\r
-\r
-\r
 message MeasInfo\r
 {\r
     oneof MeasInfoId {                      // measurement group identifier\r
-        uint32 iMeasInfoId = 1;             // identifier as integer (%%: more compact)\r
-        string measInfoId = 2;              // identifier as string (more generic)\r
+                                            uint32 iMeasInfoId = 1;             // identifier as integer (%%: more compact)\r
+                                            string measInfoId = 2;              // identifier as string (more generic)\r
     }\r
 \r
     oneof MeasTypes {                       // measurement identifiers associated with the measurement results\r
-        IMeasTypes iMeasTypes = 3;          // identifiers as integers (%%: more compact)\r
-        SMeasTypes measTypes = 4;           // identifiers as strings (more generic)\r
+                                            IMeasTypes iMeasTypes = 3;          // identifiers as integers (%%: more compact)\r
+                                            SMeasTypes measTypes = 4;           // identifiers as strings (more generic)\r
     }\r
     // Needed only because GPB does not support repeated fields directly inside 'oneof'\r
     message IMeasTypes { repeated uint32 iMeasType = 1; }\r
     message SMeasTypes { repeated string measType = 1; }\r
 \r
-    string jobIdList = 5;\r
-    repeated MeasValue measValues = 6;      // performance measurements grouped by measurement groups\r
+    string jobId = 5;\r
+    repeated MeasValue measValues = 6;      // performance measurements grouped by measurement object\r
 }\r
 \r
 message MeasValue\r
 {\r
     oneof MeasObjInstId {                   // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432\r
-        string measObjInstId = 1;           // LDN itself\r
-        uint32 measObjInstIdListIdx = 2;    // %%: index into measObjInstIdList\r
+                                            string measObjInstId = 1;           // LDN itself\r
+                                            uint32 measObjInstIdListIdx = 2;    // %%: index into measObjInstIdList\r
     }\r
     repeated MeasResult measResults = 3;\r
     bool suspectFlag = 4;\r
-    repeated nameValue measObjAddlFlds = 5; // %%: optional per-object data\r
+    map<string, string> measObjAddlFlds = 5; // %%: optional per-object data (name/value HashMap)\r
 }\r
 \r
 message MeasResult\r
 {\r
-    uint32 p = 1;                           // Optional index in the MeasTypes array\r
+    uint32 p = 1;                           // Index in the MeasTypes array, needed only if measResults has fewer elements than MeasTypes\r
     oneof xValue {\r
         sint64 iValue = 2;\r
         double rValue = 3;\r
         bool isNull = 4;\r
     }\r
 }\r
-\r
-message MeasFooter\r
-{\r
-    string timestamp = 1;                   // in ASN.1 GeneralizedTime format, a better name would be "collectionEndTime"\r
-}\r
-\r
-message nameValue                           // %%: vendor-defined name-value pair\r
-{\r
-    string name = 1;\r
-    string value = 2;\r
-} 
\ No newline at end of file
index b992d53..988789d 100644 (file)
@@ -28,7 +28,7 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
 import java.nio.charset.Charset
 import kotlin.test.assertTrue
 import kotlin.test.fail
@@ -42,8 +42,7 @@ object WireFrameCodecsTest : Spek({
     val encoder = WireFrameEncoder()
     val decoder = WireFrameDecoder()
 
-    fun createSampleFrame() =
-            PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
+    fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
 
     fun encodeSampleFrame() =
             createSampleFrame().let {
@@ -53,11 +52,11 @@ object WireFrameCodecsTest : Spek({
     describe("Wire Frame invariants") {
 
         given("input with unsupported major version") {
-            val input = PayloadWireFrameMessage(
+            val input = WireFrameMessage(
                     payload = ByteData.EMPTY,
                     versionMajor = 100,
                     versionMinor = 0,
-                    payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                    payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
                     payloadSize = 0)
 
             it("should fail validation") {
@@ -66,11 +65,11 @@ object WireFrameCodecsTest : Spek({
         }
 
         given("input with unsupported minor version") {
-            val input = PayloadWireFrameMessage(
+            val input = WireFrameMessage(
                     payload = ByteData.EMPTY,
                     versionMajor = 1,
                     versionMinor = 6,
-                    payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                    payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
                     payloadSize = 0)
 
             it("should pass validation") {
@@ -79,11 +78,11 @@ object WireFrameCodecsTest : Spek({
         }
 
         given("input with unsupported payload type") {
-            val input = PayloadWireFrameMessage(
+            val input = WireFrameMessage(
                     payload = ByteData.EMPTY,
                     versionMajor = 1,
                     versionMinor = 0,
-                    payloadTypeRaw = 0x69,
+                    payloadType = 0x69,
                     payloadSize = 0)
 
             it("should fail validation") {
@@ -92,11 +91,11 @@ object WireFrameCodecsTest : Spek({
         }
 
         given("input with too small payload size") {
-            val input = PayloadWireFrameMessage(
+            val input = WireFrameMessage(
                     payload = ByteData(byteArrayOf(1, 2, 3)),
                     versionMajor = 1,
                     versionMinor = 0,
-                    payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                    payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
                     payloadSize = 1)
 
             it("should fail validation") {
@@ -105,11 +104,11 @@ object WireFrameCodecsTest : Spek({
         }
 
         given("input with too big payload size") {
-            val input = PayloadWireFrameMessage(
+            val input = WireFrameMessage(
                     payload = ByteData(byteArrayOf(1, 2, 3)),
                     versionMajor = 1,
                     versionMinor = 0,
-                    payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                    payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
                     payloadSize = 8)
 
             it("should fail validation") {
@@ -119,11 +118,11 @@ object WireFrameCodecsTest : Spek({
 
         given("valid input") {
             val payload = byteArrayOf(6, 9, 8, 6)
-            val input = PayloadWireFrameMessage(
+            val input = WireFrameMessage(
                     payload = ByteData(payload),
                     versionMajor = 1,
                     versionMinor = 0,
-                    payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                    payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
                     payloadSize = payload.size)
 
             it("should pass validation") {
@@ -139,14 +138,18 @@ object WireFrameCodecsTest : Spek({
         describe("encode-decode methods' compatibility") {
             val frame = createSampleFrame()
             val encoded = encodeSampleFrame()
-            val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail()
+            val decoded = decoder.decodeFirst(encoded).getMessageOrFail()
 
-            it("should decode version") {
+            it("should decode major version") {
                 assertThat(decoded.versionMajor).isEqualTo(frame.versionMajor)
             }
 
+            it("should decode minor version") {
+                assertThat(decoded.versionMinor).isEqualTo(frame.versionMinor)
+            }
+
             it("should decode payload type") {
-                assertThat(decoded.payloadTypeRaw).isEqualTo(frame.payloadTypeRaw)
+                assertThat(decoded.payloadType).isEqualTo(frame.payloadType)
             }
 
             it("should decode payload size") {
@@ -170,14 +173,7 @@ object WireFrameCodecsTest : Spek({
                 assertBufferIntact(buff)
             }
 
-            it("should return end-of-transmission message when given end-of-transmission marker byte") {
-                val buff = Unpooled.buffer()
-                        .writeByte(0xAA)
-
-                assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
-            }
-
-            it("should return error when given any single byte other than end-of-transmission marker byte") {
+            it("should return error when given any single byte other than marker byte") {
                 val buff = Unpooled.buffer()
                         .writeByte(0xEE)
 
@@ -194,7 +190,7 @@ object WireFrameCodecsTest : Spek({
                 assertBufferIntact(buff)
             }
 
-            it("should return error when length looks ok but first byte is not 0xFF or 0xAA") {
+            it("should return error when length looks ok but first byte is not 0xFF") {
                 val buff = Unpooled.buffer()
                         .writeByte(0x69)
                         .writeBytes("some garbage".toByteArray())
@@ -203,14 +199,6 @@ object WireFrameCodecsTest : Spek({
                 assertBufferIntact(buff)
             }
 
-            it("should return end-of-transmission message when length looks ok and first byte is 0xAA") {
-                val buff = Unpooled.buffer()
-                        .writeByte(0xAA)
-                        .writeBytes("some garbage".toByteArray())
-
-                assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
-            }
-
             it("should return error when payload doesn't fit") {
                 val buff = Unpooled.buffer()
                         .writeBytes(encodeSampleFrame())
@@ -223,8 +211,8 @@ object WireFrameCodecsTest : Spek({
             it("should decode payload message leaving rest unread") {
                 val buff = Unpooled.buffer()
                         .writeBytes(encodeSampleFrame())
-                        .writeByte(0xAA)
-                val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail()
+                        .writeByte(0xAB)
+                val decoded = decoder.decodeFirst(buff).getMessageOrFail()
 
                 assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
                 assertThat(buff.readableBytes()).isEqualTo(1)
@@ -236,11 +224,11 @@ object WireFrameCodecsTest : Spek({
             it("should decode successfully when payload size is equal 1 MiB") {
 
                 val payload = ByteArray(MAX_PAYLOAD_SIZE)
-                val input = PayloadWireFrameMessage(
+                val input = WireFrameMessage(
                         payload = ByteData(payload),
                         versionMajor = 1,
                         versionMinor = 0,
-                        payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                        payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
                         payloadSize = payload.size)
 
 
@@ -250,11 +238,11 @@ object WireFrameCodecsTest : Spek({
             it("should return error when payload exceeds 1 MiB") {
 
                 val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
-                val input = PayloadWireFrameMessage(
+                val input = WireFrameMessage(
                         payload = ByteData(payload),
                         versionMajor = 1,
                         versionMinor = 0,
-                        payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                        payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
                         payloadSize = payload.size)
                 val buff = encoder.encode(input)
 
@@ -266,11 +254,11 @@ object WireFrameCodecsTest : Spek({
             it("should validate only first message") {
 
                 val payload = ByteArray(MAX_PAYLOAD_SIZE)
-                val input = PayloadWireFrameMessage(
+                val input = WireFrameMessage(
                         payload = ByteData(payload),
                         versionMajor = 1,
                         versionMinor = 0,
-                        payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                        payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
                         payloadSize = payload.size)
 
 
@@ -289,21 +277,6 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>)
     fold({ assertj(assertThat(it)) }, { fail("Error expected") })
 }
 
-private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage =
-        fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() })
-
-private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage =
-        this as? PayloadWireFrameMessage
-                ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}")
-
-
-private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) {
-    decoded.getEndOfTransmissionMessageOrFail()
-}
-
-private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
-        fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() })
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getMessageOrFail(): WireFrameMessage =
+        fold({ fail(it.message) }, { it })
 
-private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
-        this as? EndOfTransmissionMessage
-                ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
index 7804226..85bdcab 100644 (file)
@@ -23,8 +23,8 @@ import com.google.protobuf.ByteString
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
@@ -52,9 +52,6 @@ fun vesWireFrameMessage(domain: VesEventDomain = OTHER,
             writeBytes(gpb)  // ves event as GPB bytes
         }
 
-fun endOfTransmissionWireMessage(): ByteBuf =
-        allocator.buffer().writeByte(0xAA)
-
 fun wireFrameMessageWithInvalidPayload(): ByteBuf = allocator.buffer().run {
     writeValidWireFrameHeaders()
 
@@ -69,8 +66,8 @@ fun garbageFrame(): ByteBuf = allocator.buffer().run {
 
 fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
     writeByte(0xFF)
-    writeByte(0x01)   // version
-    writeByte(0x01)   // content type = GPB
+    writeByte(0x01)   // version major
+    writeByte(0x01)   // version minor
 }
 
 fun vesMessageWithTooBigPayload(domain: VesEventDomain = HVMEAS): ByteBuf =
index 0341c2f..57b960a 100644 (file)
@@ -39,7 +39,7 @@ fun vesEvent(commonEventHeader: CommonEventHeader,
              hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent =
         VesEventOuterClass.VesEvent.newBuilder()
                 .setCommonEventHeader(commonEventHeader)
-                .setHvMeasFields(hvRanMeasFields)
+                .setEventFields(hvRanMeasFields)
                 .build()
 
 fun commonHeader(domain: VesEventDomain = HVMEAS,
index d9329cb..ace7f1c 100644 (file)
@@ -19,7 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.ves.message.generator.api
 
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
 import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
 import reactor.core.publisher.Flux
@@ -29,7 +29,7 @@ import reactor.core.publisher.Flux
  * @since June 2018
  */
 interface MessageGenerator {
-    fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage>
+    fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage>
 
     companion object {
         val INSTANCE: MessageGenerator by lazy {
index 5d1f56d..90e7770 100644 (file)
@@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl
 import com.google.protobuf.ByteString
 import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.PayloadContentType
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
@@ -44,11 +44,11 @@ import java.nio.charset.Charset
  */
 class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
 
-    override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage> = Flux
+    override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> = Flux
             .fromIterable(messageParameters)
             .flatMap { createMessageFlux(it) }
 
-    private fun createMessageFlux(parameters: MessageParameters): Flux<PayloadWireFrameMessage> =
+    private fun createMessageFlux(parameters: MessageParameters): Flux<WireFrameMessage> =
             Mono.fromCallable { createMessage(parameters.commonEventHeader, parameters.messageType) }
                     .let {
                         if (parameters.amount < 0)
@@ -57,17 +57,17 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
                             it.repeat(parameters.amount)
                     }
 
-    private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): PayloadWireFrameMessage =
+    private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): WireFrameMessage =
             when (messageType) {
                 VALID ->
-                    PayloadWireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
+                    WireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
                 TOO_BIG_PAYLOAD ->
-                    PayloadWireFrameMessage(vesEvent(commonEventHeader, oversizedPayload()))
+                    WireFrameMessage(vesEvent(commonEventHeader, oversizedPayload()))
                 FIXED_PAYLOAD ->
-                    PayloadWireFrameMessage(vesEvent(commonEventHeader, fixedPayload()))
+                    WireFrameMessage(vesEvent(commonEventHeader, fixedPayload()))
                 INVALID_WIRE_FRAME -> {
                     val payload = ByteData(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
-                    PayloadWireFrameMessage(
+                    WireFrameMessage(
                             payload,
                             UNSUPPORTED_VERSION,
                             UNSUPPORTED_VERSION,
@@ -75,7 +75,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
                             payload.size())
                 }
                 INVALID_GPB_DATA ->
-                    PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
+                    WireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
             }
 
     private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: ByteString): ByteArray {
@@ -85,11 +85,11 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
     private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
             VesEvent.newBuilder()
                     .setCommonEventHeader(commonEventHeader)
-                    .setHvMeasFields(payload)
+                    .setEventFields(payload)
                     .build()
 
     private fun oversizedPayload() =
-            payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1)
+            payloadGenerator.generateRawPayload(WireFrameMessage.MAX_PAYLOAD_SIZE + 1)
 
     private fun fixedPayload() =
             payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE)
index ea3d094..e380f93 100644 (file)
@@ -29,7 +29,7 @@ import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
@@ -87,7 +87,7 @@ object MessageGeneratorImplTest : Spek({
                             .test()
                             .assertNext {
                                 assertThat(it.isValid()).isTrue()
-                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
                             }
                             .verifyComplete()
@@ -105,7 +105,7 @@ object MessageGeneratorImplTest : Spek({
                             .test()
                             .assertNext {
                                 assertThat(it.isValid()).isTrue()
-                                assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
                             }
                             .verifyComplete()
@@ -122,7 +122,7 @@ object MessageGeneratorImplTest : Spek({
                             .test()
                             .assertNext {
                                 assertThat(it.isValid()).isTrue()
-                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
                                 assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
                                         .isThrownBy { extractCommonEventHeader(it.payload) }
                             }
@@ -140,9 +140,9 @@ object MessageGeneratorImplTest : Spek({
                             .test()
                             .assertNext {
                                 assertThat(it.isValid()).isFalse()
-                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
-                                assertThat(it.versionMajor).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION_MINOR)
+                                assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
                             }
                             .verifyComplete()
                 }
@@ -158,7 +158,7 @@ object MessageGeneratorImplTest : Spek({
                             .test()
                             .assertNext {
                                 assertThat(it.isValid()).isTrue()
-                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
                                 assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
                             }
@@ -177,17 +177,17 @@ object MessageGeneratorImplTest : Spek({
                 generator.createMessageFlux(messageParameters)
                         .test()
                         .assertNext {
-                            assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                            assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
                             assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
                         }
                         .expectNextCount(singleFluxSize - 1)
                         .assertNext {
-                            assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                            assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
                             assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
                         }
                         .expectNextCount(singleFluxSize - 1)
                         .assertNext {
-                            assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                            assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
                             assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.name)
                         }
                         .expectNextCount(singleFluxSize - 1)
@@ -202,5 +202,5 @@ fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader =
 
 
 fun extractHvRanMeasFields(bytes: ByteData): ByteString =
-        VesEvent.parseFrom(bytes.unsafeAsArray()).hvMeasFields
+        VesEvent.parseFrom(bytes.unsafeAsArray()).eventFields
 
index d1a5296..af71e9c 100644 (file)
@@ -24,8 +24,7 @@ import io.netty.handler.ssl.ClientAuth
 import io.netty.handler.ssl.SslContext
 import io.netty.handler.ssl.SslContextBuilder
 import io.netty.handler.ssl.SslProvider
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
@@ -53,10 +52,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
             }
             .build()
 
-    fun sendIo(messages: Flux<PayloadWireFrameMessage>) =
+    fun sendIo(messages: Flux<WireFrameMessage>) =
             sendRx(messages).then(Mono.just(Unit)).asIo()
 
-    private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
+    private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
         val complete = ReplayProcessor.create<Void>(1)
         client
                 .newHandler { _, output -> handler(complete, messages, output) }
@@ -72,7 +71,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
     }
 
     private fun handler(complete: ReplayProcessor<Void>,
-                        messages: Flux<PayloadWireFrameMessage>,
+                        messages: Flux<WireFrameMessage>,
                         nettyOutbound: NettyOutbound): Publisher<Void> {
 
         val allocator = nettyOutbound.alloc()
@@ -85,7 +84,6 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
                 .logConnectionClosed()
                 .options { it.flushOnBoundary() }
                 .sendGroups(frames)
-                .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
                 .then {
                     logger.info("Messages have been sent")
                     complete.onComplete()
@@ -117,6 +115,5 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
     companion object {
         private val logger = Logger(VesHvClient::class)
         private const val MAX_BATCH_SIZE = 128
-        private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
     }
 }
index 80f3957..9753588 100644 (file)
@@ -30,7 +30,7 @@ import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
 import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat
@@ -98,7 +98,7 @@ internal class XnfSimulatorTest : Spek({
             // given
             val json = "[true]".byteInputStream()
             val messageParams = listOf<MessageParameters>()
-            val generatedMessages = Flux.empty<PayloadWireFrameMessage>()
+            val generatedMessages = Flux.empty<WireFrameMessage>()
             val sendingIo = IO {}
             whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
             whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages)