Introduce configurable payload size limitation 93/69893/1
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 5 Oct 2018 07:02:06 +0000 (09:02 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 5 Oct 2018 07:02:06 +0000 (09:02 +0200)
Maximum payload size will be configurable (from command line parameter
or environment variable). The default value is same as previous
hardcoded value, ie. 1 MiB = 1024 * 1024 bytes.

Change-Id: Iec83d8295252bac353d3794b13454fdbbc80ecc4
Issue-ID: DCAEGEN2-856
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
34 files changed:
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt [new file with mode: 0644]
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt [moved from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt with 96% similarity]
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt [moved from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt with 96% similarity]
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt [moved from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt with 99% similarity]
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt [moved from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt with 99% similarity]
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt

index d807a9e..5c96e1c 100644 (file)
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference
 class CollectorFactory(val configuration: ConfigurationProvider,
                        private val sinkProvider: SinkProvider,
                        private val metrics: Metrics,
+                       private val maximumPayloadSizeBytes: Int,
                        private val healthState: HealthState = HealthState.INSTANCE) {
 
     fun createVesHvCollectorProvider(): CollectorProvider {
@@ -63,7 +64,9 @@ class CollectorFactory(val configuration: ConfigurationProvider,
 
     private fun createVesHvCollector(config: CollectorConfiguration): Collector {
         return VesHvCollector(
-                wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
+                wireChunkDecoderSupplier = { alloc ->
+                    WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
+                },
                 protobufDecoder = VesDecoder(),
                 router = Router(config.routing),
                 sink = sinkProvider(config),
index c14d36e..7a7d934 100644 (file)
@@ -32,4 +32,5 @@ data class ServerConfiguration(
         val securityConfiguration: SecurityConfiguration,
         val idleTimeout: Duration,
         val healthCheckApiPort: Int,
+        val maximumPayloadSizeBytes: Int,
         val dummyMode: Boolean = false)
index d214ffc..f06a0dc 100644 (file)
@@ -45,7 +45,7 @@ internal object WireChunkDecoderTest : Spek({
 
     fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
 
-    fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
+    fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
 
     fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
         for (bb in byteBuffers) {
index 67291ab..0c78dd5 100644 (file)
@@ -31,12 +31,14 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
 import reactor.core.publisher.Flux
 import reactor.math.sum
 import java.security.MessageDigest
@@ -173,7 +175,7 @@ fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<Byt
 
 private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
         WireFrameEncoder(alloc).let { encoder ->
-            MessageGenerator.INSTANCE
+            MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
                     .createMessageFlux(listOf(params))
                     .map(encoder::encode)
                     .transform { simulateRemoteTcp(alloc, 1000, it) }
index 942e6ed..0495ced 100644 (file)
@@ -45,11 +45,21 @@ class Sut(sink: Sink = StoringSink()) {
 
     val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
     private val metrics = FakeMetrics()
-    private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics, healthStateProvider)
+    private val collectorFactory = CollectorFactory(
+            configurationProvider,
+            SinkProvider.just(sink),
+            metrics,
+            MAX_PAYLOAD_SIZE_BYTES,
+            healthStateProvider)
     private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
 
     val collector: Collector
         get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
+
+    companion object {
+        const val MAX_PAYLOAD_SIZE_BYTES = 1024
+    }
+
 }
 
 fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
index e9c0d67..2d81c67 100644 (file)
@@ -39,7 +39,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
 import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
+import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize
 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
 
@@ -72,7 +72,7 @@ object VesHvSpecification : Spek({
             val (sut, sink) = vesHvWithStoringSink()
             val validMessage = vesWireFrameMessage(PERF3GPP)
             val msgWithInvalidFrame = invalidWireFrame()
-            val msgWithTooBigPayload = vesMessageWithTooBigPayload(PERF3GPP)
+            val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
             val expectedRefCnt = 0
 
             val handledEvents = sut.handleConnection(
@@ -329,7 +329,7 @@ object VesHvSpecification : Spek({
 
             val handledMessages = sut.handleConnection(sink,
                     vesWireFrameMessage(PERF3GPP, "first"),
-                    vesMessageWithTooBigPayload(PERF3GPP),
+                    vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
                     vesWireFrameMessage(PERF3GPP))
 
             assertThat(handledMessages).hasSize(1)
index 262e05b..1a8af87 100644 (file)
@@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReference
  * @since August 2018
  */
 class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
-                       private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) {
+                       private val messageStreamValidation: MessageStreamValidation) {
     private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
 
     fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
index 38de537..c910b53 100644 (file)
@@ -35,8 +35,8 @@ import java.io.InputStream
 import javax.json.Json
 
 class MessageStreamValidation(
-        private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
-        private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+        private val messageGenerator: MessageGenerator,
+        private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
 
     fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
             IO.monadError().bindingCatch {
index d082061..83dceb6 100644 (file)
@@ -26,17 +26,20 @@ import arrow.instances.extensions
 import arrow.typeclasses.binding
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.utils.commandline.intValue
 import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
 
 class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
     override val cmdLineOptionsList: List<CommandLineOption> = listOf(
             LISTEN_PORT,
+            MAXIMUM_PAYLOAD_SIZE_BYTES,
             KAFKA_SERVERS,
             KAFKA_TOPICS
     )
@@ -47,6 +50,8 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration
                     val listenPort = cmdLine
                             .intValue(LISTEN_PORT)
                             .bind()
+                    val maxPayloadSizeBytes = cmdLine
+                            .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
                     val kafkaBootstrapServers = cmdLine
                             .stringValue(KAFKA_SERVERS)
                             .bind()
@@ -57,6 +62,7 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration
 
                     DcaeAppSimConfiguration(
                             listenPort,
+                            maxPayloadSizeBytes,
                             kafkaBootstrapServers,
                             kafkaTopics)
                 }.fix()
index c0f8b34..06ff4d5 100644 (file)
@@ -24,12 +24,14 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppS
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
 import org.onap.dcae.collectors.veshv.utils.arrow.unit
 import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
 
 private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
 private val logger = Logger(PACKAGE_NAME)
@@ -51,7 +53,10 @@ fun main(args: Array<String>) =
 
 
 private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
-    return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers)))
+    logger.info("Using configuration: $config")
+    val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
+    val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+    return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
             .start(config.apiPort, config.kafkaTopics)
             .unit()
 }
index 05fdd80..5e3090a 100644 (file)
@@ -53,7 +53,7 @@ internal class MessageStreamValidationTest : Spek({
     beforeEachTest {
         messageParametersParser = mock()
         messageGenerator = mock()
-        cut = MessageStreamValidation(messageParametersParser, messageGenerator)
+        cut = MessageStreamValidation(messageGenerator, messageParametersParser)
     }
 
     fun givenParsedMessageParameters(vararg params: MessageParameters) {
index 4f867f1..f08ec17 100644 (file)
@@ -24,7 +24,6 @@ import arrow.core.Left
 import arrow.core.Right
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
 
 /**
@@ -52,7 +51,7 @@ class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocato
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-class WireFrameDecoder {
+class WireFrameDecoder(private val maxPayloadSizeBytes: Int) {
 
     fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
             when {
@@ -81,13 +80,13 @@ class WireFrameDecoder {
     private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
         val versionMajor = byteBuf.readUnsignedByte()
         val versionMinor = byteBuf.readUnsignedByte()
-        byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved
+        byteBuf.skipBytes(RESERVED_BYTE_COUNT)
         val payloadTypeRaw = byteBuf.readUnsignedByte()
         val payloadSize = byteBuf.readInt()
 
-        if (payloadSize > MAX_PAYLOAD_SIZE) {
+        if (payloadSize > maxPayloadSizeBytes) {
             byteBuf.resetReaderIndex()
-            return Left(PayloadSizeExceeded)
+            return Left(PayloadSizeExceeded(maxPayloadSizeBytes))
         }
 
         if (byteBuf.readableBytes() < payloadSize) {
index dfadc5b..0d55ceb 100644 (file)
@@ -19,8 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.domain
 
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
@@ -38,7 +36,8 @@ class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
                 .format(WireFrameMessage.MARKER_BYTE, actualMarker)
 )
 
-object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
+class PayloadSizeExceeded(maxPayloadSizeBytes: Int) :
+        InvalidWireFrame("payload size exceeds the limit ($maxPayloadSizeBytes bytes)")
 
 // Missing bytes errors
 
index 036888e..5fdc5af 100644 (file)
@@ -70,6 +70,6 @@ data class WireFrameMessage(val payload: ByteData,
                         RESERVED_BYTE_COUNT * java.lang.Byte.BYTES + // reserved bytes
                         1 * java.lang.Integer.BYTES                  // payload length
 
-        const val MAX_PAYLOAD_SIZE = 1024 * 1024
+        const val DEFAULT_MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
     }
 }
index 6756bf8..f17a79b 100644 (file)
@@ -28,7 +28,6 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
 import java.nio.charset.Charset
 import kotlin.test.assertTrue
 import kotlin.test.fail
@@ -39,8 +38,9 @@ import kotlin.test.fail
  */
 object WireFrameCodecsTest : Spek({
     val payloadAsString = "coffeebabe"
+    val maxPayloadSizeBytes = 1024
     val encoder = WireFrameEncoder()
-    val decoder = WireFrameDecoder()
+    val decoder = WireFrameDecoder(maxPayloadSizeBytes)
 
     fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
 
@@ -223,7 +223,7 @@ object WireFrameCodecsTest : Spek({
 
             it("should decode successfully when payload size is equal 1 MiB") {
 
-                val payload = ByteArray(MAX_PAYLOAD_SIZE)
+                val payload = ByteArray(maxPayloadSizeBytes)
                 val input = WireFrameMessage(
                         payload = ByteData(payload),
                         versionMajor = 1,
@@ -237,7 +237,7 @@ object WireFrameCodecsTest : Spek({
 
             it("should return error when payload exceeds 1 MiB") {
 
-                val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
+                val payload = ByteArray(maxPayloadSizeBytes + 1)
                 val input = WireFrameMessage(
                         payload = ByteData(payload),
                         versionMajor = 1,
@@ -253,7 +253,7 @@ object WireFrameCodecsTest : Spek({
 
             it("should validate only first message") {
 
-                val payload = ByteArray(MAX_PAYLOAD_SIZE)
+                val payload = ByteArray(maxPayloadSizeBytes)
                 val input = WireFrameMessage(
                         payload = ByteData(payload),
                         versionMajor = 1,
index d6ff9ef..826982d 100644 (file)
@@ -27,6 +27,7 @@ import arrow.instances.extensions
 import arrow.typeclasses.binding
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
@@ -40,6 +41,7 @@ import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_T
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
@@ -62,6 +64,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
             TRUST_STORE_FILE,
             TRUST_STORE_PASSWORD,
             IDLE_TIMEOUT_SEC,
+            MAXIMUM_PAYLOAD_SIZE_BYTES,
             DUMMY_MODE
     )
 
@@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
                 )
                 val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
                 val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
+                val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, DefaultValues.MAX_PAYLOAD_SIZE_BYTES)
                 val dummyMode = cmdLine.hasOption(DUMMY_MODE)
                 val security = createSecurityConfiguration(cmdLine).bind()
                 val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
@@ -82,6 +86,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
                         configurationProviderParams = configurationProviderParams,
                         securityConfiguration = security,
                         idleTimeout = Duration.ofSeconds(idleTimeoutSec),
+                        maximumPayloadSizeBytes = maxPayloadSizeBytes,
                         dummyMode = dummyMode)
             }.fix()
 
@@ -110,5 +115,6 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
         const val CONSUL_FIRST_REQUEST_DELAY = 10L
         const val CONSUL_REQUEST_INTERVAL = 5L
         const val IDLE_TIMEOUT_SEC = 60L
+        const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
     }
 }
index a84a39a..78d4283 100644 (file)
@@ -48,6 +48,7 @@ fun main(args: Array<String>) =
 
 private fun startAndAwaitServers(config: ServerConfiguration) =
         IO.monad().binding {
+            logger.info("Using configuration: $config")
             HealthCheckServer.start(config).bind()
             VesServer.start(config).bind()
                     .await().bind()
index fbf8936..992a4dc 100644 (file)
@@ -40,7 +40,8 @@ object VesServer : ServerStarter() {
         val collectorProvider = CollectorFactory(
                 AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
                 sink,
-                MicrometerMetrics()
+                MicrometerMetrics(),
+                config.maximumPayloadSizeBytes
         ).createVesHvCollectorProvider()
 
         return ServerFactory.createNettyTcpServer(config, collectorProvider)
index 035d94e..3bf615a 100644 (file)
@@ -23,7 +23,6 @@ import com.google.protobuf.ByteString
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
@@ -70,13 +69,13 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
     writeByte(0x01)   // version minor
 }
 
-fun vesMessageWithTooBigPayload(domain: VesEventDomain = PERF3GPP): ByteBuf =
+fun vesMessageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf =
         allocator.buffer().run {
             writeValidWireFrameHeaders()
 
             val gpb = vesEvent(
                     domain = domain,
-                    eventFields = ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+                    eventFields = ByteString.copyFrom(ByteArray(payloadSizeBytes))
             ).toByteString().asReadOnlyByteBuffer()
 
             writeInt(gpb.limit())  // ves event size in bytes
index 288a578..9439bff 100644 (file)
@@ -119,6 +119,12 @@ enum class CommandLineOption(val option: Option, val required: Boolean = false)
                 |connection might be closed.""".trimMargin())
             .build()
     ),
+    MAXIMUM_PAYLOAD_SIZE_BYTES(Option.builder("m")
+            .longOpt("max-payload-size")
+            .hasArg()
+            .desc("Maximum supported payload size in bytes")
+            .build()
+    ),
     DUMMY_MODE(Option.builder("u")
             .longOpt("dummy")
             .desc("If present will start in dummy mode (dummy external services)")
index ace7f1c..076c06b 100644 (file)
@@ -20,8 +20,6 @@
 package org.onap.dcae.collectors.veshv.ves.message.generator.api
 
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
 import reactor.core.publisher.Flux
 
 /**
@@ -32,10 +30,6 @@ interface MessageGenerator {
     fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage>
 
     companion object {
-        val INSTANCE: MessageGenerator by lazy {
-            MessageGeneratorImpl(PayloadGenerator())
-        }
-
         const val FIXED_PAYLOAD_SIZE = 100
     }
 }
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
new file mode 100644 (file)
index 0000000..e2269c2
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.factory
+
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since October 2018
+ */
+object MessageGeneratorFactory {
+    fun create(maxPayloadSizeBytes: Int): MessageGenerator =
+            MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
+}
index 2bb9f78..cc1d16f 100644 (file)
@@ -42,7 +42,10 @@ import java.nio.charset.Charset
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
+class MessageGeneratorImpl internal constructor(
+        private val payloadGenerator: PayloadGenerator,
+        private val maxPayloadSizeBytes: Int
+) : MessageGenerator {
 
     override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> = Flux
             .fromIterable(messageParameters)
@@ -89,7 +92,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
                     .build()
 
     private fun oversizedPayload() =
-            payloadGenerator.generateRawPayload(WireFrameMessage.MAX_PAYLOAD_SIZE + 1)
+            payloadGenerator.generateRawPayload(maxPayloadSizeBytes + 1)
 
     private fun fixedPayload() =
             payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE)
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
 
 import com.google.protobuf.ByteString
 import com.google.protobuf.InvalidProtocolBufferException
@@ -47,7 +47,8 @@ import reactor.test.test
  */
 object MessageGeneratorImplTest : Spek({
     describe("message factory") {
-        val generator = MessageGenerator.INSTANCE
+        val maxPayloadSizeBytes = 1024
+        val generator = MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
         given("single message parameters") {
             on("messages amount not specified in parameters") {
                 it("should create infinite flux") {
@@ -87,7 +88,7 @@ object MessageGeneratorImplTest : Spek({
                             .test()
                             .assertNext {
                                 assertThat(it.isValid()).isTrue()
-                                assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
                             }
                             .verifyComplete()
@@ -105,7 +106,7 @@ object MessageGeneratorImplTest : Spek({
                             .test()
                             .assertNext {
                                 assertThat(it.isValid()).isTrue()
-                                assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
                             }
                             .verifyComplete()
@@ -122,7 +123,7 @@ object MessageGeneratorImplTest : Spek({
                             .test()
                             .assertNext {
                                 assertThat(it.isValid()).isTrue()
-                                assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                 assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
                                         .isThrownBy { extractCommonEventHeader(it.payload) }
                             }
@@ -140,7 +141,7 @@ object MessageGeneratorImplTest : Spek({
                             .test()
                             .assertNext {
                                 assertThat(it.isValid()).isFalse()
-                                assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
                                 assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
                             }
@@ -158,7 +159,7 @@ object MessageGeneratorImplTest : Spek({
                             .test()
                             .assertNext {
                                 assertThat(it.isValid()).isTrue()
-                                assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+                                assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                 assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
                             }
@@ -177,17 +178,17 @@ object MessageGeneratorImplTest : Spek({
                 generator.createMessageFlux(messageParameters)
                         .test()
                         .assertNext {
-                            assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+                            assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                             assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
                         }
                         .expectNextCount(singleFluxSize - 1)
                         .assertNext {
-                            assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+                            assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
                             assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
                         }
                         .expectNextCount(singleFluxSize - 1)
                         .assertNext {
-                            assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+                            assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                             assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.domainName)
                         }
                         .expectNextCount(singleFluxSize - 1)
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
 
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assertions.fail
@@ -27,7 +27,6 @@ import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl
 
 private const val EXPECTED_MESSAGES_AMOUNT = 25000L
 
index 3fde2c7..ec5ef81 100644 (file)
@@ -39,8 +39,8 @@ import javax.json.Json
  */
 class XnfSimulator(
         private val vesClient: VesHvClient,
-        private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
-        private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+        private val messageGenerator: MessageGenerator,
+        private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
 
     fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
             Either.monad<ParsingError>().binding {
index 54ead6f..06f1cff 100644 (file)
@@ -22,12 +22,8 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
 import arrow.core.Either
 import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.utils.http.Content
-import org.onap.dcae.collectors.veshv.utils.http.ContentType
 import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
-import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
 import org.onap.dcae.collectors.veshv.utils.http.Response
 import org.onap.dcae.collectors.veshv.utils.http.Responses
 import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
@@ -40,7 +36,6 @@ import ratpack.http.TypedData
 import ratpack.server.RatpackServer
 import ratpack.server.ServerConfig
 import java.util.*
-import javax.json.Json
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
index 3d8dc94..7966a4e 100644 (file)
@@ -25,11 +25,13 @@ import arrow.core.monad
 import arrow.typeclasses.binding
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
@@ -48,6 +50,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
             VES_HV_PORT,
             VES_HV_HOST,
             LISTEN_PORT,
+            MAXIMUM_PAYLOAD_SIZE_BYTES,
             SSL_DISABLE,
             KEY_STORE_FILE,
             KEY_STORE_PASSWORD,
@@ -59,12 +62,15 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
                 val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
                 val vesHost = cmdLine.stringValue(VES_HV_HOST).bind()
                 val vesPort = cmdLine.intValue(VES_HV_PORT).bind()
+                val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
 
                 SimulatorConfiguration(
                         listenPort,
                         vesHost,
                         vesPort,
+                        maxPayloadSizeBytes,
                         createSecurityConfiguration(cmdLine).bind())
             }.fix()
 
+
 }
index c9e900a..4512dfb 100644 (file)
@@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
 import org.onap.dcae.collectors.veshv.utils.arrow.unit
 import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
 
 private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf"
 private val logger = Logger(PACKAGE_NAME)
@@ -41,7 +42,11 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
 fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
         .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
         .map { config ->
-            XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations())
+            logger.info("Using configuration: $config")
+            val xnfSimulator = XnfSimulator(
+                    VesHvClient(config),
+                    MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+            XnfApiServer(xnfSimulator, OngoingSimulations())
                     .start(config.listenPort)
                     .unit()
         }
index 9753588..2a78ed5 100644 (file)
@@ -54,7 +54,7 @@ internal class XnfSimulatorTest : Spek({
         vesClient = mock()
         messageParametersParser = mock()
         messageGenerator = mock()
-        cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator)
+        cut = XnfSimulator(vesClient, messageGenerator, messageParametersParser)
     }
 
     describe("startSimulation") {