DCAE APP simulator rework 69/58869/1
authorJakub Dudycz <jakub.dudycz@nokia.com>
Thu, 26 Jul 2018 09:49:45 +0000 (11:49 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 3 Aug 2018 08:15:25 +0000 (10:15 +0200)
- Extract message parameters parsing logic to standalone class in utils
- Make DCAE APP simulator store whole received messages history
- Add validation endpoint
- Add new messege type: FIXED_PAYLOAD

Closes ONAP-686

Change-Id: I865804716ad5e46a7503a8eee70cfe9ac75a6c1e
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-601

17 files changed:
hv-collector-dcae-app-simulator/pom.xml
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
hv-collector-utils/pom.xml
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParser.kt
hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParserTest.kt [new file with mode: 0644]
hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/parameters.kt [new file with mode: 0644]
hv-collector-ves-message-generator/pom.xml
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt [deleted file]
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageType.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt [deleted file]
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt

index a2f92e8..f3c1735 100644 (file)
             <artifactId>hv-collector-utils</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>hv-collector-ves-message-generator</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-effects</artifactId>
index 264033e..065cdf9 100644 (file)
@@ -28,7 +28,9 @@ import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.*
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
 
 class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
     override val cmdLineOptionsList: List<CommandLineOption> = listOf(
index 869c5ab..08bb149 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
 
-import arrow.core.Option
 import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.kafka.receiver.ReceiverRecord
+import java.util.concurrent.ConcurrentLinkedQueue
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
+class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){
+    val messagesCount: Int by lazy {
+        messages.size
+    }
 
-class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>)
+    val consumedMessages: List<ByteArray> by lazy {
+        messages.toList()
+    }
+}
 
 interface ConsumerStateProvider {
     fun currentState(): ConsumerState
@@ -37,31 +44,21 @@ interface ConsumerStateProvider {
 }
 
 class Consumer : ConsumerStateProvider {
-    private var msgCount = 0L
-    private var lastKey: ByteArray? = null
-    private var lastValue: ByteArray? = null
 
-    override fun currentState() =
-            ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue))
+    private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
 
-    override fun reset() = IO {
-        synchronized(this) {
-            msgCount = 0
-            lastKey = null
-            lastValue = null
-        }
+    override fun currentState(): ConsumerState = ConsumerState(consumedMessages)
+
+    override fun reset(): IO<Unit> = IO {
+        consumedMessages.clear()
     }
 
     fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
         logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
-
-        synchronized(this) {
-            msgCount++
-            lastKey = record.key()
-            lastValue = record.value()
-        }
+        consumedMessages.add(record.value())
     }
 
+
     companion object {
         private val logger = Logger(Consumer::class)
     }
index d1d90b0..cb1484b 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
 
-import arrow.core.Try
-import arrow.core.getOrElse
 import arrow.effects.IO
-import com.google.protobuf.MessageOrBuilder
-import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
+import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
 import org.onap.ves.VesEventV5.VesEvent
 import ratpack.handling.Chain
+import ratpack.handling.Context
 import ratpack.server.RatpackServer
 import ratpack.server.ServerConfig
