Add environment configuration to kafka consumer 27/90427/6
authorFilip Krzywka <filip.krzywka@nokia.com>
Tue, 25 Jun 2019 09:32:36 +0000 (11:32 +0200)
committerFilip Krzywka <filip.krzywka@nokia.com>
Wed, 26 Jun 2019 06:57:37 +0000 (08:57 +0200)
- HV-VES-specific environment prefix moved inside HvVes modules to
allow simpler no-prefix API for other modules
- created OptionDSL for brevity

Change-Id: I2fabbda1280cc0f913f8a0a04b4a055f39ed1fae
Issue-ID: DCAEGEN2-1626
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt

index d08f6c0..9d87557 100644 (file)
@@ -23,109 +23,118 @@ import org.apache.commons.cli.Option
 
 
 enum class CommandLineOption(val option: Option, val required: Boolean = false) {
-    HEALTH_CHECK_API_PORT(
-            Option.builder("H")
-                    .longOpt("health-check-api-port")
-                    .hasArg()
-                    .desc("Health check rest api listen port")
-                    .build()
-    ),
-    CONFIGURATION_FILE(
-            Option.builder("c")
-                    .longOpt("configuration-file")
-                    .hasArg()
-                    .desc("Json file containing HV-VES configuration")
-                    .build(),
-            required = true
-    ),
-    LISTEN_PORT(
-            Option.builder("p")
-                    .longOpt("listen-port")
-                    .hasArg()
-                    .desc("Listen port")
-                    .build(),
-            required = true
-    ),
-    VES_HV_PORT(
-            Option.builder("v")
-                    .longOpt("ves-port")
-                    .hasArg()
-                    .desc("VesHvCollector port")
-                    .build(),
-            required = true
-    ),
-    VES_HV_HOST(
-            Option.builder("h")
-                    .longOpt("ves-host")
-                    .hasArg()
-                    .desc("VesHvCollector host")
-                    .build(),
-            required = true
-    ),
-    KAFKA_SERVERS(
-            Option.builder("s")
-                    .longOpt("kafka-bootstrap-servers")
-                    .hasArg()
-                    .desc("Comma-separated Kafka bootstrap servers in <host>:<port> format")
-                    .build(),
-            required = true
-    ),
-    KAFKA_TOPICS(
-            Option.builder("f")
-                    .longOpt("kafka-topics")
-                    .hasArg()
-                    .desc("Comma-separated Kafka topics")
-                    .build(),
-            required = true
-    ),
-    SSL_DISABLE(
-            Option.builder("l")
-                    .longOpt("ssl-disable")
-                    .desc("Disable SSL encryption")
-                    .build()
-    ),
-    KEY_STORE_FILE(
-            Option.builder("k")
-                    .longOpt("key-store")
-                    .hasArg()
-                    .desc("Key store in PKCS12 format")
-                    .build()
-    ),
-    KEY_STORE_PASSWORD_FILE(
-            Option.builder("kp")
-                    .longOpt("key-store-password-file")
-                    .hasArg()
-                    .desc("File with key store password")
-                    .build()
-    ),
-    TRUST_STORE_FILE(
-            Option.builder("t")
-                    .longOpt("trust-store")
-                    .hasArg()
-                    .desc("File with trusted certificate bundle in PKCS12 format")
-                    .build()
-    ),
-    TRUST_STORE_PASSWORD_FILE(
-            Option.builder("tp")
-                    .longOpt("trust-store-password-file")
-                    .hasArg()
-                    .desc("File with trust store password")
-                    .build()
-    ),
-    MAXIMUM_PAYLOAD_SIZE_BYTES(
-            Option.builder("m")
-                    .longOpt("max-payload-size")
-                    .hasArg()
-                    .desc("Maximum supported payload size in bytes")
-                    .build()
-    );
+    CONFIGURATION_FILE(required = true,
+            option = option {
+                shortOpt = "c"
+                longOpt = "configuration-file"
+                desc = "Json file containing HV-VES configuration"
+                hasArgument = true
+            }),
+    LISTEN_PORT(required = true,
+            option = option {
+                shortOpt = "p"
+                longOpt = "listen-port"
+                desc = "Listen port"
+                hasArgument = true
+            }),
+    VES_HV_PORT(required = true,
+            option = option {
+                shortOpt = "v"
+                longOpt = "ves-port"
+                desc = "VesHvCollector port"
+                hasArgument = true
+            }),
+    VES_HV_HOST(required = true,
+            option = option {
+                shortOpt = "h"
+                longOpt = "ves-host"
+                desc = "VesHvCollector host"
+                hasArgument = true
+            }),
+    KAFKA_SERVERS(required = true,
+            option = option {
+                shortOpt = "s"
+                longOpt = "kafka-bootstrap-servers"
+                desc = "Comma-separated Kafka bootstrap servers in <host>:<port> format"
+                hasArgument = true
+            }),
+    KAFKA_TOPICS(required = true,
+            option = option {
+                shortOpt = "f"
+                longOpt = "kafka-topics"
+                desc = "Comma-separated Kafka topics"
+                hasArgument = true
+            }),
+    HEALTH_CHECK_API_PORT(option {
+        shortOpt = "H"
+        longOpt = "health-check-api-port"
+        desc = "Health check rest api listen port"
+        hasArgument = true
+    }),
+    SSL_DISABLE(option {
+        shortOpt = "l"
+        longOpt = "ssl-disable"
+        desc = "Disable SSL encryption"
+    }),
+    KEY_STORE_FILE(option {
+        shortOpt = "k"
+        longOpt = "key-store"
+        desc = "Key store in PKCS12 format"
+        hasArgument = true
+    }),
+    KEY_STORE_PASSWORD_FILE(option {
+        shortOpt = "kp"
+        longOpt = "key-store-password-file"
+        desc = "File with key store password"
+        hasArgument = true
+    }),
+    TRUST_STORE_FILE(option {
+        shortOpt = "t"
+        longOpt = "trust-store"
+        desc = "File with trusted certificate bundle in PKCS12 format"
+        hasArgument = true
+    }),
+    TRUST_STORE_PASSWORD_FILE(option {
+        shortOpt = "tp"
+        longOpt = "trust-store-password-file"
+        desc = "File with trust store password"
+        hasArgument = true
+    }),
+    MAXIMUM_PAYLOAD_SIZE_BYTES(option {
+        shortOpt = "m"
+        longOpt = "max-payload-size"
+        desc = "Maximum supported payload size in bytes"
+        hasArgument = true
+    }),
+    DISABLE_PROCESSING(option {
+        shortOpt = "d"
+        longOpt = "disable-processing"
+        desc = "Message queue consumer option. Indicates whether messages should be fully processed"
+    });
 
-    fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String =
+    fun environmentVariableName(prefix: String = ""): String =
             option.longOpt.toUpperCase().replace('-', '_').let { mainPart ->
-                "${prefix}_${mainPart}"
+                if (prefix.isNotBlank()) {
+                    "${prefix}_${mainPart}"
+                } else {
+                    mainPart
+                }
             }
+}
+
 
