Support scenarios for continuous streaming test 43/58843/1
authorJakub Dudycz <jakub.dudycz@nokia.com>
Wed, 18 Jul 2018 12:33:10 +0000 (14:33 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 3 Aug 2018 05:59:03 +0000 (07:59 +0200)
Added support for below scenarios
-too big payloads
-invalid wire frames
-invalid GPB data
-unsupported domains

Changed input json for xnf simulator endpoint

Closes ONAP-500

Change-Id: I19e84a76cef501e274ea8152f3c33c95dddcaac9
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt [new file with mode: 0644]
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt [moved from hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/config/MessageParameters.kt with 80% similarity]
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt

index b793f3a..4953d8f 100644 (file)
@@ -29,7 +29,9 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
 import reactor.core.publisher.Flux
 import reactor.math.sum
 import java.security.MessageDigest
@@ -55,8 +57,10 @@ object PerformanceSpecification : Spek({
             val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
 
             val params = MessageParameters(
-                    commonEventHeader = vesEvent().commonEventHeader,
+                    domain = HVRANMEAS,
+                    messageType = VALID,
                     amount = numMessages)
+
             val fluxes = (1.rangeTo(runs)).map {
                 sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
             }
@@ -82,7 +86,8 @@ object PerformanceSpecification : Spek({
             val timeout = Duration.ofSeconds(30)
 
             val params = MessageParameters(
-                    commonEventHeader = vesEvent().commonEventHeader,
+                    domain = HVRANMEAS,
+                    messageType = VALID,
                     amount = numMessages)
 
             val dataStream = generateDataStream(sut.alloc, params)
@@ -162,7 +167,7 @@ fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<Byt
 private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
         WireFrameEncoder(alloc).let { encoder ->
             MessageGenerator.INSTANCE
-                    .createMessageFlux(params)
+                    .createMessageFlux(listOf(params))
                     .map(encoder::encode)
                     .transform { simulateRemoteTcp(alloc, 1000, it) }
         }
index e52db84..7407f69 100644 (file)
 package org.onap.dcae.collectors.veshv.ves.message.generator.api
 
 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
 import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
-import org.onap.ves.VesEventV5
 import reactor.core.publisher.Flux
-import javax.json.JsonObject
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
 interface MessageGenerator {
-    fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage>
-    fun parseCommonHeader(json: JsonObject): VesEventV5.VesEvent.CommonEventHeader
+    fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage>
 
     companion object {
         val INSTANCE: MessageGenerator by lazy {
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
new file mode 100644 (file)
index 0000000..cc00f5a
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.api
+
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+data class MessageParameters(val domain: Domain,
+                             val messageType: MessageType,
+                             val amount: Long = -1)
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.ves.message.generator.config
-
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+package org.onap.dcae.collectors.veshv.ves.message.generator.api
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
+ * @since July 2018
  */
-data class MessageParameters(val commonEventHeader: CommonEventHeader, val amount: Long = -1)
+enum class MessageType {
+    VALID,
+    TOO_BIG_PAYLOAD,
+    UNSUPPORTED_DOMAIN,
+    INVALID_WIRE_FRAME,
+    INVALID_GPB_DATA
+}
index b2f7389..dca573d 100644 (file)
 package org.onap.dcae.collectors.veshv.ves.message.generator.impl
 
 import com.google.protobuf.ByteString
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.PayloadContentType
 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.*
+import org.onap.ves.HVRanMeasFieldsV5
+import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
+import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
 import org.onap.ves.VesEventV5.VesEvent
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.OTHER
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
-import javax.json.JsonObject
+import java.nio.charset.Charset
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -35,41 +44,78 @@ import javax.json.JsonObject
  */
 class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
 
-    override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> =
-            Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let {
-                if (messageParameters.amount < 0)
-                    it.repeat()
-                else
-                    it.repeat(messageParameters.amount)
-            }
+    override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage> = Flux
+            .fromIterable(messageParameters)
+            .flatMap { createMessageFlux(it) }
 
-    override fun parseCommonHeader(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder()
-            .setVersion(json.getString("version"))
-            .setDomain(CommonEventHeader.Domain.forNumber(json.getInt("domain")))
-            .setSequence(json.getInt("sequence"))
-            .setPriority(CommonEventHeader.Priority.forNumber(json.getInt("priority")))
-            .setEventId(json.getString("eventId"))
-            .setEventName(json.getString("eventName"))
-            .setEventType(json.getString("eventType"))
-            .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue())
-            .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue())
-            .setNfNamingCode(json.getString("nfNamingCode"))
-            .setNfcNamingCode(json.getString("nfcNamingCode"))
-            .setReportingEntityId(json.getString("reportingEntityId"))
-            .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName")))
-            .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId")))
-            .setSourceName(json.getString("sourceName"))
-            .build()
+    private fun createMessageFlux(parameters: MessageParameters): Flux<PayloadWireFrameMessage> =
+            Mono.fromCallable { createMessage(parameters.domain, parameters.messageType) }
+                    .let {
+                        if (parameters.amount < 0)
+                            it.repeat()
+                        else
+                            it.repeat(parameters.amount)
+                    }
 
+    private fun createMessage(domain: Domain, messageType: MessageType): PayloadWireFrameMessage =
+            when (messageType) {
+                VALID ->
+                    PayloadWireFrameMessage(vesEvent(domain, payloadGenerator.generatePayload()))
+                TOO_BIG_PAYLOAD ->
+                    PayloadWireFrameMessage(vesEvent(domain, oversizedPayload()))
+                UNSUPPORTED_DOMAIN ->
+                    PayloadWireFrameMessage(vesEvent(OTHER, payloadGenerator.generatePayload()))
+                INVALID_WIRE_FRAME -> {
+                    val payload = ByteData(vesEvent(domain, payloadGenerator.generatePayload()))
+                    PayloadWireFrameMessage(
+                            payload,
+                            UNSUPPORTED_VERSION,
+                            PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                            payload.size())
+                }
+                INVALID_GPB_DATA ->
+                    PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
+            }
 
-    private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage =
-            PayloadWireFrameMessage(vesMessageBytes(commonHeader))
+    private fun vesEvent(domain: Domain, hvRanMeasPayload: HVRanMeasPayload): ByteArray {
+        return vesEvent(domain, hvRanMeasPayload.toByteString())
+    }
 
+    private fun vesEvent(domain: Domain, hvRanMeasPayload: ByteString): ByteArray {
+        return createVesEvent(createCommonHeader(domain), hvRanMeasPayload).toByteArray()
+    }
 
-    private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray =
+    private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
             VesEvent.newBuilder()
-                    .setCommonEventHeader(commonHeader)
-                    .setHvRanMeasFields(payloadGenerator.generatePayload().toByteString())
+                    .setCommonEventHeader(commonEventHeader)
+                    .setHvRanMeasFields(payload)
                     .build()
-                    .toByteArray()
+
+    private fun oversizedPayload() =
+            payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1)
+
+
+    private fun createCommonHeader(domain: Domain): CommonEventHeader = CommonEventHeader.newBuilder()
+            .setVersion("sample-version")
+            .setDomain(domain)
+            .setSequence(1)
+            .setPriority(CommonEventHeader.Priority.NORMAL)
+            .setEventId("sample-event-id")
+            .setEventName("sample-event-name")
+            .setEventType("sample-event-type")
+            .setStartEpochMicrosec(SAMPLE_START_EPOCH)
+            .setLastEpochMicrosec(SAMPLE_LAST_EPOCH)
+            .setNfNamingCode("sample-nf-naming-code")
+            .setNfcNamingCode("sample-nfc-naming-code")
+            .setReportingEntityId("sample-reporting-entity-id")
+            .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
+            .setSourceId(ByteString.copyFromUtf8("sample-source-id"))
+            .setSourceName("sample-source-name")
+            .build()
+
+    companion object {
+        private const val UNSUPPORTED_VERSION: Short = 2
+        private const val SAMPLE_START_EPOCH = 120034455L
+        private const val SAMPLE_LAST_EPOCH = 120034455L
+    }
 }
index 66f34e9..c85ce03 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.ves.message.generator.impl
 
+import com.google.protobuf.ByteString
 import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
 import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject
 import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject.HVRanMeas
@@ -28,6 +29,9 @@ internal class PayloadGenerator {
 
     private val randomGenerator = Random()
 
+    fun generateRawPayload(size: Int): ByteString =
+            ByteString.copyFrom(ByteArray(size))
+
     fun generatePayload(numOfCountPerMeas: Long = 2, numOfMeasPerObject: Int = 2): HVRanMeasPayload {
         val pmObject = generatePmObject(numOfCountPerMeas, numOfMeasPerObject)
         return HVRanMeasPayload.newBuilder()
index 0702717..fb14461 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
 
-import com.google.protobuf.ByteString
+import com.google.protobuf.InvalidProtocolBufferException
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
-import org.onap.ves.VesEventV5
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.*
 import reactor.test.test
 
-const val SAMPLE_START_EPOCH: Long = 120034455
-const val SAMPLE_LAST_EPOCH: Long = 120034455
-
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
 object MessageGeneratorImplTest : Spek({
     describe("message factory") {
-
         val generator = MessageGenerator.INSTANCE
+        given("single message parameters") {
+            on("messages amount not specified in parameters") {
+                it("should create infinite flux") {
+                    val limit = 1000L
+                    generator
+                            .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID)))
+                            .take(limit)
+                            .test()
+                            .expectNextCount(limit)
+                            .verifyComplete()
+                }
+            }
+            on("messages amount specified in parameters") {
+                it("should create message flux of specified size") {
+                    generator
+                            .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 5)))
+                            .test()
+                            .expectNextCount(5)
+                            .verifyComplete()
+                }
+            }
+            on("message type requesting valid message") {
+                it("should create flux of valid messages with given domain") {
+                    generator
+                            .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 1)))
+                            .test()
+                            .assertNext {
+                                assertThat(it.isValid()).isTrue()
+                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+                            }
+                            .verifyComplete()
+                }
+            }
+            on("message type requesting too big payload") {
+                it("should create flux of messages with given domain and payload exceeding threshold") {
 
-        given("only common header") {
-            it("should return infinite flux") {
-                val limit = 1000L
-                generator.createMessageFlux(getSampleMessageParameters()).take(limit).test()
-                        .expectNextCount(limit)
-                        .verifyComplete()
+                    generator
+                            .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.TOO_BIG_PAYLOAD, 1)))
+                            .test()
+                            .assertNext {
+                                assertThat(it.isValid()).isTrue()
+                                assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+                            }
+                            .verifyComplete()
+                }
+            }
+            on("message type requesting unsupported domain") {
+                it("should create flux of messages with domain other than HVRANMEAS") {
+
+                    generator
+                            .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.UNSUPPORTED_DOMAIN, 1)))
+                            .test()
+                            .assertNext {
+                                assertThat(it.isValid()).isTrue()
+                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(extractCommonEventHeader(it.payload).domain).isNotEqualTo(HVRANMEAS)
+                            }
+                            .verifyComplete()
+                }
+            }
+            on("message type requesting invalid GPB data ") {
+                it("should create flux of messages with invalid payload") {
+                    generator
+                            .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_GPB_DATA, 1)))
+                            .test()
+                            .assertNext {
+                                assertThat(it.isValid()).isTrue()
+                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
+                                        .isThrownBy { extractCommonEventHeader(it.payload) }
+                            }
+                            .verifyComplete()
+                }
+            }
+            on("message type requesting invalid wire frame ") {
+                it("should create flux of messages with invalid version") {
+                    generator
+                            .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_WIRE_FRAME, 1)))
+                            .test()
+                            .assertNext {
+                                assertThat(it.isValid()).isFalse()
+                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+                                assertThat(it.version).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION)
+                            }
+                            .verifyComplete()
+                }
             }
         }