+import reactor.core.publisher.Mono
+import javax.json.Json
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class ApiServer(private val consumerFactory: ConsumerFactory) {
+class ApiServer(private val consumerFactory: ConsumerFactory,
+                private val messageParametersParser: MessageParametersParser = MessageParametersParser()) {
 
     private lateinit var consumerState: ConsumerStateProvider
-    private val jsonPrinter = JsonFormat.printer()
 
     fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
         consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
@@ -57,60 +60,92 @@ class ApiServer(private val consumerFactory: ConsumerFactory) {
                         val topics = extractTopics(it.text)
                         logger.info("Received new configuration. Creating consumer for topics: $topics")
                         consumerState = consumerFactory.createConsumerForTopics(topics)
-                        ctx.response.contentType(CONTENT_TEXT)
-                        ctx.response.send("OK")
+                        ctx.response
+                                .status(STATUS_OK)
+                                .send()
                     }
 
                 }
-
-                .get("messages/count") { ctx ->
-                    ctx.response.contentType(CONTENT_TEXT)
-                    val state = consumerState.currentState()
-                    ctx.response.send(state.msgCount.toString())
-                }
-
-                .get("messages/last/key") { ctx ->
-                    ctx.response.contentType(CONTENT_JSON)
-                    val state = consumerState.currentState()
-                    val resp = state.lastKey
-                            .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } }
-                            .map(this::protobufToJson)
-                            .getOrElse { "null" }
-                    ctx.response.send(resp)
-                }
-
-                .get("messages/last/value") { ctx ->
-                    ctx.response.contentType(CONTENT_JSON)
-                    val state = consumerState.currentState()
-                    val resp = state.lastValue
-                            .map { Try { VesEvent.parseFrom(it) } }
-                            .map(this::protobufToJson)
-                            .getOrElse { "null" }
-                    ctx.response.send(resp)
-                }
-
-                .get("messages/last/hvRanMeasFields") { ctx ->
-                    ctx.response.contentType(CONTENT_JSON)
-                    val state = consumerState.currentState()
-                    val resp = state.lastValue
-                            .flatMap { Try { VesEvent.parseFrom(it) }.toOption() }
-                            .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS }
-                            .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } }
-                            .map(this::protobufToJson)
-                            .getOrElse { "null" }
-                    ctx.response.send(resp)
-                }
-
                 .delete("messages") { ctx ->
                     ctx.response.contentType(CONTENT_TEXT)
                     consumerState.reset()
                             .unsafeRunAsync {
                                 it.fold(
-                                        { ctx.response.send("NOK") },
-                                        { ctx.response.send("OK") }
-                                )
+                                        { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) },
+                                        { ctx.response.status(STATUS_OK) }
+                                ).send()
                             }
                 }
+                .get("messages/all/count") { ctx ->
+                    val state = consumerState.currentState()
+                    ctx.response
+                            .contentType(CONTENT_TEXT)
+                            .send(state.messagesCount.toString())
+                }
+                .post("messages/all/validate") { ctx ->
+                    ctx.request.body
+                            .map { Json.createReader(it.inputStream).readArray() }
+                            .map { messageParametersParser.parse(it) }
+                            .map { generateEvents(ctx, it) }
+                            .then { (generatedEvents, shouldValidatePayloads) ->
+                                generatedEvents
+                                        .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) }
+                                        .block()
+                            }
+                }
+    }
+
+    private fun generateEvents(ctx: Context, parameters: List<MessageParameters>):
+            Pair<Mono<List<VesEvent>>, Boolean> = Pair(
+
+            doGenerateEvents(parameters).doOnError {
+                logger.error("Error occurred when generating messages: $it")
+                ctx.response
+                        .status(STATUS_INTERNAL_SERVER_ERROR)
+                        .send()
+            },
+            parameters.all { it.messageType == FIXED_PAYLOAD }
+    )
+
+    private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE
+            .createMessageFlux(parameters)
+            .map(PayloadWireFrameMessage::payload)
+            .map { decode(it.unsafeAsArray()) }
+            .collectList()
+
+
+    private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes)
+
+
+    private fun sendResponse(ctx: Context,
+                             generatedEvents: List<VesEvent>,
+                             shouldValidatePayloads: Boolean) =
+            resolveResponseStatusCode(
+                    generated = generatedEvents,
+                    consumed = decodeConsumedEvents(),
+                    validatePayloads = shouldValidatePayloads
+            ).let { ctx.response.status(it).send() }
+
+
+    private fun decodeConsumedEvents(): List<VesEvent> = consumerState
+            .currentState()
+            .consumedMessages
+            .map(::decode)
+
+
+    private fun resolveResponseStatusCode(generated: List<VesEvent>,
+                                          consumed: List<VesEvent>,
+                                          validatePayloads: Boolean): Int =
+            if (validatePayloads) {
+                if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST
+            } else {
+                validateHeaders(consumed, generated)
+            }
+
+    private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int {
+        val consumedHeaders = consumed.map { it.commonEventHeader }
+        val generatedHeaders = generated.map { it.commonEventHeader }
+        return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST
     }
 
     private fun extractTopics(it: String): Set<String> =
