Remove environment variables and program arguments 57/82157/10
authorJakub Dudycz <jakub.dudycz@nokia.com>
Wed, 13 Mar 2019 17:44:31 +0000 (18:44 +0100)
committerJakub Dudycz <jakub.dudycz@nokia.com>
Wed, 20 Mar 2019 13:20:03 +0000 (14:20 +0100)
- Move all command line program arguments to json file.
- Reorganize configuration classes and the way they are passed through application
- Implement HV VES configuration stream
- Create concrete configuration from partial one
- Modify main HV-VES server starting pipeline

Change-Id: I6cf874b6904ed768e4820b8132f5f760299c929e
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1340

49 files changed:
development/configuration/configuration.json [new file with mode: 0644]
development/docker-compose.yml
development/logs/.gitignore
sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt
sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ServerConfiguration.kt with 71% similarity]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/CollectorConfiguration.kt with 82% similarity]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt with 80% similarity]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt [deleted file]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt [deleted file]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt [moved from sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/FileConfigurationReaderTest.kt with 66% similarity]
sources/hv-collector-configuration/src/test/resources/sampleConfig.json
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt [deleted file]
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt
sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt
sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt
sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/resources.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ConfigurationProviderParams.kt with 72% similarity]
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/KafkaConfiguration.kt with 77% similarity]
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt

diff --git a/development/configuration/configuration.json b/development/configuration/configuration.json
new file mode 100644 (file)
index 0000000..8e55cf3
--- /dev/null
@@ -0,0 +1,34 @@
+{
+  "logLevel": "DEBUG",
+  "server": {
+    "listenPort": 6061,
+    "idleTimeoutSec": 60,
+    "maxPayloadSizeBytes": 1048576
+  },
+  "cbs": {
+    "firstRequestDelaySec": 10,
+    "requestIntervalSec": 5
+  },
+  "security": {
+    "sslDisable": false,
+    "keys": {
+      "keyStoreFile": "/etc/ves-hv/ssl/server.p12",
+      "keyStorePassword": "onaponap",
+      "trustStoreFile": "/etc/ves-hv/ssl/trust.p12",
+      "trustStorePassword": "onaponap"
+    }
+  },
+  "collector": {
+    "dummyMode": false,
+    "maxRequestSizeBytes": 1048576,
+    "kafkaServers": [
+      "message-router-kafka:9092"
+    ],
+    "routing": [
+      {
+        "fromDomain": "perf3gpp",
+        "toTopic": "HV_VES_PERF3GPP"
+      }
+    ]
+  }
+}
\ No newline at end of file
index abd55ab..85500cb 100644 (file)
@@ -73,15 +73,9 @@ services:
     ports:
       - "6060:6060"
       - "6061:6061/tcp"
-    command: ["--listen-port", "6061",
-              "--health-check-api-port", "6060",
-              "--kafka-bootstrap-servers", "message-router-kafka:9092",
-              "--key-store-password", "onaponap",
-              "--trust-store-password", "onaponap",
-              "--first-request-delay", "5",
-              "--log-level", "DEBUG"]
+    command: ["--configuration-file /etc/ves-hv/configuration/configuration.json"]
     environment:
-      JAVA_OPTS:  "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml"
+      JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml"
       CONSUL_HOST: "consul-server"
       CONFIG_BINDING_SERVICE: "cbs"
       HOSTNAME: "dcae-hv-ves-collector"
@@ -95,7 +89,8 @@ services:
       - message-router-kafka
       - config-binding-service
     volumes:
-      - ./ssl/:/etc/ves-hv/
+      - ./configuration/:/etc/ves-hv/configuration/
+      - ./ssl/:/etc/ves-hv/ssl/
       - ./logs:/var/log/ONAP/dcae-hv-ves-collector
 
 
@@ -139,6 +134,7 @@ services:
   #
   # Monitoring
   #
+
   prometheus:
     image: prom/prometheus
     ports:
index 1e45c92..93d42f9 100644 (file)
@@ -33,25 +33,24 @@ import java.nio.file.Paths
 abstract class ArgBasedConfiguration<T>(private val parser: CommandLineParser) {
     abstract val cmdLineOptionsList: List<CommandLineOption>
 
-    fun parse(args: Array<out String>): Either<WrongArgumentError, T> {
-        val parseResult = Try {
-            val commandLineOptions = cmdLineOptionsList.map { it.option }.fold(Options(), Options::addOption)
-            parser.parse(commandLineOptions, args)
-        }
-        return parseResult
-                .toEither()
-                .mapLeft { ex -> WrongArgumentError(ex, cmdLineOptionsList) }
-                .map(this::getConfiguration)
-                .flatMap {
-                    it.toEither {
-                        WrongArgumentError(
-                                message = "Unexpected error when parsing command line arguments",
-                                cmdLineOptionsList = cmdLineOptionsList)
-                    }
-                }
-    }
-
     protected abstract fun getConfiguration(cmdLine: CommandLine): Option<T>
 
-    protected fun stringPathToPath(path: String): Path = Paths.get(File(path).toURI())
+    fun parse(args: Array<out String>): Either<WrongArgumentError, T> =
+            Try { parseArgumentsArray(args) }
+                    .toEither()
+                    .mapLeft { WrongArgumentError(it, cmdLineOptionsList) }
+                    .map(this::getConfiguration)
+                    .flatMap {
+                        it.toEither {
+                            WrongArgumentError(
+                                    message = "Unexpected error when parsing command line arguments",
+                                    cmdLineOptionsList = cmdLineOptionsList)
+                        }
+                    }
+
+    private fun parseArgumentsArray(args: Array<out String>) =
+            cmdLineOptionsList
+                    .map { it.option }
+                    .fold(Options(), Options::addOption)
+                    .let { parser.parse(it, args) }
 }