-    companion object {
-        private const val DEFAULT_ENV_PREFIX = "VESHV"
-    }
+private class OptionDSL {
+    lateinit var shortOpt: String
+    lateinit var longOpt: String
+    lateinit var desc: String
+    var hasArgument: Boolean = false
 }
+
+private fun option(conf: OptionDSL.() -> Unit): Option {
+    val dsl = OptionDSL().apply(conf)
+    return Option.builder(dsl.shortOpt)
+            .longOpt(dsl.longOpt)
+            .hasArg(dsl.hasArgument)
+            .desc(dsl.desc)
+            .build()
+}
\ No newline at end of file
index 6d8ba3f..20ca97e 100644 (file)
@@ -40,28 +40,43 @@ fun handleWrongArgumentError(programName: String, err: WrongArgumentError): Exit
     return ExitFailure(2)
 }
 
-fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long =
-        longValue(cmdLineOpt).getOrElse { default }
+inline class EnvPrefix(val it: String)
 
-fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, default: String): String =
-        optionValue(cmdLineOpt).getOrElse { default }
+private val DEFAULT_PREFIX = EnvPrefix("")
 
-fun CommandLine.intValue(cmdLineOpt: CommandLineOption, default: Int): Int =
-        intValue(cmdLineOpt).getOrElse { default }
+fun CommandLine.longValue(cmdLineOpt: CommandLineOption,
+                          default: Long,
+                          envPrefix: EnvPrefix = DEFAULT_PREFIX): Long =
+        longValue(cmdLineOpt, envPrefix).getOrElse { default }
 
-fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Option<Int> =
-        optionValue(cmdLineOpt).map(String::toInt)
+fun CommandLine.stringValue(cmdLineOpt: CommandLineOption,
+                            default: String,
+                            envPrefix: EnvPrefix = DEFAULT_PREFIX): String =
+        optionValue(cmdLineOpt, envPrefix).getOrElse { default }
 
-fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> =
-        optionValue(cmdLineOpt).map(String::toLong)
+fun CommandLine.intValue(cmdLineOpt: CommandLineOption,
+                         default: Int,
+                         envPrefix: EnvPrefix = DEFAULT_PREFIX): Int =
+        intValue(cmdLineOpt, envPrefix).getOrElse { default }
 
-fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): Option<String> =
-        optionValue(cmdLineOpt)
+fun CommandLine.intValue(cmdLineOpt: CommandLineOption,
+                         envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<Int> =
+        optionValue(cmdLineOpt, envPrefix).map(String::toInt)
 
-fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean =
+fun CommandLine.longValue(cmdLineOpt: CommandLineOption,
+                          envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<Long> =
+        optionValue(cmdLineOpt, envPrefix).map(String::toLong)
+
+fun CommandLine.stringValue(cmdLineOpt: CommandLineOption,
+                            envPrefix: EnvPrefix = DEFAULT_PREFIX): Option<String> =
+        optionValue(cmdLineOpt, envPrefix)
+
+fun CommandLine.hasOption(cmdLineOpt: CommandLineOption,
+                          envPrefix: EnvPrefix = DEFAULT_PREFIX): Boolean =
         this.hasOption(cmdLineOpt.option.opt) ||
-                System.getenv(cmdLineOpt.environmentVariableName()) != null
+                System.getenv(cmdLineOpt.environmentVariableName(envPrefix.it)) != null
 
-private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain(
+private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption, envPrefix: EnvPrefix) = Option.fromNullablesChain(
         getOptionValue(cmdLineOpt.option.opt),
-        { System.getenv(cmdLineOpt.environmentVariableName()) })
+        { System.getenv(cmdLineOpt.environmentVariableName(envPrefix.it)) })
+
index 6614e77..e677697 100644 (file)
@@ -53,8 +53,8 @@ class CommandLineOptionTest : Spek({
                 on("calling environmentVariableName") {
                     val result = opt.environmentVariableName()
 
-                    it("should return prefixed upper snake cased long option name") {
-                        assertThat(result).isEqualTo("VESHV_SSL_DISABLE")
+                    it("should return upper snake cased long option name without prefix") {
+                        assertThat(result).isEqualTo("SSL_DISABLE")
                     }
                 }
             }
index c6730a4..2756064 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.commons.cli.DefaultParser
 import org.apache.commons.cli.Options
 import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FILE
 import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
+import org.onap.dcae.collectors.veshv.commandline.EnvPrefix
 import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError
 import org.onap.dcae.collectors.veshv.commandline.intValue
 import org.onap.dcae.collectors.veshv.commandline.stringValue
@@ -42,7 +43,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De
 
     fun getConfigurationFile(args: Array<out String>): Either<WrongArgumentError, File> =
             parse(args) {
-                it.stringValue(CONFIGURATION_FILE).map(::File)
+                it.stringValue(CONFIGURATION_FILE, HV_VES_ENV_PREFIX).map(::File)
             }.toEither {
                 WrongArgumentError(
                         message = "Base configuration filepath missing on command line",
@@ -51,7 +52,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De
 
     fun getHealthcheckPort(args: Array<out String>): Int =
             parse(args) {
-                it.intValue(HEALTH_CHECK_API_PORT)
+                it.intValue(HEALTH_CHECK_API_PORT, HV_VES_ENV_PREFIX)
             }.getOrElse {
                 logger.info { "Healthcheck port missing on command line, using default: $DEFAULT_HEALTHCHECK_PORT" }
                 DEFAULT_HEALTHCHECK_PORT
@@ -76,6 +77,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De
                     .let { parser.parse(it, args) }
 
     companion object {
+        private val HV_VES_ENV_PREFIX = EnvPrefix("VESHV")
         private const val DEFAULT_HEALTHCHECK_PORT: Int = 6060
         private val logger = Logger(HvVesCommandLineParser::class)
     }
index 4d65e91..be7b5cc 100644 (file)
@@ -24,17 +24,35 @@ import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
 import org.onap.dcae.collectors.veshv.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.*
+import org.onap.dcae.collectors.veshv.commandline.hasOption
 import org.onap.dcae.collectors.veshv.commandline.intValue
+import org.onap.dcae.collectors.veshv.commandline.stringValue
 import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
 import java.net.InetSocketAddress
 
 internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration<KafkaConsumerConfiguration>(DefaultParser()) {
-    override val cmdLineOptionsList: List<CommandLineOption> = listOf(LISTEN_PORT)
+    override val cmdLineOptionsList: List<CommandLineOption> = listOf(
+            LISTEN_PORT,
+            KAFKA_TOPICS,
+            KAFKA_SERVERS,
+            DISABLE_PROCESSING
+    )
 
     override fun getConfiguration(cmdLine: CommandLine): Option<KafkaConsumerConfiguration> =
             binding {
                 val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
-                KafkaConsumerConfiguration(InetSocketAddress(listenPort))
+                val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS)
+                        .map { it.split(',').toSet() }
+                        .bind()
+                val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
+                val disableProcessing = cmdLine.hasOption(DISABLE_PROCESSING)
+
+                KafkaConsumerConfiguration(
+                        InetSocketAddress(listenPort),
+                        kafkaTopics,
+                        kafkaBootstrapServers,
+                        disableProcessing
+                )
             }
 }
index ef06a0d..cdd4c30 100644 (file)
@@ -21,4 +21,9 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.config
 
 import java.net.InetSocketAddress
 
-internal data class KafkaConsumerConfiguration(val apiAddress: InetSocketAddress)
+internal data class KafkaConsumerConfiguration(
+        val apiAddress: InetSocketAddress,
+        val kafkaTopics: Set<String>,
+        val kafkaBootstrapServers: String,
+        val disableProcessing: Boolean
+)