@@ -118,16 +153,14 @@ class ApiServer(private val consumerFactory: ConsumerFactory) {
                     .split(",")
                     .toSet()
 
-    private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String =
-            parseResult.fold(
-                    { ex -> "\"Failed to parse protobuf: ${ex.message}\"" },
-                    { jsonPrinter.print(it) })
-
-
     companion object {
         private val logger = Logger(ApiServer::class)
-
         private const val CONTENT_TEXT = "text/plain"
-        private const val CONTENT_JSON = "application/json"
+
+        private const val STATUS_OK = 200
+        private const val STATUS_BAD_REQUEST = 400
+        private const val STATUS_INTERNAL_SERVER_ERROR = 500
     }
 }
+
+
index d0e4493..39097c1 100644 (file)
     </build>
 
     <dependencies>
-        <dependency>
-            <groupId>${project.parent.groupId}</groupId>
-            <artifactId>hv-collector-domain</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
         <dependency>
             <groupId>${project.parent.groupId}</groupId>
             <artifactId>hv-collector-ves-message-generator</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.glassfish</groupId>
-            <artifactId>javax.json</artifactId>
-        </dependency>
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.nhaarman</groupId>
             <artifactId>mockito-kotlin</artifactId>
index 9c873a0..c00ce68 100644 (file)
@@ -54,20 +54,16 @@ abstract class ArgBasedConfiguration<T>(private val parser: CommandLineParser) {
 
     protected abstract fun getConfiguration(cmdLine: CommandLine): Option<T>
 
-    protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption, default: Int): Int =
-            intValue(cmdLineOpt).getOrElse { default }
-
     protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long =
             longValue(cmdLineOpt).getOrElse { default }
 
     protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, default: String): String =
             optionValue(cmdLineOpt).getOrElse { default }
 
-
     protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Option<Int> =
             optionValue(cmdLineOpt).map(String::toInt)
 
-    protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> =
+    private fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> =
             optionValue(cmdLineOpt).map(String::toLong)
 
     protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): Option<String> =
index 24c2cbf..1621ba5 100644 (file)
@@ -35,14 +35,17 @@ class MessageParametersParser(
                 request
                         .map { it.asJsonObject() }
                         .map {
-                            val commonEventHeader = commonEventHeaderParser.parse(it.getJsonObject("commonEventHeader"))
+                            val commonEventHeader = commonEventHeaderParser
+                                    .parse(it.getJsonObject("commonEventHeader"))
                             val messageType = MessageType.valueOf(it.getString("messageType"))
-                            val messagesAmount = it.getJsonNumber("messagesAmount").longValue()
+                            val messagesAmount = it.getJsonNumber("messagesAmount")?.longValue()
+                                    ?: throw ParsingException("\"messagesAmount\" could not be parsed from message.",
+                                            NullPointerException())
                             MessageParameters(commonEventHeader, messageType, messagesAmount)
                         }
             } catch (e: Exception) {
                 throw ParsingException("Parsing request body failed", e)
             }
 
