"logLevel": "DEBUG",
"server.listenPort": 6061,
"server.idleTimeoutSec": 60,
- "server.maxPayloadSizeBytes": 1048576,
"cbs.firstRequestDelaySec": 10,
"cbs.requestIntervalSec": 5,
"security.keys.keyStoreFile": "/etc/ves-hv/ssl/server.p12",
"logLevel": "DEBUG",
"server.listenPort": 8061,
"server.idleTimeoutSec": 60,
- "server.maxPayloadSizeBytes": 1048576,
"cbs.firstRequestDelaySec": 10,
"cbs.requestIntervalSec": 5,
"security.keys.keyStoreFile": "development/ssl/server.p12",
.throwOnLeft(::MissingArgumentException)
.doOnNext { logger.info { "Using base configuration file: ${it.absolutePath}" } }
.map { it.reader().use(configParser::parse) }
- .doOnNext { logger.info { "Successfully parsed json file to configuration: $it" } }
+ .doOnNext { logger.info { "Successfully parsed configuration file to: $it" } }
.cache()
.flatMapMany { basePartialConfig ->
cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
data class ServerConfiguration(
val listenPort: Int,
- val maxPayloadSizeBytes: Int,
val idleTimeout: Duration
)
data class CollectorConfiguration(
val routing: Routing
-)
+) {
+ val maxPayloadSizeBytes by lazy {
+ routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: DEFAULT_MAX_PAYLOAD_SIZE
+ }
+
+ companion object {
+ internal const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024
+ }
+}
PartialConfiguration(
listenPort = base.listenPort.updateToGivenOrNone(update.listenPort),
idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec),
- maxPayloadSizeBytes = base.maxPayloadSizeBytes.updateToGivenOrNone(update.maxPayloadSizeBytes),
firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec),
requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec),
partial.mapBinding {
ServerConfiguration(
it.listenPort.bind(),
- it.maxPayloadSizeBytes.bind(),
Duration.ofSeconds(it.idleTimeoutSec.bind())
)
}
.trustStorePassword(Passwords.fromString(trustStorePassword))
.build()
-
private fun validatedCollectorConfig(partial: PartialConfiguration) =
partial.mapBinding { config ->
CollectorConfiguration(
val listenPort: Option<Int> = None,
@SerializedName("server.idleTimeoutSec")
val idleTimeoutSec: Option<Long> = None,
- @SerializedName("server.maxPayloadSizeBytes")
- val maxPayloadSizeBytes: Option<Int> = None,
@SerializedName("cbs.firstRequestDelaySec")
val firstRequestDelaySec: Option<Long> = None,
assertThat(it.listenPort).isEqualTo(Some(6061))
assertThat(it.idleTimeoutSec).isEqualTo(Some(60L))
- assertThat(it.maxPayloadSizeBytes).isEqualTo(Some(1048576))
-
val sinks = it.streamPublishers.orNull()!!
val sink1 = sinks[0]
{
"server.listenPort": 6061,
"server.idleTimeoutSec": 60,
- "server.maxPayloadSizeBytes": 1048576,
"streams_publishes": {
"$PERF3GPP_REGIONAL": {
"type": "kafka",
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration.Companion.DEFAULT_MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object CollectorConfigurationTest : Spek({
+
+ describe("CollectorConfiguration") {
+ describe("calculating maxPayloadSizeBytes") {
+ on("defined routes") {
+ val sampleRouting = listOf(
+ Route(sink1.name(), sink1),
+ Route(sink2.name(), sink2),
+ Route(sink3.name(), sink3)
+ )
+ val configuration = CollectorConfiguration(sampleRouting)
+
+ it("should use the highest value among all routes") {
+ assertThat(configuration.maxPayloadSizeBytes)
+ .isEqualTo(highestMaxPayloadSize)
+ }
+ }
+
+ on("empty routing") {
+ val configuration = CollectorConfiguration(emptyList())
+
+ it("should use default value") {
+ assertThat(configuration.maxPayloadSizeBytes)
+ .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE)
+ }
+ }
+ }
+ }
+})
+
+private const val highestMaxPayloadSize = 3
+
+private val sink1 = mock<KafkaSink>().also {
+ whenever(it.name()).thenReturn("")
+ whenever(it.maxPayloadSizeBytes()).thenReturn(1)
+}
+
+private val sink2 = mock<KafkaSink>().also {
+ whenever(it.name()).thenReturn("")
+ whenever(it.maxPayloadSizeBytes()).thenReturn(2)
+}
+
+private val sink3 = mock<KafkaSink>().also {
+ whenever(it.name()).thenReturn("")
+ whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize)
+}
assertThat(result.listenPort).isEqualTo(someListenPort)
assertThat(result.idleTimeoutSec.isEmpty()).isFalse()
assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
- assertThat(result.maxPayloadSizeBytes.isEmpty()).isFalse()
- assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576))
}
it("merges full config into single parameter") {
val result = ConfigurationMerger().merge(actual, diff)
assertThat(result.logLevel).isEqualTo(Some(LogLevel.ERROR))
- assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576))
assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
assertThat(result.keyStoreFile.isEmpty()).isFalse()
val config = PartialConfiguration(
listenPort = Some(defaultListenPort),
idleTimeoutSec = Some(defaultIdleTimeoutSec),
- maxPayloadSizeBytes = Some(defaultMaxPayloadSizeBytes),
firstRequestDelaySec = Some(defaultFirstReqDelaySec),
requestIntervalSec = Some(defaultRequestIntervalSec),
sslDisable = Some(false),
{
assertThat(it.server.listenPort)
.isEqualTo(defaultListenPort)
- assertThat(it.server.maxPayloadSizeBytes)
- .isEqualTo(defaultMaxPayloadSizeBytes)
assertThat(it.server.idleTimeout)
.isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
assertThat(it.collector.routing)
.isEqualTo(sampleRouting)
+ assertThat(it.collector.maxPayloadSizeBytes)
+ .isEqualTo(sampleMaxPayloadSize)
assertThat(it.logLevel).isEqualTo(LogLevel.TRACE)
}
private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPort),
idleTimeoutSec: Option<Long> = Some(defaultIdleTimeoutSec),
- maxPayloadSizeBytes: Option<Int> = Some(defaultMaxPayloadSizeBytes),
firstReqDelaySec: Option<Long> = Some(defaultFirstReqDelaySec),
requestIntervalSec: Option<Long> = Some(defaultRequestIntervalSec),
sslDisable: Option<Boolean> = Some(false),
) = PartialConfiguration(
listenPort = listenPort,
idleTimeoutSec = idleTimeoutSec,
- maxPayloadSizeBytes = maxPayloadSizeBytes,
firstRequestDelaySec = firstReqDelaySec,
requestIntervalSec = requestIntervalSec,
sslDisable = sslDisable,
)
const val defaultListenPort = 1234
-const val defaultMaxPayloadSizeBytes = 2
const val defaultRequestIntervalSec = 3L
const val defaultIdleTimeoutSec = 10L
const val defaultFirstReqDelaySec = 10L
const val TRUSTSTORE_PASSWORD = "changeMeToo"
const val sampleSinkName = "perf3gpp"
+const val sampleMaxPayloadSize = 1024
-private val sampleSink = mock<KafkaSink>().also {
+private val sink = mock<KafkaSink>().also {
whenever(it.name()).thenReturn(sampleSinkName)
+ whenever(it.maxPayloadSizeBytes()).thenReturn(sampleMaxPayloadSize)
}
-val sampleStreamsDefinition = listOf(sampleSink)
-val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
\ No newline at end of file
+
+private val sampleStreamsDefinition = listOf(sink)
+private val sampleRouting = listOf(Route(sink.name(), sink))
\ No newline at end of file
*/
object HvVesCommandLineParserTest : Spek({
lateinit var cut: HvVesCommandLineParser
- val DEFAULT_HEALTHCHECK_PORT = 6060
+ val defaultHealthcheckPort = 6060
val emptyConfig = ""
val configFilePath = javaClass.absoluteResourcePath("sampleConfig.json")
it("should return default port") {
assertThat(
cut.getHealthcheckPort(arrayOf(emptyConfig))
- ).isEqualTo(DEFAULT_HEALTHCHECK_PORT)
+ ).isEqualTo(defaultHealthcheckPort)
}
}
}
assertThat(config.listenPort).isEqualTo(Some(6000))
assertThat(config.idleTimeoutSec).isEqualTo(Some(1200L))
- assertThat(config.maxPayloadSizeBytes).isEqualTo(Some(1048576))
assertThat(config.firstRequestDelaySec).isEqualTo(Some(7L))
assertThat(config.requestIntervalSec).isEqualTo(Some(900L))
"logLevel": "ERROR",
"server.listenPort": 6000,
"server.idleTimeoutSec": 1200,
- "server.maxPayloadSizeBytes": 1048576,
"cbs.firstRequestDelaySec": 7,
"cbs.requestIntervalSec": 900,
"security.sslDisable": false,
*/
class HvVesCollectorFactory(private val configuration: CollectorConfiguration,
private val sinkFactory: SinkFactory,
- private val metrics: Metrics,
- private val maxPayloadSizeBytes: Int): CollectorFactory {
+ private val metrics: Metrics) : CollectorFactory {
override fun invoke(ctx: ClientContext): Collector =
createVesHvCollector(ctx)
private fun createVesHvCollector(ctx: ClientContext): Collector =
HvVesCollector(
clientContext = ctx,
- wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
+ wireChunkDecoder = WireChunkDecoder(
+ WireFrameDecoder(configuration.maxPayloadSizeBytes), ctx
+ ),
protobufDecoder = VesDecoder(),
router = Router(configuration.routing, sinkFactory, ctx, metrics),
metrics = metrics)
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
private val collectorProvider = HvVesCollectorFactory(
configuration,
sinkProvider,
- metrics,
- MAX_PAYLOAD_SIZE_BYTES
+ metrics
)
val collector: Collector
fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+ collector.handleConnection(Flux.fromArray(packets)).block(TIMEOUT)
return sink.sentMessages
}
fun handleConnection(vararg packets: ByteBuf) {
- collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+ collector.handleConnection(Flux.fromArray(packets)).block(TIMEOUT)
}
override fun close() = collectorProvider.close()
companion object {
- const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+ private val TIMEOUT = Duration.ofSeconds(10)
}
}
}
}
-private val timeout = Duration.ofSeconds(10)
-
fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink())
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
-private const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+const val MAX_PAYLOAD_SIZE_BYTES = 512 * 512
private val perf3gppKafkaSink = ImmutableKafkaSink.builder()
.name("PERF3GPP")
"logLevel": "INFO",
"server.listenPort": 6061,
"server.idleTimeoutSec": 60,
- "server.maxPayloadSizeBytes": 1048576,
"cbs.firstRequestDelaySec": 10,
"cbs.requestIntervalSec": 5,
"security.sslDisable": true
HvVesCollectorFactory(
config.collector,
AdapterFactory.sinkCreatorFactory(),
- MicrometerMetrics.INSTANCE,
- config.server.maxPayloadSizeBytes
+ MicrometerMetrics.INSTANCE
)
private fun logServerStarted(handle: ServerHandle) =