Move maxPayloadSizeBytes to CollectorConfiguration 20/84120/8
authorJakub Dudycz <jakub.dudycz@nokia.com>
Mon, 8 Apr 2019 12:09:16 +0000 (14:09 +0200)
committerJakub Dudycz <jakub.dudycz@nokia.com>
Wed, 10 Apr 2019 09:06:03 +0000 (11:06 +0200)
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1387
Change-Id: I37151a58f5244841243dc531912af2ef50ea5d3c

22 files changed:
development/configuration/base.json
development/configuration/local.json
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParserTest.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt
sources/hv-collector-configuration/src/test/resources/sampleConfig.json
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
sources/hv-collector-main/src/main/docker/base.json
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt

index 13c4ea1..1b723b7 100644 (file)
@@ -2,7 +2,6 @@
   "logLevel": "DEBUG",
   "server.listenPort": 6061,
   "server.idleTimeoutSec": 60,
-  "server.maxPayloadSizeBytes": 1048576,
   "cbs.firstRequestDelaySec": 10,
   "cbs.requestIntervalSec": 5,
   "security.keys.keyStoreFile": "/etc/ves-hv/ssl/server.p12",
index 79abe03..ebf2f82 100644 (file)
@@ -2,7 +2,6 @@
   "logLevel": "DEBUG",
   "server.listenPort": 8061,
   "server.idleTimeoutSec": 60,
-  "server.maxPayloadSizeBytes": 1048576,
   "cbs.firstRequestDelaySec": 10,
   "cbs.requestIntervalSec": 5,
   "security.keys.keyStoreFile": "development/ssl/server.p12",