-    internal class ParsingException(message: String?, cause: Exception) : Exception(message, cause)
+    internal class ParsingException(message: String, cause: Exception) : Exception(message, cause)
 }
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParserTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/MessageParametersParserTest.kt
new file mode 100644 (file)
index 0000000..ec628a2
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.messages
+
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser.ParsingException
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+
+private const val EXPECTED_MESSAGES_AMOUNT = 25000L
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+object MessageParametersParserTest : Spek({
+    describe("Messages parameters parser") {
+        val messageParametersParser = MessageParametersParser()
+
+        given("parameters json array") {
+            on("valid parameters json") {
+                it("should parse MessagesParameters object successfully") {
+                    val result = messageParametersParser.parse(validMessagesParametesJson())
+
+                    assertThat(result).isNotNull
+                    assertThat(result).hasSize(2)
+                    val firstMessage = result.first()
+                    assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID)
+                    assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT)
+                }
+            }
+            on("invalid parameters json") {
+                it("should throw exception") {
+                    assertThatExceptionOfType(ParsingException::class.java).isThrownBy {
+                        messageParametersParser.parse(invalidMessagesParametesJson())
+                    }
+                }
+            }
+        }
+    }
+})
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/parameters.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/messages/parameters.kt
new file mode 100644 (file)
index 0000000..f6a3a15
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.messages
+
+import javax.json.Json
+
+private const val validMessageParameters = "[\n" +
+        "  {\n" +
+        "    \"commonEventHeader\": {\n" +
+        "      \"version\": \"sample-version\",\n" +
+        "      \"domain\": \"HVRANMEAS\",\n" +
+        "      \"sequence\": 1,\n" +
+        "      \"priority\": 1,\n" +
+        "      \"eventId\": \"sample-event-id\",\n" +
+        "      \"eventName\": \"sample-event-name\",\n" +
+        "      \"eventType\": \"sample-event-type\",\n" +
+        "      \"startEpochMicrosec\": 120034455,\n" +
+        "      \"lastEpochMicrosec\": 120034455,\n" +
+        "      \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
+        "      \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
+        "      \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
+        "      \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
+        "      \"sourceId\": \"sample-source-id\",\n" +
+        "      \"sourceName\": \"sample-source-name\"\n" +
+        "    },\n" +
+        "    \"messageType\": \"VALID\",\n" +
+        "    \"messagesAmount\": 25000\n" +
+        "  },\n" +
+        "  {\n" +
+        "    \"commonEventHeader\": {\n" +
+        "      \"version\": \"sample-version\",\n" +
+        "      \"domain\": \"HVRANMEAS\",\n" +
+        "      \"sequence\": 1,\n" +
+        "      \"priority\": 1,\n" +
+        "      \"eventId\": \"sample-event-id\",\n" +
+        "      \"eventName\": \"sample-event-name\",\n" +
+        "      \"eventType\": \"sample-event-type\",\n" +
+        "      \"startEpochMicrosec\": 120034455,\n" +
+        "      \"lastEpochMicrosec\": 120034455,\n" +
+        "      \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
+        "      \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
+        "      \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
+        "      \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
+        "      \"sourceId\": \"sample-source-id\",\n" +
+        "      \"sourceName\": \"sample-source-name\"\n" +
+        "    },\n" +
+        "    \"messageType\": \"TOO_BIG_PAYLOAD\",\n" +
+        "    \"messagesAmount\": 100\n" +
+        "  }\n" +
+        "]"
+
+private const val invalidMessageParameters = "[\n" +
+        "  {\n" +
+        "    \"commonEventHeader\": {\n" +
+        "      \"version\": \"sample-version\",\n" +
+        "      \"domain\": \"HVRANMEAS\",\n" +
+        "      \"sequence\": 1,\n" +
+        "      \"priority\": 1,\n" +
+        "      \"eventId\": \"sample-event-id\",\n" +
+        "      \"eventName\": \"sample-event-name\",\n" +
+        "      \"eventType\": \"sample-event-type\",\n" +
+        "      \"startEpochMicrosec\": 120034455,\n" +
+        "      \"lastEpochMicrosec\": 120034455,\n" +
+        "      \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
+        "      \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
+        "      \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
+        "      \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
+        "      \"sourceId\": \"sample-source-id\",\n" +
+        "      \"sourceName\": \"sample-source-name\"\n" +
+        "    },\n" +
+        "    \"messagesAmount\": 3\n" +
+        "  }\n" +
+        "]"
+
+fun validMessagesParametesJson() = Json
+        .createReader(validMessageParameters.reader())
+        .readArray()
+
+fun invalidMessagesParametesJson() = Json
+        .createReader(invalidMessageParameters.reader())
+        .readArray()
index f049d78..a7dad24 100644 (file)
             <artifactId>logback-classic</artifactId>
             <scope>runtime</scope>
         </dependency>