index 3184921..1c1a355 100644 (file)
@@ -24,78 +24,72 @@ import org.apache.commons.cli.Option
 
 enum class CommandLineOption(val option: Option, val required: Boolean = false) {
     HEALTH_CHECK_API_PORT(
-        Option.builder("H")
-            .longOpt("health-check-api-port")
-            .hasArg()
-            .desc("Health check rest api listen port")
-            .build()
-    ),
-    LISTEN_PORT(
-        Option.builder("p")
-            .longOpt("listen-port")
-            .hasArg()
-            .desc("Listen port")
-            .build(),
-        required = true
+            Option.builder("H")
+                    .longOpt("health-check-api-port")
+                    .hasArg()
+                    .desc("Health check rest api listen port")
+                    .build()
     ),
-    CONFIGURATION_FIRST_REQUEST_DELAY(
-        Option.builder("d")
-            .longOpt("first-request-delay")
-            .hasArg()
-            .desc("Delay of first request for configuration in seconds")
-            .build()
+    CONFIGURATION_FILE(
+            Option.builder("c")
+                    .longOpt("configuration-file")
+                    .hasArg()
+                    .desc("Json file containing HV-VES configuration")
+                    .build(),
+            required = true
     ),
-    CONFIGURATION_REQUEST_INTERVAL(
-        Option.builder("I")
-            .longOpt("request-interval")
-            .hasArg()
-            .desc("Interval of configuration requests in seconds")
-            .build()
+    LISTEN_PORT(
+            Option.builder("p")
+                    .longOpt("listen-port")
+                    .hasArg()
+                    .desc("Listen port")
+                    .build(),
+            required = true
     ),
     VES_HV_PORT(
-        Option.builder("v")
-            .longOpt("ves-port")
-            .hasArg()
-            .desc("VesHvCollector port")
-            .build(),
-        required = true
+            Option.builder("v")
+                    .longOpt("ves-port")
+                    .hasArg()
+                    .desc("VesHvCollector port")
+                    .build(),
+            required = true
     ),
     VES_HV_HOST(
-        Option.builder("h")
-            .longOpt("ves-host")
-            .hasArg()
-            .desc("VesHvCollector host")
-            .build(),
-        required = true
+            Option.builder("h")
+                    .longOpt("ves-host")
+                    .hasArg()
+                    .desc("VesHvCollector host")
+                    .build(),
+            required = true
     ),
     KAFKA_SERVERS(
-        Option.builder("s")
-            .longOpt("kafka-bootstrap-servers")
-            .hasArg()
-            .desc("Comma-separated Kafka bootstrap servers in <host>:<port> format")
-            .build(),
-        required = true
+            Option.builder("s")
+                    .longOpt("kafka-bootstrap-servers")
+                    .hasArg()
+                    .desc("Comma-separated Kafka bootstrap servers in <host>:<port> format")
+                    .build(),
+            required = true
     ),
     KAFKA_TOPICS(
-        Option.builder("f")
-            .longOpt("kafka-topics")
-            .hasArg()
-            .desc("Comma-separated Kafka topics")
-            .build(),
-        required = true
+            Option.builder("f")
+                    .longOpt("kafka-topics")
+                    .hasArg()
+                    .desc("Comma-separated Kafka topics")
+                    .build(),
+            required = true
     ),
     SSL_DISABLE(
-        Option.builder("l")
-            .longOpt("ssl-disable")
-            .desc("Disable SSL encryption")
-            .build()
+            Option.builder("l")
+                    .longOpt("ssl-disable")
+                    .desc("Disable SSL encryption")
+                    .build()
     ),
     KEY_STORE_FILE(
-        Option.builder("k")
-            .longOpt("key-store")
-            .hasArg()
-            .desc("Key store in PKCS12 format")
-            .build()
+            Option.builder("k")
+                    .longOpt("key-store")
+                    .hasArg()
+                    .desc("Key store in PKCS12 format")
+                    .build()
     ),
     KEY_STORE_PASSWORD(
             Option.builder("kp")
@@ -105,54 +99,31 @@ enum class CommandLineOption(val option: Option, val required: Boolean = false)
                     .build()
     ),
     TRUST_STORE_FILE(
-        Option.builder("t")
-            .longOpt("trust-store")
-            .hasArg()
-            .desc("File with trusted certificate bundle in PKCS12 format")
-            .build()
+            Option.builder("t")
+                    .longOpt("trust-store")
+                    .hasArg()
+                    .desc("File with trusted certificate bundle in PKCS12 format")
+                    .build()
     ),
     TRUST_STORE_PASSWORD(
-        Option.builder("tp")
-            .longOpt("trust-store-password")
-            .hasArg()
-            .desc("Trust store password")
-            .build()
-    ),
-    IDLE_TIMEOUT_SEC(
-        Option.builder("i")
-            .longOpt("idle-timeout-sec")
-            .hasArg()
-            .desc(
-                """Idle timeout for remote hosts. After given time without any data exchange the
-                |connection might be closed.""".trimMargin()
-            )
-            .build()
+            Option.builder("tp")
+                    .longOpt("trust-store-password")
+                    .hasArg()
+                    .desc("Trust store password")
+                    .build()
     ),
     MAXIMUM_PAYLOAD_SIZE_BYTES(
-        Option.builder("m")
-            .longOpt("max-payload-size")
-            .hasArg()
-            .desc("Maximum supported payload size in bytes")
-            .build()
-    ),
-    LOG_LEVEL(
-        Option.builder("ll")
-            .longOpt("log-level")
-            .hasArg()
-            .desc("Log level")
-            .build()
-    ),
-    DUMMY_MODE(
-        Option.builder("u")
-            .longOpt("dummy")
-            .desc("If present will start in dummy mode (dummy external services)")
-            .build()
+            Option.builder("m")
+                    .longOpt("max-payload-size")
+                    .hasArg()
+                    .desc("Maximum supported payload size in bytes")
+                    .build()
     );
 
     fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String =
-        option.longOpt.toUpperCase().replace('-', '_').let { mainPart ->
-            "${prefix}_${mainPart}"
-        }
+            option.longOpt.toUpperCase().replace('-', '_').let { mainPart ->
+                "${prefix}_${mainPart}"
+            }
 
     companion object {
         private const val DEFAULT_ENV_PREFIX = "VESHV"
index c0fbcde..48cac69 100644 (file)
@@ -32,13 +32,13 @@ import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain
  * @since June 2018
  */
 
+val handleWrongArgumentErrorCurried = ::handleWrongArgumentError.curried()
+
 fun handleWrongArgumentError(programName: String, err: WrongArgumentError): IO<Unit> = IO {
     err.printMessage()
     err.printHelp(programName)
 }.flatMap { ExitFailure(2).io() }
 
-val handleWrongArgumentErrorCurried = ::handleWrongArgumentError.curried()
-
 fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long =
         longValue(cmdLineOpt).getOrElse { default }
 
index 736710f..6614e77 100644 (file)
@@ -25,8 +25,7 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KAFKA_SERVERS
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.*
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -49,13 +48,13 @@ class CommandLineOptionTest : Spek({
             }
 
             given("sample option without prefix") {
-                val opt = DUMMY_MODE
+                val opt = SSL_DISABLE
 
                 on("calling environmentVariableName") {
                     val result = opt.environmentVariableName()
 
                     it("should return prefixed upper snake cased long option name") {
-                        assertThat(result).isEqualTo("VESHV_DUMMY")
+                        assertThat(result).isEqualTo("VESHV_SSL_DISABLE")
                     }
                 }
             }
index 874cc5b..5b547a2 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.config.api
 
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.config.impl.ArgVesHvConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException
+import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
+import org.onap.dcae.collectors.veshv.config.impl.ArgHvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
+import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
+import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
 import reactor.core.publisher.Flux
 
 class ConfigurationModule {
-    fun createConfigurationFromCommandLine(args: Array<String>) =
-            ArgVesHvConfiguration().parse(args)
 
-    fun hvVesConfigurationUpdates(): Flux<ServerConfiguration> = Flux.never<ServerConfiguration>()
+    private val cmd = ArgHvVesConfiguration()
+    private val configReader = FileConfigurationReader()
+    private val configValidator = ConfigurationValidator()
+
+    fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> =
+            Flux.just(cmd.parse(args))
+                    .throwOnLeft { MissingArgumentException(it.message, it.cause) }
+                    .map { configReader.loadConfig(it.reader()) }
+                    .map { configValidator.validate(it) }
+                    .throwOnLeft { ValidationException(it.message) }
 }
@@ -21,22 +21,34 @@ package org.onap.dcae.collectors.veshv.config.api.model
 
 import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import java.net.InetSocketAddress
 import java.time.Duration
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
+data class HvVesConfiguration(
+        val server: ServerConfiguration,
+        val cbs: CbsConfiguration,
+        val security: SecurityConfiguration,
+        val collector: CollectorConfiguration,
+        val logLevel: LogLevel
+)
+
 data class ServerConfiguration(
-        val serverListenAddress: InetSocketAddress,
-        val kafkaConfiguration: KafkaConfiguration,
-        val configurationProviderParams: ConfigurationProviderParams,
-        val securityConfiguration: SecurityConfiguration,
+        val listenPort: Int,
         val idleTimeout: Duration,
-        val healthCheckApiListenAddress: InetSocketAddress,
-        val maximumPayloadSizeBytes: Int,
-        val logLevel: LogLevel,
-        val dummyMode: Boolean = false
+        val maxPayloadSizeBytes: Int
 )
 
+data class CbsConfiguration(
+        val firstRequestDelay: Duration,
+        val requestInterval: Duration
+)
+
+data class CollectorConfiguration(
+        val maxRequestSizeBytes: Int,
+        val kafkaServers: String,
+        val routing: Routing,
+        val dummyMode: Boolean = false
+)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,8 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.config.api.model
 
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-data class CollectorConfiguration(val routing: Routing)
+class MissingArgumentException(message: String, cause: Throwable?) : RuntimeException(message, cause)
+
+class ValidationException(message: String) : RuntimeException(message)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -30,7 +30,7 @@ data class Routing(val routes: List<Route>) {
             Option.fromNullable(routes.find { it.applies(commonHeader) })
 }
 
-data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = {0}) {
+data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) {
 
     fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
 
@@ -40,24 +40,17 @@ data class Route(val domain: String, val targetTopic: String, val partitioning:
 
 
 /*
-Configuration DSL
- */
+HvVesConfiguration DSL
+*/
 
-fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder {
-    val conf = RoutingBuilder()
-    conf.init()
-    return conf
-}
+fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder = RoutingBuilder().apply(init)
 
 class RoutingBuilder {
     private val routes: MutableList<RouteBuilder> = mutableListOf()
 
-    fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder {
-        val rule = RouteBuilder()
-        rule.init()
-        routes.add(rule)
-        return rule
-    }
+    fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder = RouteBuilder()
+            .apply(init)
+            .also { routes.add(it) }
 
     fun build() = Routing(routes.map { it.build() }.toList())
 }
@@ -68,18 +61,17 @@ class RouteBuilder {
     private lateinit var targetTopic: String
     private lateinit var partitioning: (CommonEventHeader) -> Int
 
-    fun fromDomain(domain: String) : RouteBuilder = apply {
+    fun fromDomain(domain: String): RouteBuilder = apply {
         this.domain = domain
     }
 
-    fun toTopic(targetTopic: String) : RouteBuilder = apply {
+    fun toTopic(targetTopic: String): RouteBuilder = apply {
         this.targetTopic = targetTopic
     }
 
-    fun withFixedPartitioning(num: Int = 0) : RouteBuilder = apply {
+    fun withFixedPartitioning(num: Int = 0): RouteBuilder = apply {
         partitioning = { num }
     }
 
     fun build() = Route(domain, targetTopic, partitioning)
-
 }
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt
new file mode 100644 (file)
index 0000000..9587d5b
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.Option
+import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FILE
+import org.onap.dcae.collectors.veshv.commandline.stringValue
+import java.io.File
+
+internal class ArgHvVesConfiguration : ArgBasedConfiguration<File>(DefaultParser()) {
+    override val cmdLineOptionsList = listOf(CONFIGURATION_FILE)
+
+    override fun getConfiguration(cmdLine: CommandLine): Option<File> =
+            cmdLine.stringValue(CONFIGURATION_FILE).map(::File)
+
+}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt
deleted file mode 100644 (file)
index 08346d3..0000000
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl
-
-import arrow.core.Option
-import arrow.core.fix
-import arrow.core.getOrElse
-import arrow.instances.option.monad.monad
-import arrow.typeclasses.binding
-import org.apache.commons.cli.CommandLine
-import org.apache.commons.cli.DefaultParser
-import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_REQUEST_INTERVAL
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FIRST_REQUEST_DELAY
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KAFKA_SERVERS
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KEY_STORE_FILE
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KEY_STORE_PASSWORD
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LOG_LEVEL
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.SSL_DISABLE
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.TRUST_STORE_FILE
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.TRUST_STORE_PASSWORD
-import org.onap.dcae.collectors.veshv.commandline.hasOption
-import org.onap.dcae.collectors.veshv.commandline.intValue
-import org.onap.dcae.collectors.veshv.commandline.longValue
-import org.onap.dcae.collectors.veshv.commandline.stringValue
-import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams
-import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
-import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
-import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.net.InetSocketAddress
-import java.time.Duration
-
-
-internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
-    override val cmdLineOptionsList = listOf(
-            KAFKA_SERVERS,
-            HEALTH_CHECK_API_PORT,
-            LISTEN_PORT,
-            CONFIGURATION_FIRST_REQUEST_DELAY,
-            CONFIGURATION_REQUEST_INTERVAL,
-            SSL_DISABLE,
-            KEY_STORE_FILE,
-            KEY_STORE_PASSWORD,
-            TRUST_STORE_FILE,
-            TRUST_STORE_PASSWORD,
-            IDLE_TIMEOUT_SEC,
-            MAXIMUM_PAYLOAD_SIZE_BYTES,
-            DUMMY_MODE,
-            LOG_LEVEL
-    )
-
-    override fun getConfiguration(cmdLine: CommandLine): Option<ServerConfiguration> =
-            Option.monad().binding {
-                val healthCheckApiPort = cmdLine.intValue(
-                        HEALTH_CHECK_API_PORT,
-                        DefaultValues.HEALTH_CHECK_API_PORT
-                )
-                val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
-                val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
-                val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
-                val maxPayloadSizeBytes = cmdLine.intValue(
-                        MAXIMUM_PAYLOAD_SIZE_BYTES,
-                        DefaultValues.MAX_PAYLOAD_SIZE_BYTES
-                )
-                val dummyMode = cmdLine.hasOption(DUMMY_MODE)
-                val security = createSecurityConfiguration(cmdLine)
-                        .doOnFailure { ex ->
-                            logger.withError { log("Could not read security keys", ex) }
-                        }
-                        .toOption()
-                        .bind()
-                val logLevel = cmdLine.stringValue(LOG_LEVEL, DefaultValues.LOG_LEVEL)
-                val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
-                ServerConfiguration(
-                        serverListenAddress = InetSocketAddress(listenPort),
-                        kafkaConfiguration = KafkaConfiguration(kafkaServers, maxPayloadSizeBytes),
-                        healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
-                        configurationProviderParams = configurationProviderParams,
-                        securityConfiguration = security,
-                        idleTimeout = Duration.ofSeconds(idleTimeoutSec),
-                        maximumPayloadSizeBytes = maxPayloadSizeBytes,
-                        dummyMode = dummyMode,
-                        logLevel = determineLogLevel(logLevel)
-                )
-            }.fix()
-
-    private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
-            Option.monad().binding {
-                val firstRequestDelay = cmdLine.longValue(
-                        CONFIGURATION_FIRST_REQUEST_DELAY,
-                        DefaultValues.CONFIGURATION_FIRST_REQUEST_DELAY
-                )
-                val requestInterval = cmdLine.longValue(
-                        CONFIGURATION_REQUEST_INTERVAL,
-                        DefaultValues.CONFIGURATION_REQUEST_INTERVAL
-                )
-                ConfigurationProviderParams(
-                        Duration.ofSeconds(firstRequestDelay),
-                        Duration.ofSeconds(requestInterval)
-                )
-            }.fix()
-
-    private fun determineLogLevel(logLevel: String) = LogLevel.optionFromString(logLevel)
-            .getOrElse {
-                logger.warn {
-                    "Failed to parse $logLevel as $LOG_LEVEL command line. " +
-                            "Using default log level (${DefaultValues.LOG_LEVEL})"
-                }
-                LogLevel.valueOf(DefaultValues.LOG_LEVEL)
-            }
-
-
-    internal object DefaultValues {
-        const val HEALTH_CHECK_API_PORT = 6060
-        const val CONFIGURATION_FIRST_REQUEST_DELAY = 10L
-        const val CONFIGURATION_REQUEST_INTERVAL = 5L
-        const val IDLE_TIMEOUT_SEC = 60L
-        const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
-        val LOG_LEVEL = LogLevel.INFO.name
-    }
-
-    companion object {
-        private val logger = Logger(ArgVesHvConfiguration::class)
-    }
-}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
new file mode 100644 (file)
index 0000000..6c74c33
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.Either
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
+import arrow.core.getOrElse
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
+import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.net.InetSocketAddress
+import java.time.Duration
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since March 2019
+ */
+internal class ConfigurationValidator {
+
+    fun validate(partialConfig: PartialConfiguration)
+            : Either<ValidationError, HvVesConfiguration> = binding {
+        val logLevel = determineLogLevel(partialConfig.logLevel)
+
+        val serverConfiguration = partialConfig.server.bind()
+                .let { createServerConfiguration(it).bind() }
+
+        val cbsConfiguration = partialConfig.cbs.bind()
+                .let { createCbsConfiguration(it).bind() }
+
+        val securityConfiguration = partialConfig.security.bind()
+                .let { createSecurityConfiguration(it).bind() }
+
+        val collectorConfiguration = partialConfig.collector.bind()
+                .let { createCollectorConfig(it).bind() }
+
+        HvVesConfiguration(
+                serverConfiguration,
+                cbsConfiguration,
+                securityConfiguration,
+                collectorConfiguration,
+                logLevel
+        )
+    }.toEither { ValidationError("Some required configuration options are missing") }
+
+    private fun determineLogLevel(logLevel: Option<LogLevel>) =
+            logLevel.getOrElse {
+                logger.warn {
+                    "Missing or invalid \"logLevel\" field. " +
+                            "Using default log level ($DEFAULT_LOG_LEVEL)"
+                }
+                DEFAULT_LOG_LEVEL
+            }
+
+    private fun createServerConfiguration(partial: PartialServerConfig) =
+            partial.mapBinding {
+                ServerConfiguration(
+                        it.listenPort.bind(),
+                        Duration.ofSeconds(it.idleTimeoutSec.bind().toLong()),
+                        it.maxPayloadSizeBytes.bind()
+                )
+            }
+
+    private fun createCbsConfiguration(partial: PartialCbsConfig) =
+            partial.mapBinding {
+                CbsConfiguration(
+                        Duration.ofSeconds(it.firstRequestDelaySec.bind().toLong()),
+                        Duration.ofSeconds(it.requestIntervalSec.bind().toLong())
+                )
+            }
+
+    private fun createSecurityConfiguration(partial: PartialSecurityConfig) =
+            partial.keys.map { SecurityConfiguration(Some(it)) }
+
+    private fun createCollectorConfig(partial: PartialCollectorConfig) =
+            partial.mapBinding {
+                CollectorConfiguration(
+                        it.maxRequestSizeBytes.bind(),
+                        toKafkaServersString(it.kafkaServers.bind()),
+                        it.routing.bind(),
+                        it.dummyMode.bind()
+                )
+            }
+
+    private fun toKafkaServersString(kafkaServers: List<InetSocketAddress>): String =
+            kafkaServers.joinToString(",") { "${it.hostName}:${it.port}" }
+
+    companion object {
+        val DEFAULT_LOG_LEVEL = LogLevel.INFO
+        private val logger = Logger(ConfigurationValidator::class)
+    }
+}
+
+data class ValidationError(val message: String, val cause: Option<Throwable> = None)
index 8d9cca7..3e6df3e 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.config.impl
 
+import arrow.core.None
 import arrow.core.Option
 import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
@@ -30,31 +31,29 @@ import java.net.InetSocketAddress
  * @since February 2019
  */
 internal data class PartialConfiguration(
-        val server : Option<PartialServerConfig>,
-        val cbs : Option<PartialCbsConfig>,
-        val security : Option<PartialSecurityConfig>,
-        val kafka : Option<PartialKafkaConfig>,
-        val logLevel : Option<LogLevel>
+        val server: Option<PartialServerConfig> = None,
+        val cbs: Option<PartialCbsConfig> = None,
+        val security: Option<PartialSecurityConfig> = None,
+        val collector: Option<PartialCollectorConfig> = None,
+        val logLevel: Option<LogLevel> = None
 )
 
 internal data class PartialServerConfig(
-        val healthCheckApiPort : Option<Int>,
-        val listenPort : Option<Int>,
-        val idleTimeoutSec : Option<Int>,
-        val maximumPayloadSizeBytes : Option<Int>,
-        val dummyMode : Option<Boolean>
+        val listenPort: Option<Int> = None,
+        val idleTimeoutSec: Option<Int> = None,
+        val maxPayloadSizeBytes: Option<Int> = None
 )
 
 internal data class PartialCbsConfig(
-        val firstRequestDelaySec : Option<Int>,
-        val requestIntervalSec : Option<Int>
+        val firstRequestDelaySec: Option<Int> = None,
+        val requestIntervalSec: Option<Int> = None
 )
 
-internal data class PartialSecurityConfig(
-        val sslDisable : Option<Boolean>,
-        val keys : Option<SecurityKeys>)
+internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
 
-internal data class PartialKafkaConfig(
-    val kafkaServers : Option<Array<InetSocketAddress>>,
-    val routing : Option<Routing>
+internal data class PartialCollectorConfig(
+        val dummyMode: Option<Boolean> = None,
+        val maxRequestSizeBytes: Option<Int> = None,
+        val kafkaServers: Option<List<InetSocketAddress>> = None,
+        val routing: Option<Routing> = None
 )
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt
deleted file mode 100644 (file)
index 0f8d8d0..0000000
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.api
-
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.config.impl.ArgVesHvConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
-import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
-import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
-import java.time.Duration
-import kotlin.test.assertNotNull
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object ArgVesHvConfigurationTest : Spek({
-    lateinit var cut: ArgVesHvConfiguration
-    val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666"
-    val healthCheckApiPort = "6070"
-    val firstRequestDelay = "10"
-    val requestInterval = "5"
-    val listenPort = "6969"
-    val keyStorePassword = "kspass"
-    val trustStorePassword = "tspass"
-    val logLevel = LogLevel.DEBUG.name
-
-    beforeEachTest {
-        cut = ArgVesHvConfiguration()
-    }
-
-    describe("parsing arguments") {
-        given("all parameters are present in the long form") {
-            lateinit var result: ServerConfiguration
-
-            beforeEachTest {
-                result = cut.parseExpectingSuccess(
-                        "--kafka-bootstrap-servers", kafkaBootstrapServers,
-                        "--health-check-api-port", healthCheckApiPort,
-                        "--listen-port", listenPort,
-                        "--first-request-delay", firstRequestDelay,
-                        "--request-interval", requestInterval,
-                        "--key-store", "/tmp/keys.p12",
-                        "--trust-store", "/tmp/trust.p12",
-                        "--key-store-password", keyStorePassword,
-                        "--trust-store-password", trustStorePassword,
-                        "--log-level", logLevel
-                )
-            }
-
-            it("should set proper kafka bootstrap servers") {
-                assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers)
-            }
-
-            it("should set proper listen port") {
-                assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt())
-            }
-
-
-            it("should set default listen address") {
-                assertThat(result.serverListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
-            }
-
-            it("should set proper health check api port") {
-                assertThat(result.healthCheckApiListenAddress.port).isEqualTo(healthCheckApiPort.toInt())
-            }
-
-            it("should set default health check api address") {
-                assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
-            }
-
-            it("should set proper first request delay") {
-                assertThat(result.configurationProviderParams.firstRequestDelay)
-                        .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
-            }
-
-            it("should set proper request interval") {
-                assertThat(result.configurationProviderParams.requestInterval)
-                        .isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
-            }
-
-            it("should set proper security configuration") {
-                assertThat(result.securityConfiguration.keys.isEmpty()).isFalse()
-
-                val keys = result.securityConfiguration.keys.orNull() as SecurityKeys
-                assertNotNull(keys.keyStore())
-                assertNotNull(keys.trustStore())
-                keys.keyStorePassword().useChecked {
-                    assertThat(it).isEqualTo(keyStorePassword.toCharArray())
-
-                }
-                keys.trustStorePassword().useChecked {
-                    assertThat(it).isEqualTo(trustStorePassword.toCharArray())
-                }
-            }
-
-            it("should set proper log level") {
-                assertThat(result.logLevel).isEqualTo(LogLevel.DEBUG)
-            }
-        }
-
-        describe("required parameter is absent") {
-            on("missing listen port") {
-                it("should throw exception") {
-                    assertThat(
-                            cut.parseExpectingFailure(
-                                    "--ssl-disable",
-                                    "--first-request-delay", firstRequestDelay,
-                                    "--request-interval", requestInterval
-                            )
-                    ).isInstanceOf(WrongArgumentError::class.java)
-                }
-            }
-            on("missing configuration url") {
-                it("should throw exception") {
-                    assertThat(
-                            cut.parseExpectingFailure(
-                                    "--listen-port", listenPort,
-                                    "--ssl-disable",
-                                    "--first-request-delay", firstRequestDelay,
-                                    "--request-interval", requestInterval
-                            )
-                    ).isInstanceOf(WrongArgumentError::class.java)
-                }
-            }
-        }
-
-        describe("correct log level not provided") {
-            on("missing log level") {
-                it("should set default INFO value") {
-                    val config = cut.parseExpectingSuccess(
-                            "--kafka-bootstrap-servers", kafkaBootstrapServers,
-                            "--health-check-api-port", healthCheckApiPort,
-                            "--listen-port", listenPort,
-                            "--first-request-delay", firstRequestDelay,
-                            "--request-interval", requestInterval,
-                            "--key-store", "/tmp/keys.p12",
-                            "--trust-store", "/tmp/trust.p12",
-                            "--key-store-password", keyStorePassword,
-                            "--trust-store-password", trustStorePassword
-                    )
-
-                    assertThat(config.logLevel).isEqualTo(LogLevel.INFO)
-                }
-            }
-
-            on("incorrect log level") {
-                it("should set default INFO value") {
-                    val config = cut.parseExpectingSuccess(
-                            "--kafka-bootstrap-servers", kafkaBootstrapServers,
-                            "--health-check-api-port", healthCheckApiPort,
-                            "--listen-port", listenPort,
-                            "--first-request-delay", firstRequestDelay,
-                            "--request-interval", requestInterval,
-                            "--key-store", "/tmp/keys.p12",
-                            "--trust-store", "/tmp/trust.p12",
-                            "--key-store-password", keyStorePassword,
-                            "--trust-store-password", trustStorePassword,
-                            "--log-level", "1"
-                    )
-
-                    assertThat(config.logLevel).isEqualTo(LogLevel.INFO)
-                }
-            }
-        }
-    }
-})
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt
new file mode 100644 (file)
index 0000000..dbe757c
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError
+import org.onap.dcae.collectors.veshv.tests.utils.absoluteResourcePath
+import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
+import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
+import java.io.File
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object ArgVesHvConfigurationTest : Spek({
+    lateinit var cut: ArgHvVesConfiguration
+    val configFilePath = javaClass.absoluteResourcePath("sampleConfig.json")
+
+    beforeEachTest {
+        cut = ArgHvVesConfiguration()
+    }
+
+    describe("parsing arguments") {
+        given("all parameters are present in the long form") {
+            lateinit var result: File
+
+            beforeEachTest {
+                result = cut.parseExpectingSuccess(
+                        "--configuration-file", configFilePath
+                )
+            }
+
+            it("should read proper configuration file") {
+                assertThat(result.exists()).isTrue()
+            }
+        }
+
+        describe("required parameter is absent") {
+            on("missing configuration file path") {
+                it("should throw exception") {
+                    assertThat(
+                            cut.parseExpectingFailure(
+                                    "--non-existing-option", ""
+                            )
+                    ).isInstanceOf(WrongArgumentError::class.java)
+                }
+            }
+        }
+    }
+})
\ No newline at end of file
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
new file mode 100644 (file)
index 0000000..12396d2
--- /dev/null
@@ -0,0 +1,138 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.None
+import arrow.core.Some
+import com.nhaarman.mockitokotlin2.mock
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.fail
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.config.api.model.routing
+import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
+import java.time.Duration
+
+internal object ConfigurationValidatorTest : Spek({
+    describe("ConfigurationValidator") {
+        val cut = ConfigurationValidator()
+
+        describe("validating partial configuration with missing fields") {
+            val config = PartialConfiguration(
+                    Some(PartialServerConfig(listenPort = Some(1)))
+            )
+
+            it("should return ValidationError") {
+                val result = cut.validate(config)
+                assertThat(result.isLeft()).isTrue()
+            }
+        }
+
+        describe("validating configuration with empty log level") {
+            val config = PartialConfiguration(
+                    Some(PartialServerConfig(
+                            Some(1),
+                            Some(2),
+                            Some(3)
+                    )),
+                    Some(PartialCbsConfig(
+                            Some(5),
+                            Some(3)
+                    )),
+                    Some(PartialSecurityConfig(
+                            Some(mock())
+                    )),
+                    Some(PartialCollectorConfig(
+                            Some(true),
+                            Some(4),
+                            Some(emptyList()),
+                            Some(routing { }.build())
+                    )),
+                    None
+            )
+
+            it("should use default log level") {
+                val result = cut.validate(config)
+                result.fold(
+                        {
+                            fail("Configuration should have been created successfully")
+                        },
+                        {
+                            assertThat(it.logLevel).isEqualTo(DEFAULT_LOG_LEVEL)
+                        }
+                )
+            }
+        }
+
+        describe("validating complete configuration") {
+            val idleTimeoutSec = 10
+            val firstReqDelaySec = 10
+            val securityKeys = Some(mock<SecurityKeys>())
+            val routing = routing { }.build()
+
+            val config = PartialConfiguration(
+                    Some(PartialServerConfig(
+                            Some(1),
+                            Some(idleTimeoutSec),
+                            Some(2)
+                    )),
+                    Some(PartialCbsConfig(
+                            Some(firstReqDelaySec),
+                            Some(3)
+                    )),
+                    Some(PartialSecurityConfig(
+                            securityKeys
+                    )),
+                    Some(PartialCollectorConfig(
+                            Some(true),
+                            Some(4),
+                            Some(emptyList()),
+                            Some(routing)
+                    )),
+                    Some(LogLevel.INFO)
+            )
+
+            it("should create valid configuration") {
+                val result = cut.validate(config)
+                result.fold(
+                        {
+                            fail("Configuration should have been created successfully")
+                        },
+                        {
+                            assertThat(it.server.idleTimeout)
+                                    .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong()))
+
+                            assertThat(it.security.keys)
+                                    .isEqualTo(securityKeys)
+
+                            assertThat(it.cbs.firstRequestDelay)
+                                    .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong()))
+
+                            assertThat(it.collector.routing)
+                                    .isEqualTo(routing)
+                        }
+                )
+            }
+        }
+    }
+})
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.config.api
+package org.onap.dcae.collectors.veshv.config.impl
 
 import arrow.core.Some
-import org.jetbrains.spek.api.Spek
 import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
-import org.onap.dcae.collectors.veshv.config.impl.PartialCbsConfig
-import org.onap.dcae.collectors.veshv.config.impl.PartialKafkaConfig
-import org.onap.dcae.collectors.veshv.config.impl.PartialSecurityConfig
-import org.onap.dcae.collectors.veshv.config.impl.PartialServerConfig
+import org.onap.dcae.collectors.veshv.tests.utils.resourceAsStream
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import java.io.InputStreamReader
 import java.io.StringReader
 import java.net.InetSocketAddress
 
@@ -41,12 +36,13 @@ import java.net.InetSocketAddress
  */
 internal object FileConfigurationReaderTest : Spek({
     describe("A configuration loader utility") {
+        val cut = FileConfigurationReader()
 
         describe("partial configuration loading") {
             it("parses enumerations") {
                 val input = """{"logLevel":"ERROR"}"""
 
-                val config = FileConfigurationReader().loadConfig(StringReader(input))
+                val config = cut.loadConfig(StringReader(input))
                 assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR))
             }
 
@@ -58,14 +54,13 @@ internal object FileConfigurationReaderTest : Spek({
                 }
             }
             """.trimIndent()
-                val config = FileConfigurationReader().loadConfig(StringReader(input))
+                val config = cut.loadConfig(StringReader(input))
                 assertThat(config.server.nonEmpty()).isTrue()
-                assertThat(config.server.orNull()?.healthCheckApiPort).isEqualTo(Some(12002))
                 assertThat(config.server.orNull()?.listenPort).isEqualTo(Some(12003))
             }
 
             it("parses ip address") {
-                val input = """{  "kafka" : {
+                val input = """{  "collector" : {
                     "kafkaServers": [
                       "192.168.255.1:5005",
                       "192.168.255.26:5006"
@@ -73,13 +68,13 @@ internal object FileConfigurationReaderTest : Spek({
                   }
                 }"""
 
-                val config = FileConfigurationReader().loadConfig(StringReader(input))
-                assertThat(config.kafka.nonEmpty()).isTrue()
-                val kafka = config.kafka.orNull() as PartialKafkaConfig
-                assertThat(kafka.kafkaServers.nonEmpty()).isTrue()
-                val addresses = kafka.kafkaServers.orNull() as Array<InetSocketAddress>
+                val config = cut.loadConfig(StringReader(input))
+                assertThat(config.collector.nonEmpty()).isTrue()
+                val collector = config.collector.orNull() as PartialCollectorConfig
+                assertThat(collector.kafkaServers.nonEmpty()).isTrue()
+                val addresses = collector.kafkaServers.orNull() as List<InetSocketAddress>
                 assertThat(addresses)
-                        .isEqualTo(arrayOf(
+                        .isEqualTo(listOf(
                                 InetSocketAddress("192.168.255.1", 5005),
                                 InetSocketAddress("192.168.255.26", 5006)
                         ))
@@ -87,7 +82,7 @@ internal object FileConfigurationReaderTest : Spek({
 
             it("parses routing array with RoutingAdapter") {
                 val input = """{
-                    "kafka" : {
+                    "collector" : {
                         "routing" : [
                             {
                               "fromDomain": "perf3gpp",
@@ -96,30 +91,38 @@ internal object FileConfigurationReaderTest : Spek({
                         ]
                     }
                 }""".trimIndent()
-                val config = FileConfigurationReader().loadConfig(StringReader(input))
-                assertThat(config.kafka.nonEmpty()).isTrue()
-                val kafka = config.kafka.orNull() as PartialKafkaConfig
-                assertThat(kafka.routing.nonEmpty()).isTrue()
-                val routing = kafka.routing.orNull() as Routing
+                val config = cut.loadConfig(StringReader(input))
+                assertThat(config.collector.nonEmpty()).isTrue()
+                val collector = config.collector.orNull() as PartialCollectorConfig
+                assertThat(collector.routing.nonEmpty()).isTrue()
+                val routing = collector.routing.orNull() as Routing
                 routing.run {
                     assertThat(routes.size).isEqualTo(1)
                     assertThat(routes[0].domain).isEqualTo("perf3gpp")
                     assertThat(routes[0].targetTopic).isEqualTo("HV_VES_PERF3GPP")
                 }
             }
+
+            it("parses invalid log level string to empty option") {
+                val input = """{
+                    "logLevel": something
+                }""".trimMargin()
+                val config = cut.loadConfig(input.reader())
+
+                assertThat(config.logLevel.isEmpty())
+            }
         }
 
         describe("complete file loading") {
             it("loads actual file") {
-                val config = FileConfigurationReader().loadConfig(
-                        InputStreamReader(
-                                FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")))
+                val config = cut.loadConfig(
+                        javaClass.resourceAsStream("/sampleConfig.json"))
+
                 assertThat(config).isNotNull
                 assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR))
 
                 assertThat(config.security.nonEmpty()).isTrue()
                 val security = config.security.orNull() as PartialSecurityConfig
-                assertThat(security.sslDisable.orNull()).isFalse()
                 assertThat(security.keys.nonEmpty()).isTrue()
 
                 assertThat(config.cbs.nonEmpty()).isTrue()
@@ -127,21 +130,24 @@ internal object FileConfigurationReaderTest : Spek({
                 assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(7))
                 assertThat(cbs.requestIntervalSec).isEqualTo(Some(900))
 
-                assertThat(config.kafka.nonEmpty()).isTrue()
-                val kafka = config.kafka.orNull() as PartialKafkaConfig
-                assertThat(kafka.kafkaServers.nonEmpty()).isTrue()
-                assertThat(kafka.routing.nonEmpty()).isTrue()
+                assertThat(config.collector.nonEmpty()).isTrue()
+                val collector = config.collector.orNull() as PartialCollectorConfig
+                collector.run {
+                    assertThat(dummyMode).isEqualTo(Some(false))
+                    assertThat(maxRequestSizeBytes).isEqualTo(Some(512000))
+                    assertThat(kafkaServers.nonEmpty()).isTrue()
+                    assertThat(routing.nonEmpty()).isTrue()
+                }
 
                 assertThat(config.server.nonEmpty()).isTrue()
                 val server = config.server.orNull() as PartialServerConfig
                 server.run {
-                    assertThat(dummyMode).isEqualTo(Some(false))
-                    assertThat(healthCheckApiPort).isEqualTo(Some(5000))
                     assertThat(idleTimeoutSec).isEqualTo(Some(1200))
                     assertThat(listenPort).isEqualTo(Some(6000))
-                    assertThat(maximumPayloadSizeBytes).isEqualTo(Some(512000))
+                    assertThat(maxPayloadSizeBytes).isEqualTo(Some(512000))
                 }
             }
         }
     }
-})
\ No newline at end of file
+})
+
index b64df05..b49085e 100644 (file)
@@ -1,16 +1,16 @@
 {
-  "server" : {
-    "healthCheckApiPort" : 5000,
-    "listenPort" : 6000,
-    "idleTimeoutSec" : 1200,
-    "maximumPayloadSizeBytes" : 512000,
-    "dummyMode" : false
+  "logLevel": "ERROR",
+  "server": {
+    "healthCheckApiPort": 5000,
+    "listenPort": 6000,
+    "idleTimeoutSec": 1200,
+    "maxPayloadSizeBytes": 512000
   },
-  "cbs" : {
+  "cbs": {
     "firstRequestDelaySec": 7,
     "requestIntervalSec": 900
   },
-  "security" : {
+  "security": {
     "sslDisable": false,
     "keys": {
       "keyStoreFile": "test.ks.pkcs12",
@@ -19,7 +19,9 @@
       "trustStorePassword": "changeMeToo"
     }
   },
-  "kafka" : {
+  "collector": {
+    "dummyMode": false,
+    "maxRequestSizeBytes": 512000,
     "kafkaServers": [
       "192.168.255.1:5005",
       "192.168.255.1:5006"
@@ -30,6 +32,5 @@
         "toTopic": "HV_VES_PERF3GPP"
       }
     ]
-  },
-  "logLevel" : "ERROR"
+  }
 }
\ No newline at end of file
index 8431080..782d232 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.boundary
 
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.utils.Closeable
 import reactor.core.publisher.Flux
 
@@ -48,5 +48,5 @@ interface SinkProvider : Closeable {
 }
 
 interface ConfigurationProvider {
-    operator fun invoke(): Flux<CollectorConfiguration>
+    operator fun invoke(): Flux<Routing>
 }
index 3ea1438..c08df74 100644 (file)
 package org.onap.dcae.collectors.veshv.factory
 
 import arrow.core.Option
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -44,14 +43,14 @@ import java.util.concurrent.atomic.AtomicReference
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class CollectorFactory(val configuration: ConfigurationProvider,
+class CollectorFactory(private val configuration: ConfigurationProvider,
                        private val sinkProvider: SinkProvider,
                        private val metrics: Metrics,
-                       private val maximumPayloadSizeBytes: Int,
+                       private val maxPayloadSizeBytes: Int,
                        private val healthState: HealthState = HealthState.INSTANCE) {
 
     fun createVesHvCollectorProvider(): CollectorProvider {
-        val config: AtomicReference<CollectorConfiguration> = AtomicReference()
+        val config = AtomicReference<Routing>()
         configuration()
                 .doOnNext {
                     logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
@@ -72,12 +71,12 @@ class CollectorFactory(val configuration: ConfigurationProvider,
         }
     }
 
-    private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector =
+    private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
             VesHvCollector(
                     clientContext = ctx,
-                    wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+                    wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
                     protobufDecoder = VesDecoder(),
-                    router = Router(config.routing, ctx),
+                    router = Router(routing, ctx),
                     sink = sinkProvider(ctx),
                     metrics = metrics)
 
index 58a8599..6c4e467 100644 (file)
@@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer
+import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
 import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
 
 /**
@@ -31,8 +32,17 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
  * @since May 2018
  */
 object ServerFactory {
-    fun createNettyTcpServer(serverConfiguration: ServerConfiguration,
+
+    private val sslFactory = SslContextFactory()
+
+    fun createNettyTcpServer(serverConfig: ServerConfiguration,
+                             securityConfig: SecurityConfiguration,
                              collectorProvider: CollectorProvider,
-                             metrics: Metrics): Server =
-            NettyTcpServer(serverConfiguration, SslContextFactory(), collectorProvider, metrics)
+                             metrics: Metrics
+    ): Server = NettyTcpServer(
+            serverConfig,
+            sslFactory.createServerContext(securityConfig),
+            collectorProvider,
+            metrics
+    )
 }
index a853839..c362020 100644 (file)
@@ -21,9 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
 
@@ -32,15 +32,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti
  * @since May 2018
  */
 object AdapterFactory {
-    fun sinkCreatorFactory(dummyMode: Boolean,
-                           kafkaConfig: KafkaConfiguration): SinkProvider =
-            if (dummyMode)
+    fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider =
+            if (config.dummyMode)
                 LoggingSinkProvider()
             else
-                KafkaSinkProvider(kafkaConfig)
+                KafkaSinkProvider(config)
 
-    fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+    fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
             ConfigurationProviderImpl(
                     CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
-                    configurationProviderParams)
+                    config)
 }
index 51b6d4f..754a2ef 100644 (file)
@@ -21,11 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 
 import com.google.gson.JsonObject
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.config.api.model.routing
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
@@ -49,7 +49,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
                                          retrySpec: Retry<Any>
 
 ) : ConfigurationProvider {
-    constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this(
+    constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this(
             cbsClientMono,
             params.firstRequestDelay,
             params.requestInterval,
@@ -67,7 +67,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
         healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
     }
 
-    override fun invoke(): Flux<CollectorConfiguration> =
+    override fun invoke(): Flux<Routing> =
             cbsClientMono
                     .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
                     .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -75,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
                     .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
                     .flatMapMany(::handleUpdates)
 
-    private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient
+    private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient
             .updates(RequestDiagnosticContext.create(),
                     firstRequestDelay,
                     requestInterval)
@@ -85,21 +85,19 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
             .retryWhen(retry)
 
 
-    private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
+    private fun createCollectorConfiguration(configuration: JsonObject): Routing =
             try {
                 val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
-                CollectorConfiguration(
-                        routing {
-                            for (route in routingArray) {
-                                val routeObj = route.asJsonObject
-                                defineRoute {
-                                    fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
-                                    toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
-                                    withFixedPartitioning()
-                                }
-                            }
-                        }.build()
-                )
+                routing {
+                    for (route in routingArray) {
+                        val routeObj = route.asJsonObject
+                        defineRoute {
+                            fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
+                            toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
+                            withFixedPartitioning()
+                        }
+                    }
+                }.build()
             } catch (e: NullPointerException) {
                 throw ParsingException("Failed to parse configuration", e)
             }
index f52890b..96e45a0 100644 (file)
@@ -29,10 +29,10 @@ import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG
 import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
 import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.kafka.sender.KafkaSender
@@ -46,7 +46,7 @@ import java.lang.Integer.max
 internal class KafkaSinkProvider internal constructor(
         private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
 
-    constructor(config: KafkaConfiguration) : this(constructKafkaSender(config))
+    constructor(config: CollectorConfiguration) : this(constructKafkaSender(config))
 
     override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
 
@@ -60,14 +60,15 @@ internal class KafkaSinkProvider internal constructor(
         private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
         private const val BUFFER_MEMORY_MULTIPLIER = 32
         private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
-        private fun constructKafkaSender(config: KafkaConfiguration) =
+
+        private fun constructKafkaSender(config: CollectorConfiguration) =
                 KafkaSender.create(constructSenderOptions(config))
 
-        private fun constructSenderOptions(config: KafkaConfiguration) =
+        private fun constructSenderOptions(config: CollectorConfiguration) =
                 SenderOptions.create<CommonEventHeader, VesMessage>()
-                        .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
-                        .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config))
-                        .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config))
+                        .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers)
+                        .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes))
+                        .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes))
                         .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
                         .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
                         .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
@@ -75,10 +76,10 @@ internal class KafkaSinkProvider internal constructor(
                         .producerProperty(ACKS_CONFIG, "1")
                         .stopOnError(false)
 
-        private fun maxRequestSize(config: KafkaConfiguration) =
-                (MAXIMUM_REQUEST_SIZE_MULTIPLIER * config.maximalRequestSizeBytes).toInt()
+        private fun maxRequestSize(maxRequestSizeBytes: Int) =
+                (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt()
 
-        private fun bufferMemory(config: KafkaConfiguration) =
-                max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * config.maximalRequestSizeBytes)
+        private fun bufferMemory(maxRequestSizeBytes: Int) =
+                max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes)
     }
 }
index 123956a..fab9656 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.socket
 
+import arrow.core.Option
 import arrow.core.getOrElse
 import arrow.effects.IO
+import io.netty.handler.ssl.SslContext
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
@@ -30,7 +32,6 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -41,6 +42,7 @@ import reactor.netty.NettyInbound
 import reactor.netty.NettyOutbound
 import reactor.netty.tcp.TcpServer
 import java.net.InetAddress
+import java.net.InetSocketAddress
 import java.time.Duration
 
 
@@ -48,14 +50,14 @@ import java.time.Duration
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
-                              private val sslContextFactory: SslContextFactory,
+internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration,
+                              private val sslContext: Option<SslContext>,
                               private val collectorProvider: CollectorProvider,
                               private val metrics: Metrics) : Server {
 
     override fun start(): IO<ServerHandle> = IO {
         TcpServer.create()
-                .addressSupplier { serverConfig.serverListenAddress }
+                .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
                 .configureSsl()
                 .handle(this::handleConnection)
                 .doOnUnbound {
@@ -66,11 +68,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
     }
 
     private fun TcpServer.configureSsl() =
-            sslContextFactory
-                    .createServerContext(serverConfig.securityConfiguration)
-                    .map { sslContext ->
+            sslContext
+                    .map { serverContext ->
                         logger.info { "Collector configured with SSL enabled" }
-                        this.secure { b -> b.sslContext(sslContext) }
+                        this.secure { it.sslContext(serverContext) }
                     }.getOrElse {
                         logger.info { "Collector configured with SSL disabled" }
                         this
@@ -125,7 +126,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
                              nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
         withConnectionFrom(nettyInbound) { connection ->
             connection
-                    .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+                    .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
                     .logConnectionClosed(clientContext)
         }.run {
             collector.handleConnection(nettyInbound.createDataStream())
index 21aaa12..f830f2c 100644 (file)
@@ -78,7 +78,7 @@ internal object ConfigurationProviderImplTest : Spek({
                     StepVerifier.create(configProvider().take(1))
                             .consumeNextWith {
 
-                                val route1 = it.routing.routes[0]
+                                val route1 = it.routes[0]
                                 assertThat(FAULT.domainName)
                                         .describedAs("routed domain 1")
                                         .isEqualTo(route1.domain)
@@ -86,7 +86,7 @@ internal object ConfigurationProviderImplTest : Spek({
                                         .describedAs("target topic 1")
                                         .isEqualTo(route1.targetTopic)
 
-                                val route2 = it.routing.routes[1]
+                                val route2 = it.routes[1]
                                 assertThat(HEARTBEAT.domainName)
                                         .describedAs("routed domain 2")
                                         .isEqualTo(route2.domain)
index 068476a..1e3f2e7 100644 (file)
@@ -28,9 +28,10 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.routing
 import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.ves.VesEventOuterClass
 import reactor.kafka.sender.KafkaSender
 
@@ -41,8 +42,12 @@ import reactor.kafka.sender.KafkaSender
 internal object KafkaSinkProviderTest : Spek({
     describe("non functional requirements") {
         given("sample configuration") {
-            val config = KafkaConfiguration("localhost:9090",
-                    1024 * 1024)
+            val config = CollectorConfiguration(
+                    dummyMode = false,
+                    maxRequestSizeBytes = 1024 * 1024,
+                    kafkaServers = "localhost:9090",
+                    routing = routing { }.build())
+
             val cut = KafkaSinkProvider(config)
 
             on("sample clients") {
index aaa3ee3..bd056d4 100644 (file)
@@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
@@ -92,7 +92,7 @@ object MetricsSpecification : Spek({
 
     describe("Messages sent metrics") {
         it("should gather info for each topic separately") {
-            val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration)
+            val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)
 
             sut.handleConnection(
                     vesWireFrameMessage(PERF3GPP),
@@ -130,7 +130,7 @@ object MetricsSpecification : Spek({
 
     describe("Messages dropped metrics") {
         it("should gather metrics for invalid messages") {
-            val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
+            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
 
             sut.handleConnection(
                     messageWithInvalidWireFrameHeader(),
@@ -146,7 +146,7 @@ object MetricsSpecification : Spek({
         }
 
         it("should gather metrics for route not found") {
-            val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
+            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
 
             sut.handleConnection(
                     vesWireFrameMessage(domain = PERF3GPP),
@@ -160,7 +160,7 @@ object MetricsSpecification : Spek({
         }
 
         it("should gather metrics for sing errors") {
-            val sut = vesHvWithAlwaysFailingSink(basicConfiguration)
+            val sut = vesHvWithAlwaysFailingSink(basicRouting)
 
             sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
 
@@ -171,7 +171,7 @@ object MetricsSpecification : Spek({
         }
 
         it("should gather summed metrics for dropped messages") {
-            val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
+            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
 
             sut.handleConnection(
                     vesWireFrameMessage(domain = PERF3GPP),
index dc5fe60..ece4228 100644 (file)
@@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
@@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({
         it("should handle multiple clients in reasonable time") {
             val sink = CountingSink()
             val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            sut.configurationProvider.updateConfiguration(basicRouting)
 
             val numMessages: Long = 300_000
             val runs = 4
@@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({
         it("should disconnect on transmission errors") {
             val sink = CountingSink()
             val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            sut.configurationProvider.updateConfiguration(basicRouting)
 
             val numMessages: Long = 100_000
             val timeout = Duration.ofSeconds(30)
index d97541b..e84e948 100644 (file)
@@ -27,11 +27,18 @@ import io.netty.buffer.UnpooledByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.tests.fakes.*
+import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
+import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
 import reactor.core.publisher.Flux
 import java.time.Duration
 import java.util.concurrent.atomic.AtomicBoolean
@@ -40,7 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class Sut(sink: Sink = StoringSink()): AutoCloseable {
+class Sut(sink: Sink = StoringSink()) : AutoCloseable {
     val configurationProvider = FakeConfigurationProvider()
     val healthStateProvider = FakeHealthState()
     val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
@@ -94,17 +101,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) {
     collector.handleConnection(Flux.fromArray(packets)).block(timeout)
 }
 
-fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
         Sut(AlwaysSuccessfulSink()).apply {
-            configurationProvider.updateConfiguration(collectorConfiguration)
+            configurationProvider.updateConfiguration(routing)
         }
 
-fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
         Sut(AlwaysFailingSink()).apply {
-            configurationProvider.updateConfiguration(collectorConfiguration)
+            configurationProvider.updateConfiguration(routing)
         }
 
-fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
         Sut(DelayingSink(delay)).apply {
-            configurationProvider.updateConfiguration(collectorConfiguration)
+            configurationProvider.updateConfiguration(routing)
         }
index ed46b11..17f6ce3 100644 (file)
@@ -24,21 +24,24 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.*
-
+import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
 import reactor.core.publisher.Flux
 import java.time.Duration
 
@@ -149,7 +152,7 @@ object VesHvSpecification : Spek({
         it("should be able to direct 2 messages from different domains to one topic") {
             val (sut, sink) = vesHvWithStoringSink()
 
-            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
+            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
 
             val messages = sut.handleConnection(sink,
                     vesWireFrameMessage(PERF3GPP),
@@ -210,12 +213,12 @@ object VesHvSpecification : Spek({
 
             it("should start routing messages") {
 
-                sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+                sut.configurationProvider.updateConfiguration(emptyRouting)
 
                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                 assertThat(messages).isEmpty()
 
-                sut.configurationProvider.updateConfiguration(basicConfiguration)
+                sut.configurationProvider.updateConfiguration(basicRouting)
 
                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                 assertThat(messagesAfterUpdate).hasSize(1)
@@ -317,7 +320,7 @@ object VesHvSpecification : Spek({
         given("failed configuration change") {
             val (sut, _) = vesHvWithStoringSink()
             sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            sut.configurationProvider.updateConfiguration(basicRouting)
 
             it("should mark the application unhealthy ") {
                 assertThat(sut.healthStateProvider.currentHealth)
@@ -346,6 +349,6 @@ object VesHvSpecification : Spek({
 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
     val sink = StoringSink()
     val sut = Sut(sink)
-    sut.configurationProvider.updateConfiguration(basicConfiguration)
+    sut.configurationProvider.updateConfiguration(basicRouting)
     return Pair(sut, sink)
 }
index c7e12bb..1ad2b0e 100644 (file)
 package org.onap.dcae.collectors.veshv.tests.fakes
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.config.api.model.routing
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
-
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import reactor.core.publisher.FluxProcessor
 import reactor.core.publisher.UnicastProcessor
 import reactor.retry.RetryExhaustedException
@@ -35,62 +34,55 @@ const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
 const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
 const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
 
-val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
-        routing = routing {
-            defineRoute {
-                fromDomain(PERF3GPP.domainName)
-                toTopic(PERF3GPP_TOPIC)
-                withFixedPartitioning()
-            }
-        }.build()
-)
-
-val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
-        routing = routing {
-            defineRoute {
-                fromDomain(PERF3GPP.domainName)
-                toTopic(PERF3GPP_TOPIC)
-                withFixedPartitioning()
-            }
-            defineRoute {
-                fromDomain(HEARTBEAT.domainName)
-                toTopic(PERF3GPP_TOPIC)
-                withFixedPartitioning()
-            }
-            defineRoute {
-                fromDomain(MEASUREMENT.domainName)
-                toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
-                withFixedPartitioning()
-            }
-        }.build()
-)
+val basicRouting = routing {
+    defineRoute {
+        fromDomain(PERF3GPP.domainName)
+        toTopic(PERF3GPP_TOPIC)
+        withFixedPartitioning()
+    }
+}.build()
 
 
-val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
-        routing = routing {
-            defineRoute {
-                fromDomain(PERF3GPP.domainName)
-                toTopic(ALTERNATE_PERF3GPP_TOPIC)
-                withFixedPartitioning()
-            }
-        }.build()
-)
+val twoDomainsToOneTopicRouting = routing {
+    defineRoute {
+        fromDomain(PERF3GPP.domainName)
+        toTopic(PERF3GPP_TOPIC)
+        withFixedPartitioning()
+    }
+    defineRoute {
+        fromDomain(HEARTBEAT.domainName)
+        toTopic(PERF3GPP_TOPIC)
+        withFixedPartitioning()
+    }
+    defineRoute {
+        fromDomain(MEASUREMENT.domainName)
+        toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+        withFixedPartitioning()
+    }
+}.build()
+
+
+val configurationWithDifferentRouting = routing {
+    defineRoute {
+        fromDomain(PERF3GPP.domainName)
+        toTopic(ALTERNATE_PERF3GPP_TOPIC)
+        withFixedPartitioning()
+    }
+}.build()
+
 
+val emptyRouting = routing { }.build()
 
-val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
-        routing = routing {
-        }.build()
-)
 
 class FakeConfigurationProvider : ConfigurationProvider {
     private var shouldThrowException = false
-    private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
+    private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
 
-    fun updateConfiguration(collectorConfiguration: CollectorConfiguration) =
+    fun updateConfiguration(routing: Routing) =
             if (shouldThrowException) {
                 configStream.onError(RetryExhaustedException("I'm so tired"))
             } else {
-                configStream.onNext(collectorConfiguration)
+                configStream.onNext(routing)
             }
 
 
index dc9be16..f6d1eab 100644 (file)
@@ -20,9 +20,6 @@
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
 
 import arrow.core.Option
-import arrow.core.fix
-import arrow.instances.option.monad.monad
-import arrow.typeclasses.binding
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
@@ -34,6 +31,7 @@ import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.MAXIMUM_PAYL
 import org.onap.dcae.collectors.veshv.commandline.intValue
 import org.onap.dcae.collectors.veshv.commandline.stringValue
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
 import java.net.InetSocketAddress
 
 class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
@@ -45,7 +43,7 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration
     )
 
     override fun getConfiguration(cmdLine: CommandLine): Option<DcaeAppSimConfiguration> =
-            Option.monad().binding {
+            binding {
                 val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
                 val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
 
@@ -62,5 +60,5 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration
                         maxPayloadSizeBytes,
                         kafkaBootstrapServers,
                         kafkaTopics)
-            }.fix()
+            }
 }
index 39fcae2..c8a3c01 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.main
 
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monad.monad
-import arrow.typeclasses.binding
-import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
 import org.onap.dcae.collectors.veshv.main.servers.VesServer
 import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.Closeable
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.neverComplete
 import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
+import reactor.core.scheduler.Schedulers
+import java.util.concurrent.atomic.AtomicReference
 
-private const val VESHV_PACKAGE = "org.onap.dcae.collectors.veshv"
-private val logger = Logger("$VESHV_PACKAGE.main")
-private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt"
 
-fun main(args: Array<String>) =
-        ConfigurationModule()
-                .createConfigurationFromCommandLine(args)
-                .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
-                .map(::startAndAwaitServers)
-                .unsafeRunEitherSync(
-                        { ex ->
-                            logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
-                            ExitFailure(1)
-                        },
-                        { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } }
-                )
+private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv"
+private val logger = Logger("$VES_HV_PACKAGE.main")
 
-private fun startAndAwaitServers(config: ServerConfiguration) =
-        IO.monad().binding {
-            Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
-            logger.info { "Using configuration: $config" }
+private val hvVesServer = AtomicReference<ServerHandle>()
 
-            val healthCheckServerHandle = HealthCheckServer.start(config).bind()
-            val hvVesHandle = VesServer.start(config).bind()
+fun main(args: Array<String>) {
+    HealthCheckServer.start()
+    ConfigurationModule()
+            .hvVesConfigurationUpdates(args)
+            .publishOn(Schedulers.single(Schedulers.elastic()))
+            .doOnNext(::startServer)
+            .doOnError(::logServerStartFailed)
+            .neverComplete() // TODO: remove after merging configuration stream with cbs
+            .block()
+}
+
+private fun startServer(config: HvVesConfiguration) {
+    stopRunningServer()
+    Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
+    logger.info { "Using configuration: $config" }
+
+    VesServer.start(config).let {
+        registerShutdownHook { shutdownGracefully(it) }
+        hvVesServer.set(it)
+    }
+}
 
-            registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle))
-            hvVesHandle.await().bind()
-        }.fix()
+private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync()
 
-internal fun closeServers(vararg handles: ServerHandle,
-                          healthState: HealthState = HealthState.INSTANCE) = {
+internal fun shutdownGracefully(serverHandle: ServerHandle,
+                                healthState: HealthState = HealthState.INSTANCE) {
     logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
     healthState.changeState(HealthDescription.SHUTTING_DOWN)
-    Closeable.closeAll(handles.asIterable()).unsafeRunSync()
+    serverHandle.close().unsafeRunSync()
     logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
 }
+
+private fun logServerStartFailed(ex: Throwable) =
+        logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
+
index 15472b5..bc284d0 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.main.servers
 
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.arrow.then
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.net.InetSocketAddress
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since August 2018
  */
-object HealthCheckServer : ServerStarter() {
-    override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start()
+object HealthCheckServer {
 
-    private fun createHealthCheckServer(config: ServerConfiguration) =
+    private const val DEFAULT_HEALTHCHECK_PORT = 6060
+    private val logger = Logger(HealthCheckServer::class)
+
+    fun start(port: Int = DEFAULT_HEALTHCHECK_PORT) =
+            createHealthCheckServer(port)
+                    .start()
+                    .then(::logServerStarted)
+                    .unsafeRunSync()
+
+    private fun createHealthCheckServer(listenPort: Int) =
             HealthCheckApiServer(
                     HealthState.INSTANCE,
                     MicrometerMetrics.INSTANCE.metricsProvider,
-                    config.healthCheckApiListenAddress)
+                    InetSocketAddress(listenPort))
 
-    override fun serverStartedMessage(handle: ServerHandle) =
-            "Health check server is up and listening on ${handle.host}:${handle.port}"
+    private fun logServerStarted(handle: ServerHandle) =
+            logger.info(ServiceContext::mdc) {
+                "Health check server is up and listening on ${handle.host}:${handle.port}"
+            }
 }
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
deleted file mode 100644 (file)
index 74a6632..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.main.servers
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.arrow.then
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since August 2018
- */
-abstract class ServerStarter {
-    fun start(config: ServerConfiguration): IO<ServerHandle> =
-            startServer(config)
-                    .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } }
-
-    protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
-    protected abstract fun serverStartedMessage(handle: ServerHandle): String
-
-    companion object {
-        private val logger = Logger(ServerStarter::class)
-    }
-}
index 0f5e45e..d15dcce 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.main.servers
 
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.factory.ServerFactory
 import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.arrow.then
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since August 2018
  */
-object VesServer : ServerStarter() {
-    override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
-
-    private fun createVesServer(config: ServerConfiguration): Server {
-        val collectorProvider = CollectorFactory(
-                AdapterFactory.configurationProvider(config.configurationProviderParams),
-                AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration),
-                MicrometerMetrics.INSTANCE,
-                config.maximumPayloadSizeBytes
-        ).createVesHvCollectorProvider()
-
-        return ServerFactory.createNettyTcpServer(config, collectorProvider, MicrometerMetrics.INSTANCE)
-    }
-
-    override fun serverStartedMessage(handle: ServerHandle) =
-            "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+object VesServer {
+
+    private val logger = Logger(VesServer::class)
+
+    fun start(config: HvVesConfiguration): ServerHandle =
+            createVesServer(config)
+                    .start()
+                    .then(::logServerStarted)
+                    .unsafeRunSync()
+
+    private fun createVesServer(config: HvVesConfiguration): Server =
+            initializeCollectorFactory(config)
+                    .createVesHvCollectorProvider()
+                    .let { collectorProvider ->
+                        ServerFactory.createNettyTcpServer(
+                                config.server,
+                                config.security,
+                                collectorProvider,
+                                MicrometerMetrics.INSTANCE
+                        )
+                    }
+
+    private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
+            CollectorFactory(
+                    AdapterFactory.configurationProvider(config.cbs),
+                    AdapterFactory.sinkCreatorFactory(config.collector),
+                    MicrometerMetrics.INSTANCE,
+                    config.server.maxPayloadSizeBytes
+            )
+
+    private fun logServerStarted(handle: ServerHandle) =
+            logger.info(ServiceContext::mdc) {
+                "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+            }
+
 }
index e18b0b1..d8de9f2 100644 (file)
@@ -42,7 +42,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
 internal object MainTest : Spek({
     describe("closeServer shutdown hook") {
         given("server handles and health state") {
-            val handle: ServerHandle = mock()
+            val handle = mock<ServerHandle>()
             var closed = false
             val handleClose = IO {
                 closed = true
@@ -50,8 +50,8 @@ internal object MainTest : Spek({
             whenever(handle.close()).thenReturn(handleClose)
             val healthState: HealthState = mock()
 
-            on("closeServers") {
-                closeServers(handle, healthState = healthState).invoke()
+            on("shutdownGracefully") {
+                shutdownGracefully(handle, healthState = healthState)
 
                 it("should close all handles") {
                     assertThat(closed).isTrue()
index fb21111..579eb84 100644 (file)
@@ -21,11 +21,9 @@ package org.onap.dcae.collectors.veshv.ssl.boundary
 
 import arrow.core.Option
 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
-import java.nio.file.Path
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-data class SecurityConfiguration(
-        val keys: Option<SecurityKeys>)
+data class SecurityConfiguration(val keys: Option<SecurityKeys>)
index 805d94d..f72ddec 100644 (file)
@@ -30,7 +30,7 @@ import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory
 class SslContextFactory(private val sslFactory: SslFactory = SslFactory()) {
     fun createServerContext(secConfig: SecurityConfiguration): Option<SslContext> =
             secConfig.keys.map { sslFactory.createSecureServerContext(it) }
+
     fun createClientContext(secConfig: SecurityConfiguration): Option<SslContext> =
             secConfig.keys.map { sslFactory.createSecureClientContext(it) }
-
 }
index f863235..822d84f 100644 (file)
@@ -51,7 +51,7 @@ fun createSecurityConfigurationProvider(cmdLine: CommandLine): Try<() -> Securit
 
 private fun shouldDisableSsl(cmdLine: CommandLine) = cmdLine.hasOption(CommandLineOption.SSL_DISABLE)
 
-private fun disabledSecurityConfiguration() = SecurityConfiguration(keys = None)
+private fun disabledSecurityConfiguration() = SecurityConfiguration(None)
 
 private fun enabledSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration {
     val ksFile = cmdLine.stringValue(CommandLineOption.KEY_STORE_FILE, KEY_STORE_FILE)
@@ -66,8 +66,7 @@ private fun enabledSecurityConfiguration(cmdLine: CommandLine): SecurityConfigur
             .trustStorePassword(Passwords.fromString(tsPass))
             .build()
 
-    return SecurityConfiguration(keys = Some(keys))
+    return SecurityConfiguration(Some(keys))
 }
 
-
 private fun pathFromFile(file: String) = Paths.get(file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.config.api.model
+package org.onap.dcae.collectors.veshv.tests.utils
 
-import java.time.Duration
+import java.io.InputStreamReader
 
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since July 2018
- */
-data class ConfigurationProviderParams(val firstRequestDelay: Duration,
-                                       val requestInterval: Duration)
+fun <T> Class<T>.resourceAsStream(resourcePath: String): InputStreamReader =
+        InputStreamReader(getResourceAsStream(resourcePath))
+
+fun <T> Class<T>.absoluteResourcePath(resourcePath: String): String =
+        getResource(resourcePath).path
index bedc2fc..d5b33b9 100644 (file)
 package org.onap.dcae.collectors.veshv.utils.arrow
 
 import arrow.core.Either
+import arrow.core.ForOption
 import arrow.core.Option
 import arrow.core.Try
+import arrow.core.fix
 import arrow.core.identity
+import arrow.effects.ForIO
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monad.monad
+import arrow.instances.option.monad.monad
 import arrow.syntax.collections.firstOption
+import arrow.typeclasses.MonadContinuation
+import arrow.typeclasses.binding
+import reactor.core.publisher.Flux
 import java.util.concurrent.atomic.AtomicReference
 
 /**
@@ -31,12 +41,24 @@ import java.util.concurrent.atomic.AtomicReference
  * @since July 2018
  */
 
+object OptionUtils {
+    fun <A> binding(c: suspend MonadContinuation<ForOption, *>.() -> A)
+            : Option<A> = Option.monad().binding(c).fix()
+}
+
+object IOUtils {
+    fun <A> binding(c: suspend MonadContinuation<ForIO, *>.() -> A)
+            : IO<A> = IO.monad().binding(c).fix()
+}
+
 fun <A> Either<A, A>.flatten() = fold(::identity, ::identity)
 
 fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity)
 
 fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity)
 
+fun <A, B> Flux<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Flux<B> = map { it.rightOrThrow(f) }
+
 fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get())
 
 fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> =
@@ -57,3 +79,13 @@ fun <A> Try<A>.doOnFailure(action: (Throwable) -> Unit): Try<A> = apply {
         action(exception)
     }
 }
+
+fun <A, B> A.mapBinding(c: suspend MonadContinuation<ForOption, *>.(A) -> B)
+        : Option<B> = let { OptionUtils.binding { c(it) } }
+
+
+
+
+
+
+
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.config.api.model
+package org.onap.dcae.collectors.veshv.utils
 
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since December 2018
- */
-data class KafkaConfiguration(val bootstrapServers: String, val maximalRequestSizeBytes: Int)
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+fun <T> Flux<T>.neverComplete(): Mono<Void> = then(Mono.never<T>()).then()
\ No newline at end of file
index 87aea41..cc94090 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.utils
 
+import java.util.concurrent.atomic.AtomicReference
+
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since January 2019
  */
-fun registerShutdownHook(job: () -> Unit) =
-        Runtime.getRuntime()
-                .addShutdownHook(Thread({ job() }, "GracefulShutdownThread"))
+
+private val currentShutdownHook = AtomicReference<Thread>()
+
+fun registerShutdownHook(job: () -> Unit) {
+    val runtime = Runtime.getRuntime()
+    val newShutdownHook = Thread({ job() }, "GracefulShutdownThread")
+    currentShutdownHook.get()?.run(runtime::removeShutdownHook)
+    currentShutdownHook.set(newShutdownHook)
+    runtime.addShutdownHook(newShutdownHook)
+}
index 08c05e0..28cc055 100644 (file)
@@ -20,9 +20,6 @@
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
 
 import arrow.core.Option
-import arrow.core.fix
-import arrow.instances.option.monad.monad
-import arrow.typeclasses.binding
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
@@ -40,6 +37,7 @@ import org.onap.dcae.collectors.veshv.commandline.intValue
 import org.onap.dcae.collectors.veshv.commandline.stringValue
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider
+import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
 import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import java.net.InetSocketAddress
@@ -62,7 +60,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
             TRUST_STORE_PASSWORD)
 
     override fun getConfiguration(cmdLine: CommandLine): Option<SimulatorConfiguration> =
-            Option.monad().binding {
+            binding {
                 val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
                 val vesHost = cmdLine.stringValue(VES_HV_HOST).bind()
                 val vesPort = cmdLine.intValue(VES_HV_PORT).bind()
@@ -86,7 +84,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
                         InetSocketAddress(vesHost, vesPort),
                         maxPayloadSizeBytes,
                         security)
-            }.fix()
+            }
 
     internal object DefaultValues {
         const val HEALTH_CHECK_API_PORT = 6063
index a73b39b..a1042f3 100644 (file)
@@ -20,9 +20,6 @@
 package org.onap.dcae.collectors.veshv.simulators.xnf
 
 import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monad.monad
-import arrow.typeclasses.binding
 import io.vavr.collection.HashSet
 import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
@@ -36,6 +33,7 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfigura
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.IOUtils.binding
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
@@ -62,7 +60,7 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
         )
 
 private fun startServers(config: SimulatorConfiguration): IO<Unit> =
-        IO.monad().binding {
+        binding {
             logger.info { "Using configuration: $config" }
 
             XnfHealthCheckServer().startServer(config).bind()
@@ -79,5 +77,5 @@ private fun startServers(config: SimulatorConfiguration): IO<Unit> =
             HealthState.INSTANCE.changeState(HealthDescription.IDLE)
 
             xnfApiServerHandler.await().bind()
-        }.fix()
+        }