index 9338157..f0ee3a4 100644 (file)
@@ -53,7 +53,7 @@ class ConfigurationModule {
                     .throwOnLeft(::MissingArgumentException)
                     .doOnNext { logger.info { "Using base configuration file: ${it.absolutePath}" } }
                     .map { it.reader().use(configParser::parse) }
-                    .doOnNext { logger.info { "Successfully parsed json file to configuration: $it" } }
+                    .doOnNext { logger.info { "Successfully parsed configuration file to: $it" } }
                     .cache()
                     .flatMapMany { basePartialConfig ->
                         cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
index f745d59..8db2f77 100644 (file)
@@ -37,7 +37,6 @@ data class HvVesConfiguration(
 
 data class ServerConfiguration(
         val listenPort: Int,
-        val maxPayloadSizeBytes: Int,
         val idleTimeout: Duration
 )
 
@@ -48,4 +47,12 @@ data class CbsConfiguration(
 
 data class CollectorConfiguration(
         val routing: Routing
-)
+) {
+    val maxPayloadSizeBytes by lazy {
+        routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: DEFAULT_MAX_PAYLOAD_SIZE
+    }
+
+    companion object {
+        internal const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024
+    }
+}
index e782a1e..56e4803 100644 (file)
@@ -33,7 +33,6 @@ internal class ConfigurationMerger {
             PartialConfiguration(
                     listenPort = base.listenPort.updateToGivenOrNone(update.listenPort),
                     idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec),
-                    maxPayloadSizeBytes = base.maxPayloadSizeBytes.updateToGivenOrNone(update.maxPayloadSizeBytes),
 
                     firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec),
                     requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec),
index dddf0be..613ae30 100644 (file)
@@ -94,7 +94,6 @@ internal class ConfigurationValidator {
             partial.mapBinding {
                 ServerConfiguration(
                         it.listenPort.bind(),
-                        it.maxPayloadSizeBytes.bind(),
                         Duration.ofSeconds(it.idleTimeoutSec.bind())
                 )
             }
@@ -139,7 +138,6 @@ internal class ConfigurationValidator {
                     .trustStorePassword(Passwords.fromString(trustStorePassword))
                     .build()
 
-
     private fun validatedCollectorConfig(partial: PartialConfiguration) =
             partial.mapBinding { config ->
                 CollectorConfiguration(
index 30f6c3e..d09a52e 100644 (file)
@@ -34,8 +34,6 @@ internal data class PartialConfiguration(
         val listenPort: Option<Int> = None,
         @SerializedName("server.idleTimeoutSec")
         val idleTimeoutSec: Option<Long> = None,
-        @SerializedName("server.maxPayloadSizeBytes")
-        val maxPayloadSizeBytes: Option<Int> = None,
 
         @SerializedName("cbs.firstRequestDelaySec")
         val firstRequestDelaySec: Option<Long> = None,
index 94eb519..8c3c22a 100644 (file)
@@ -82,8 +82,6 @@ internal object CbsConfigurationProviderTest : Spek({
 
                                 assertThat(it.listenPort).isEqualTo(Some(6061))
                                 assertThat(it.idleTimeoutSec).isEqualTo(Some(60L))
-                                assertThat(it.maxPayloadSizeBytes).isEqualTo(Some(1048576))
-
 
                                 val sinks = it.streamPublishers.orNull()!!
                                 val sink1 = sinks[0]
@@ -148,7 +146,6 @@ private val validConfiguration = JsonParser().parse("""
 {
     "server.listenPort": 6061,
     "server.idleTimeoutSec": 60,
-    "server.maxPayloadSizeBytes": 1048576,
     "streams_publishes": {
         "$PERF3GPP_REGIONAL": {
             "type": "kafka",
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt
new file mode 100644 (file)
index 0000000..dbdf4ad
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration.Companion.DEFAULT_MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object CollectorConfigurationTest : Spek({
+
+    describe("CollectorConfiguration") {
+        describe("calculating maxPayloadSizeBytes") {
+            on("defined routes") {
+                val sampleRouting = listOf(
+                        Route(sink1.name(), sink1),
+                        Route(sink2.name(), sink2),
+                        Route(sink3.name(), sink3)
+                )
+                val configuration = CollectorConfiguration(sampleRouting)
+
+                it("should use the highest value among all routes") {
+                    assertThat(configuration.maxPayloadSizeBytes)
+                            .isEqualTo(highestMaxPayloadSize)
+                }
+            }
+
+            on("empty routing") {
+                val configuration = CollectorConfiguration(emptyList())
+
+                it("should use default value") {
+                    assertThat(configuration.maxPayloadSizeBytes)
+                            .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE)
+                }
+            }
+        }
+    }
+})
+
+private const val highestMaxPayloadSize = 3
+
+private val sink1 = mock<KafkaSink>().also {
+    whenever(it.name()).thenReturn("")
+    whenever(it.maxPayloadSizeBytes()).thenReturn(1)
+}
+
+private val sink2 = mock<KafkaSink>().also {
+    whenever(it.name()).thenReturn("")
+    whenever(it.maxPayloadSizeBytes()).thenReturn(2)
+}
+
+private val sink3 = mock<KafkaSink>().also {
+    whenever(it.name()).thenReturn("")
+    whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize)
+}
index 4cd2ba9..cb8d500 100644 (file)
@@ -76,8 +76,6 @@ internal object ConfigurationMergerTest : Spek({
             assertThat(result.listenPort).isEqualTo(someListenPort)
             assertThat(result.idleTimeoutSec.isEmpty()).isFalse()
             assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
-            assertThat(result.maxPayloadSizeBytes.isEmpty()).isFalse()
-            assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576))
         }
 
         it("merges full config into single parameter") {
@@ -89,7 +87,6 @@ internal object ConfigurationMergerTest : Spek({
             val result = ConfigurationMerger().merge(actual, diff)
 
             assertThat(result.logLevel).isEqualTo(Some(LogLevel.ERROR))
-            assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576))
             assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
 
             assertThat(result.keyStoreFile.isEmpty()).isFalse()
index 5fa1fd6..0806e8c 100644 (file)
@@ -76,7 +76,6 @@ internal object ConfigurationValidatorTest : Spek({
             val config = PartialConfiguration(
                     listenPort = Some(defaultListenPort),
                     idleTimeoutSec = Some(defaultIdleTimeoutSec),
-                    maxPayloadSizeBytes = Some(defaultMaxPayloadSizeBytes),
                     firstRequestDelaySec = Some(defaultFirstReqDelaySec),
                     requestIntervalSec = Some(defaultRequestIntervalSec),
                     sslDisable = Some(false),
@@ -97,8 +96,6 @@ internal object ConfigurationValidatorTest : Spek({
                         {
                             assertThat(it.server.listenPort)
                                     .isEqualTo(defaultListenPort)
-                            assertThat(it.server.maxPayloadSizeBytes)
-                                    .isEqualTo(defaultMaxPayloadSizeBytes)
                             assertThat(it.server.idleTimeout)
                                     .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
 
@@ -116,6 +113,8 @@ internal object ConfigurationValidatorTest : Spek({
 
                             assertThat(it.collector.routing)
                                     .isEqualTo(sampleRouting)
+                            assertThat(it.collector.maxPayloadSizeBytes)
+                                    .isEqualTo(sampleMaxPayloadSize)
 
                             assertThat(it.logLevel).isEqualTo(LogLevel.TRACE)
                         }
@@ -183,7 +182,6 @@ internal object ConfigurationValidatorTest : Spek({
 
 private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPort),
                                  idleTimeoutSec: Option<Long> = Some(defaultIdleTimeoutSec),
-                                 maxPayloadSizeBytes: Option<Int> = Some(defaultMaxPayloadSizeBytes),
                                  firstReqDelaySec: Option<Long> = Some(defaultFirstReqDelaySec),
                                  requestIntervalSec: Option<Long> = Some(defaultRequestIntervalSec),
                                  sslDisable: Option<Boolean> = Some(false),
@@ -196,7 +194,6 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor
 ) = PartialConfiguration(
         listenPort = listenPort,
         idleTimeoutSec = idleTimeoutSec,
-        maxPayloadSizeBytes = maxPayloadSizeBytes,
         firstRequestDelaySec = firstReqDelaySec,
         requestIntervalSec = requestIntervalSec,
         sslDisable = sslDisable,
@@ -209,7 +206,6 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor
 )
 
 const val defaultListenPort = 1234
-const val defaultMaxPayloadSizeBytes = 2
 const val defaultRequestIntervalSec = 3L
 const val defaultIdleTimeoutSec = 10L
 const val defaultFirstReqDelaySec = 10L
@@ -220,9 +216,12 @@ const val TRUSTSTORE = "trust.ks.pkcs12"
 const val TRUSTSTORE_PASSWORD = "changeMeToo"
 
 const val sampleSinkName = "perf3gpp"
+const val sampleMaxPayloadSize = 1024
 
-private val sampleSink = mock<KafkaSink>().also {
+private val sink = mock<KafkaSink>().also {
     whenever(it.name()).thenReturn(sampleSinkName)
+    whenever(it.maxPayloadSizeBytes()).thenReturn(sampleMaxPayloadSize)
 }
-val sampleStreamsDefinition = listOf(sampleSink)
-val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
\ No newline at end of file
+
+private val sampleStreamsDefinition = listOf(sink)
+private val sampleRouting = listOf(Route(sink.name(), sink))
\ No newline at end of file
index 0fdd41c..ac43c01 100644 (file)
@@ -36,7 +36,7 @@ import java.io.File
  */
 object HvVesCommandLineParserTest : Spek({
     lateinit var cut: HvVesCommandLineParser
-    val DEFAULT_HEALTHCHECK_PORT = 6060
+    val defaultHealthcheckPort = 6060
     val emptyConfig = ""
     val configFilePath = javaClass.absoluteResourcePath("sampleConfig.json")
 
@@ -88,7 +88,7 @@ object HvVesCommandLineParserTest : Spek({
                 it("should return default port") {
                     assertThat(
                             cut.getHealthcheckPort(arrayOf(emptyConfig))
-                    ).isEqualTo(DEFAULT_HEALTHCHECK_PORT)
+                    ).isEqualTo(defaultHealthcheckPort)
                 }
             }
         }
index ad38fd5..919f22c 100644 (file)
@@ -87,7 +87,6 @@ internal object JsonConfigurationParserTest : Spek({
 
                 assertThat(config.listenPort).isEqualTo(Some(6000))
                 assertThat(config.idleTimeoutSec).isEqualTo(Some(1200L))
-                assertThat(config.maxPayloadSizeBytes).isEqualTo(Some(1048576))
 
                 assertThat(config.firstRequestDelaySec).isEqualTo(Some(7L))
                 assertThat(config.requestIntervalSec).isEqualTo(Some(900L))
index 2c3805e..a5ad52a 100644 (file)
@@ -2,7 +2,6 @@
   "logLevel": "ERROR",
   "server.listenPort": 6000,
   "server.idleTimeoutSec": 1200,
-  "server.maxPayloadSizeBytes": 1048576,
   "cbs.firstRequestDelaySec": 7,
   "cbs.requestIntervalSec": 900,
   "security.sslDisable": false,
index 3524f14..c3c5d73 100644 (file)
@@ -37,8 +37,7 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
  */
 class HvVesCollectorFactory(private val configuration: CollectorConfiguration,
                             private val sinkFactory: SinkFactory,
-                            private val metrics: Metrics,
-                            private val maxPayloadSizeBytes: Int): CollectorFactory {
+                            private val metrics: Metrics) : CollectorFactory {
 
     override fun invoke(ctx: ClientContext): Collector =
             createVesHvCollector(ctx)
@@ -48,7 +47,9 @@ class HvVesCollectorFactory(private val configuration: CollectorConfiguration,
     private fun createVesHvCollector(ctx: ClientContext): Collector =
             HvVesCollector(
                     clientContext = ctx,
-                    wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
+                    wireChunkDecoder = WireChunkDecoder(
+                            WireFrameDecoder(configuration.maxPayloadSizeBytes), ctx
+                    ),
                     protobufDecoder = VesDecoder(),
                     router = Router(configuration.routing, sinkFactory, ctx, metrics),
                     metrics = metrics)
index 430f798..40ac4dc 100644 (file)
@@ -33,8 +33,8 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
index 35dfba8..3002f33 100644 (file)
@@ -33,8 +33,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
index 8b2bc13..88d1567 100644 (file)
@@ -55,8 +55,7 @@ class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : C
     private val collectorProvider = HvVesCollectorFactory(
             configuration,
             sinkProvider,
-            metrics,
-            MAX_PAYLOAD_SIZE_BYTES
+            metrics
     )
 
     val collector: Collector
@@ -64,18 +63,18 @@ class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : C
 
 
     fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
-        collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+        collector.handleConnection(Flux.fromArray(packets)).block(TIMEOUT)
         return sink.sentMessages
     }
 
     fun handleConnection(vararg packets: ByteBuf) {
-        collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+        collector.handleConnection(Flux.fromArray(packets)).block(TIMEOUT)
     }
 
     override fun close() = collectorProvider.close()
 
     companion object {
-        const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+        private val TIMEOUT = Duration.ofSeconds(10)
     }
 }
 
@@ -95,8 +94,6 @@ class DummySinkFactory(private val sink: Sink) : SinkFactory {
             }
 }
 
-private val timeout = Duration.ofSeconds(10)
-
 fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
         Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink())
 
index d845f7c..f90f4bc 100644 (file)
@@ -30,8 +30,8 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
index 8956e81..53e4812 100644 (file)
@@ -27,7 +27,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
 const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
 const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
 const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
-private const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+const val MAX_PAYLOAD_SIZE_BYTES = 512 * 512
 
 private val perf3gppKafkaSink = ImmutableKafkaSink.builder()
         .name("PERF3GPP")
index 0a5cae0..e302da9 100644 (file)
@@ -2,7 +2,6 @@
   "logLevel": "INFO",
   "server.listenPort": 6061,
   "server.idleTimeoutSec": 60,
-  "server.maxPayloadSizeBytes": 1048576,
   "cbs.firstRequestDelaySec": 10,
   "cbs.requestIntervalSec": 5,
   "security.sslDisable": true
index a34b711..98a094b 100644 (file)
@@ -58,8 +58,7 @@ object VesServer {
             HvVesCollectorFactory(
                     config.collector,
                     AdapterFactory.sinkCreatorFactory(),
-                    MicrometerMetrics.INSTANCE,
-                    config.server.maxPayloadSizeBytes
+                    MicrometerMetrics.INSTANCE
             )
 
     private fun logServerStarted(handle: ServerHandle) =