-        <dependency>
-            <groupId>org.glassfish</groupId>
-            <artifactId>javax.json</artifactId>
-        </dependency>
     </dependencies>
 
 
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/CommonEventHeaderParser.kt
deleted file mode 100644 (file)
index 605b172..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.ves.message.generator.api
-
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.CommonEventHeaderParserImpl
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import javax.json.JsonObject
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since July 2018
- */
-interface CommonEventHeaderParser {
-
-    fun parse(json: JsonObject): CommonEventHeader
-
-    companion object {
-        val INSTANCE: CommonEventHeaderParser by lazy {
-            CommonEventHeaderParserImpl()
-        }
-    }
-}
\ No newline at end of file
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserImpl.kt
deleted file mode 100644 (file)
index 61f5f2f..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl
-
-import com.google.protobuf.ByteString
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.CommonEventHeaderParser
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
-import javax.json.JsonObject
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since July 2018
- */
-class CommonEventHeaderParserImpl : CommonEventHeaderParser {
-
-    override fun parse(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder()
-            .setVersion(json.getString("version"))
-            .setDomain(Domain.valueOf(json.getString("domain")))
-            .setSequence(json.getInt("sequence"))
-            .setPriority(Priority.forNumber(json.getInt("priority")))
-            .setEventId(json.getString("version"))
-            .setEventName(json.getString("version"))
-            .setEventType(json.getString("version"))
-            .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue())
-            .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue())
-            .setNfNamingCode(json.getString("nfNamingCode"))
-            .setNfcNamingCode(json.getString("nfcNamingCode"))
-            .setReportingEntityId(json.getString("reportingEntityId"))
-            .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName")))
-            .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId")))
-            .setSourceName(json.getString("sourceName"))
-            .build()
-}
\ No newline at end of file
index e9db716..c6e0556 100644 (file)
@@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_GPB_DATA
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD
@@ -63,6 +64,8 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
                     PayloadWireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
                 TOO_BIG_PAYLOAD ->
                     PayloadWireFrameMessage(vesEvent(commonEventHeader, oversizedPayload()))
+                FIXED_PAYLOAD ->
+                    PayloadWireFrameMessage(vesEvent(commonEventHeader, fixedPayload()))
                 INVALID_WIRE_FRAME -> {
                     val payload = ByteData(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
                     PayloadWireFrameMessage(
@@ -92,6 +95,9 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
     private fun oversizedPayload() =
             payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1)
 
+    private fun fixedPayload() =
+            payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE)
+
     companion object {
         private const val UNSUPPORTED_VERSION: Short = 2
     }
index b2490dd..1e38d46 100644 (file)
@@ -146,6 +146,24 @@ object MessageGeneratorImplTest : Spek({
                             .verifyComplete()
                 }
             }
+            on("message type requesting fixed payload") {
+                it("should create flux of valid messages with fixed payload") {
+                    generator
+                            .createMessageFlux(listOf(MessageParameters(
+                                    createSampleCommonHeader(FAULT),
+                                    MessageType.FIXED_PAYLOAD,
+                                    1
+                            )))
+                            .test()
+                            .assertNext {
+                                assertThat(it.isValid()).isTrue()
+                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
+                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+                            }
+                            .verifyComplete()
+                }
+            }
         }
         given("list of message parameters") {
             it("should create concatenated flux of messages") {
@@ -182,6 +200,10 @@ fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
     return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
 }
 
+fun extractHvRanMeasFields(bytes: ByteData): ByteString {
+    return VesEvent.parseFrom(bytes.unsafeAsArray()).hvRanMeasFields
+}
+
 private fun createSampleCommonHeader(domain: CommonEventHeader.Domain): CommonEventHeader = CommonEventHeader.newBuilder()
         .setVersion("sample-version")
         .setDomain(domain)