-        given("common header and messages amount") {
-            it("should return message flux of specified size") {
-                generator.createMessageFlux((getSampleMessageParameters(5))).test()
-                        .expectNextCount(5)
+        given("list of message parameters") {
+            it("should create concatenated flux of messages") {
+                val singleFluxSize = 5L
+                val messageParameters = listOf(
+                        MessageParameters(HVRANMEAS, MessageType.VALID, singleFluxSize),
+                        MessageParameters(FAULT, MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
+                        MessageParameters(HEARTBEAT, MessageType.VALID, singleFluxSize)
+                )
+                generator.createMessageFlux(messageParameters)
+                        .test()
+                        .assertNext {
+                            assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+                        }
+                        .expectNextCount(singleFluxSize - 1)
+                        .assertNext {
+                            assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+                        }
+                        .expectNextCount(singleFluxSize - 1)
+                        .assertNext {
+                            assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT)
+                        }
+                        .expectNextCount(singleFluxSize - 1)
                         .verifyComplete()
             }
         }
     }
 })
 
-fun getSampleMessageParameters(amount: Long = -1): MessageParameters {
-    val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder()
-            .setVersion("sample-version")
-            .setDomain(HVRANMEAS)
-            .setSequence(1)
-            .setPriority(MEDIUM)
-            .setEventId("sample-event-id")
-            .setEventName("sample-event-name")
-            .setEventType("sample-event-type")
-            .setStartEpochMicrosec(SAMPLE_START_EPOCH)
-            .setLastEpochMicrosec(SAMPLE_LAST_EPOCH)
-            .setNfNamingCode("sample-nf-naming-code")
-            .setNfcNamingCode("sample-nfc-naming-code")
-            .setReportingEntityId("sample-reporting-entity-id")
-            .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
-            .setSourceId(ByteString.copyFromUtf8("sample-source-id"))
-            .setSourceName("sample-source-name")
-            .build()
-
-    return MessageParameters(commonHeader, amount)
-}
+fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
+    return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+}
\ No newline at end of file
index 08a35d4..0ab248b 100644 (file)
@@ -21,9 +21,11 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
 import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 import ratpack.exec.Promise
 import ratpack.handling.Chain
 import ratpack.handling.Context
