Align kafka properties with VESHV_MAX_PAYLOAD_SIZE 58/75458/8
authorkjaniak <kornel.janiak@nokia.com>
Tue, 8 Jan 2019 12:00:37 +0000 (13:00 +0100)
committerkjaniak <kornel.janiak@nokia.com>
Thu, 10 Jan 2019 09:58:52 +0000 (10:58 +0100)
Change-Id: I5cbfb8a982cd1efbdf58c2c0aed71f064f7b7cb8
Issue-ID: DCAEGEN2-1066
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt

index 2fa4f54..5e7d9f5 100644 (file)
@@ -25,7 +25,8 @@ import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CON
 import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
 import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
 import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
-import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
@@ -33,6 +34,7 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.kafka.sender.KafkaSender
 import reactor.kafka.sender.SenderOptions
+import java.lang.Integer.max
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -46,17 +48,28 @@ internal class KafkaSinkProvider internal constructor(
     override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
 
     companion object {
+        private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
+        private const val BUFFER_MEMORY_MULTIPLIER = 32
+        private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
         private fun constructKafkaSender(config: KafkaConfiguration) =
                 KafkaSender.create(constructSenderOptions(config))
 
         private fun constructSenderOptions(config: KafkaConfiguration) =
                 SenderOptions.create<CommonEventHeader, VesMessage>()
                         .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
+                        .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config))
+                        .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config))
                         .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
                         .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
                         .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
                         .producerProperty(RETRIES_CONFIG, 1)
                         .producerProperty(ACKS_CONFIG, "1")
                         .stopOnError(false)
+
+        private fun maxRequestSize(config: KafkaConfiguration) =
+                (MAXIMUM_REQUEST_SIZE_MULTIPLIER * config.maximalRequestSizeBytes).toInt()
+
+        private fun bufferMemory(config: KafkaConfiguration) =
+                max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * config.maximalRequestSizeBytes)
     }
 }
index f65e157..2aa2791 100644 (file)
@@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.model
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since December 2018
  */
-data class KafkaConfiguration(val bootstrapServers: String)
+data class KafkaConfiguration(val bootstrapServers: String, val maximalRequestSizeBytes: Int)
index 3a924e4..f23154a 100644 (file)
@@ -36,7 +36,8 @@ import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
 internal object KafkaSinkProviderTest : Spek({
     describe("non functional requirements") {
         given("sample configuration") {
-            val config = KafkaConfiguration("localhost:9090")
+            val config = KafkaConfiguration("localhost:9090",
+                    1024 * 1024)
             val cut = KafkaSinkProvider(config)
 
             on("sample clients") {
index 2311b2b..c97486c 100644 (file)
@@ -91,7 +91,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
             val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
             ServerConfiguration(
                 serverListenAddress = InetSocketAddress(listenPort),
-                kafkaConfiguration = KafkaConfiguration(kafkaServers),
+                kafkaConfiguration = KafkaConfiguration(kafkaServers, maxPayloadSizeBytes),
                 healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
                 configurationProviderParams = configurationProviderParams,
                 securityConfiguration = security,