@@ -31,8 +33,9 @@ import ratpack.server.RatpackServer
 import ratpack.server.ServerConfig
 import reactor.core.publisher.Flux
 import reactor.core.scheduler.Schedulers
+import java.nio.charset.Charset
 import javax.json.Json
-import javax.json.JsonObject
+import javax.json.JsonArray
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -47,7 +50,6 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
         }
     }
 
-
     private fun configureHandlers(chain: Chain) {
         chain
                 .post("simulator/sync") { ctx ->
@@ -68,11 +70,26 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
 
     private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> {
         return ctx.request.body
-                .map { Json.createReader(it.inputStream).readObject() }
+                .map { Json.createReader(it.inputStream).readArray() }
                 .map { extractMessageParameters(it) }
                 .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
     }
 
+    private fun extractMessageParameters(request: JsonArray): List<MessageParameters> =
+            try {
+                request
+                        .map { it.asJsonObject() }
+                        .map {
+
+                            val domain = Domain.valueOf(it.getString("domain"))
+                            val messageType = MessageType.valueOf(it.getString("messageType"))
+                            val messagesAmount = it.getJsonNumber("messagesAmount").longValue()
+                            MessageParameters(domain, messageType, messagesAmount)
+                        }
+            } catch (e: Exception) {
+                throw ValidationException("Validating request body failed", e)
+            }
+
     private fun sendAcceptedResponse(ctx: Context) {
         ctx.response
                 .status(STATUS_OK)
@@ -94,17 +111,6 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
                         .toString())
     }
 
-    private fun extractMessageParameters(request: JsonObject): MessageParameters =
-            try {
-                val commonEventHeader = MessageGenerator.INSTANCE
-                        .parseCommonHeader(request.getJsonObject("commonEventHeader"))
-                val messagesAmount = request.getJsonNumber("messagesAmount").longValue()
-                MessageParameters(commonEventHeader, messagesAmount)
-            } catch (e: Exception) {
-                throw ValidationException("Validating request body failed", e)
-            }
-
-
     companion object {
         private val logger = Logger(HttpServer::class)
         const val DEFAULT_PORT = 5000