Extract transforming logic from validator 69/84869/11
authorFilip Krzywka <filip.krzywka@nokia.com>
Wed, 10 Apr 2019 09:36:48 +0000 (11:36 +0200)
committerFilip Krzywka <filip.krzywka@nokia.com>
Fri, 12 Apr 2019 06:20:15 +0000 (08:20 +0200)
Change-Id: Ic019b1796e17d24f14f41a817af6e5ecd8c7244b
Issue-ID: DCAEGEN2-1416
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
17 files changed:
sources/hv-collector-configuration/pom.xml
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt with 59% similarity]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt [deleted file]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt [new file with mode: 0644]
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt

index b6ec4ca..eda8b44 100644 (file)
@@ -93,5 +93,9 @@
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-extras-data</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
index f0ee3a4..ded7583 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.config.api
 
-import arrow.core.getOrElse
 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException
-import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
 import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
 import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
+import org.onap.dcae.collectors.veshv.config.impl.ConfigurationTransformer
 import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
 import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
 import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser
@@ -41,8 +40,9 @@ class ConfigurationModule {
 
     private val cmd = HvVesCommandLineParser()
     private val configParser = JsonConfigurationParser()
+    private val configMerger = ConfigurationMerger()
     private val configValidator = ConfigurationValidator()
-    private val merger = ConfigurationMerger()
+    private val configTransformer = ConfigurationTransformer()
 
     fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
 
@@ -58,14 +58,15 @@ class ConfigurationModule {
                     .flatMapMany { basePartialConfig ->
                         cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
                                 .invoke()
-                                .map { merger.merge(basePartialConfig, it) }
+                                .map { configMerger.merge(basePartialConfig, it) }
                                 .map(configValidator::validate)
                                 .throwOnLeft()
+                                .map(configTransformer::toFinalConfiguration)
                     }
 
     private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration,
                                          configStateListener: ConfigurationStateListener,
-                                         mdc: MappedDiagnosticContext): CbsConfigurationProvider =
+                                         mdc: MappedDiagnosticContext) =
             CbsConfigurationProvider(
                     CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
                     cbsConfigurationFrom(basePartialConfig),
@@ -73,11 +74,12 @@ class ConfigurationModule {
                     configStateListener,
                     mdc)
 
-    private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator
-            .validatedCbsConfiguration(basePartialConfig)
-            .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") }
+    private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
+            configValidator.validatedCbsConfiguration(basePartialConfig)
+                    .let { configTransformer.toCbsConfiguration(it) }
 
     companion object {
         private val logger = Logger(ConfigurationModule::class)
     }
+
 }
index 8db2f77..fd3cccd 100644 (file)
@@ -46,13 +46,6 @@ data class CbsConfiguration(
 )
 
 data class CollectorConfiguration(
-        val routing: Routing
-) {
-    val maxPayloadSizeBytes by lazy {
-        routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: DEFAULT_MAX_PAYLOAD_SIZE
-    }
-
-    companion object {
-        internal const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024
-    }
-}
+        val routing: Routing,
+        val maxPayloadSizeBytes: Int
+)
index e670782..96fa421 100644 (file)
@@ -29,24 +29,23 @@ import arrow.core.toOption
  * @since March 2019
  */
 internal class ConfigurationMerger {
-    fun merge(base: PartialConfiguration, update: PartialConfiguration): PartialConfiguration =
-            PartialConfiguration(
-                    listenPort = base.listenPort.updateToGivenOrNone(update.listenPort),
-                    idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec),
+    fun merge(base: PartialConfiguration, update: PartialConfiguration) = PartialConfiguration(
+            listenPort = base.listenPort.updateToGivenOrNone(update.listenPort),
+            idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec),
 
-                    firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec),
-                    requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec),
+            firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec),
+            requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec),
 
-                    sslDisable = base.sslDisable.updateToGivenOrNone(update.sslDisable),
-                    keyStoreFile = base.keyStoreFile.updateToGivenOrNone(update.keyStoreFile),
-                    keyStorePasswordFile = base.keyStorePasswordFile.updateToGivenOrNone(update.keyStorePasswordFile),
-                    trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile),
-                    trustStorePasswordFile = base.trustStorePasswordFile.updateToGivenOrNone(update.trustStorePasswordFile),
+            sslDisable = base.sslDisable.updateToGivenOrNone(update.sslDisable),
+            keyStoreFile = base.keyStoreFile.updateToGivenOrNone(update.keyStoreFile),
+            keyStorePasswordFile = base.keyStorePasswordFile.updateToGivenOrNone(update.keyStorePasswordFile),
+            trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile),
+            trustStorePasswordFile = base.trustStorePasswordFile.updateToGivenOrNone(update.trustStorePasswordFile),
 
-                    streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers),
+            streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers),
 
-                    logLevel = base.logLevel.updateToGivenOrNone(update.logLevel)
-            )
+            logLevel = base.logLevel.updateToGivenOrNone(update.logLevel)
+    )
 
     private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) =
             update.getOrElse(this::orNull).toOption()
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt
new file mode 100644 (file)
index 0000000..08cce13
--- /dev/null
@@ -0,0 +1,116 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.getOrElse
+import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore
+import org.onap.dcaegen2.services.sdk.security.ssl.Passwords
+import java.nio.file.Paths
+import java.time.Duration
+
+internal class ConfigurationTransformer {
+
+    fun toFinalConfiguration(validatedConfig: ValidatedPartialConfiguration): HvVesConfiguration {
+        val serverConfiguration = toServerConfiguration(validatedConfig)
+
+        val cbsConfiguration = toCbsConfiguration(validatedConfig.cbsConfiguration)
+
+        val securityConfiguration = determineSecurityConfiguration(validatedConfig)
+
+        val collectorConfiguration = toCollectorConfiguration(validatedConfig)
+
+        val logLevel = determineLogLevel(validatedConfig.logLevel)
+
+        return HvVesConfiguration(
+                serverConfiguration,
+                cbsConfiguration,
+                securityConfiguration,
+                collectorConfiguration,
+                logLevel
+        )
+    }
+
+    fun toCbsConfiguration(cbsConfiguration: ValidatedCbsConfiguration) = CbsConfiguration(
+            Duration.ofSeconds(cbsConfiguration.firstRequestDelaySec),
+            Duration.ofSeconds(cbsConfiguration.requestIntervalSec)
+    )
+
+    private fun toServerConfiguration(validatedConfig: ValidatedPartialConfiguration) = ServerConfiguration(
+            validatedConfig.listenPort,
+            Duration.ofSeconds(validatedConfig.idleTimeoutSec)
+    )
+
+    private fun determineSecurityConfiguration(validConfig: ValidatedPartialConfiguration) =
+            validConfig.securityConfiguration.fold({ SecurityConfiguration(None) }, { createSecurityConfiguration(it) })
+
+    private fun toCollectorConfiguration(validatedConfig: ValidatedPartialConfiguration) =
+            validatedConfig.streamPublishers.map { Route(it.name(), it) }
+                    .let { routing ->
+                        CollectorConfiguration(
+                                routing,
+                                determineMaxPayloadSize(routing)
+                        )
+                    }
+
+    private fun createSecurityConfiguration(paths: ValidatedSecurityPaths) = SecurityConfiguration(
+            ImmutableSecurityKeys.builder()
+                    .keyStore(ImmutableSecurityKeysStore.of(Paths.get(paths.keyStoreFile)))
+                    .keyStorePassword(Passwords.fromPath(Paths.get(paths.keyStorePasswordFile)))
+                    .trustStore(ImmutableSecurityKeysStore.of(Paths.get(paths.trustStoreFile)))
+                    .trustStorePassword(Passwords.fromPath(Paths.get(paths.trustStorePasswordFile)))
+                    .build()
+                    .toOption()
+    )
+
+    private fun determineMaxPayloadSize(routing: List<Route>) =
+            routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: useDefaultMaxPayloadSize()
+
+    private fun determineLogLevel(logLevel: Option<LogLevel>) =
+            logLevel.getOrElse(::useDefaultLogLevel)
+
+    private fun useDefaultMaxPayloadSize() = DEFAULT_MAX_PAYLOAD_SIZE.also {
+        logger.warn {
+            "Failed to determine \"maxPayloadSizeBytes\" field from routing. Using default ($it)"
+        }
+    }
+
+    private fun useDefaultLogLevel() = DEFAULT_LOG_LEVEL.also {
+        logger.warn { "Missing or invalid \"logLevel\" field. Using default log level ($it)" }
+    }
+
+    companion object {
+        private val logger = Logger(ConfigurationTransformer::class)
+
+        private val DEFAULT_LOG_LEVEL = LogLevel.INFO
+        private const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024
+    }
+}
index f4ce592..c97c975 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.config.impl
 
-import arrow.core.None
-import arrow.core.Option
-import arrow.core.Some
-import arrow.core.getOrElse
-import arrow.core.toOption
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import arrow.core.Either
+import arrow.core.Left
+import arrow.core.Right
+import arrow.data.Invalid
+import arrow.data.Validated
 import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
-import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
-import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
-import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
-import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcae.collectors.veshv.utils.arrow.flatFold
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys
-import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore
-import org.onap.dcaegen2.services.sdk.security.ssl.Passwords
-import java.io.File
-import java.nio.file.Path
-import java.time.Duration
+
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -49,104 +35,62 @@ import java.time.Duration
  */
 internal class ConfigurationValidator {
 
-    fun validate(partialConfig: PartialConfiguration) =
-            logger.info { "About to validate configuration: $partialConfig" }.let {
-                binding {
-                    val logLevel = determineLogLevel(partialConfig.logLevel)
-
-                    val serverConfiguration = validatedServerConfiguration(partialConfig)
-                            .doOnEmpty { logger.debug { "Cannot bind server configuration" } }
-                            .bind()
-
-                    val cbsConfiguration = validatedCbsConfiguration(partialConfig)
-                            .doOnEmpty { logger.debug { "Cannot bind cbs configuration" } }
-                            .bind()
-
-                    val securityConfiguration = determineSecurityConfiguration(partialConfig)
-                            .doOnEmpty { logger.debug { "Cannot bind security configuration" } }
-                            .bind()
-
-                    val collectorConfiguration = validatedCollectorConfig(partialConfig)
-                            .doOnEmpty { logger.debug { "Cannot bind collector configuration" } }
-                            .bind()
-
-                    HvVesConfiguration(
-                            serverConfiguration,
-                            cbsConfiguration,
-                            securityConfiguration,
-                            collectorConfiguration,
-                            logLevel
-                    )
-                }.toEither { ValidationException("Some required configuration options are missing") }
-            }
-
+    fun validate(partial: PartialConfiguration): Either<ValidationException, ValidatedPartialConfiguration> =
+            logger.info { "About to validate configuration: $partial" }.let {
+                val invalidFields = mutableSetOf(
+                        validate(partial::streamPublishers)
+                )
+                        .union(cbsConfigurationValidation(partial))
+                        .union(serverConfigurationValidation(partial))
+                        .union(securityValidation(partial))
+                        .filter { it.isInvalid }
 
-    private fun determineLogLevel(logLevel: Option<LogLevel>) =
-            logLevel.getOrElse {
-                logger.warn {
-                    "Missing or invalid \"logLevel\" field. " +
-                            "Using default log level ($DEFAULT_LOG_LEVEL)"
+                if (invalidFields.isNotEmpty()) {
+                    return Left(ValidationException(validationMessageFrom(invalidFields)))
                 }
-                DEFAULT_LOG_LEVEL
-            }
 
-    private fun validatedServerConfiguration(partial: PartialConfiguration) =
-            partial.mapBinding {
-                ServerConfiguration(
-                        it.listenPort.bind(),
-                        Duration.ofSeconds(it.idleTimeoutSec.bind())
-                )
+                Right(partial.unsafeAsValidated())
             }
 
-    internal fun validatedCbsConfiguration(partial: PartialConfiguration) =
-            partial.mapBinding {
-                CbsConfiguration(
-                        Duration.ofSeconds(it.firstRequestDelaySec.bind()),
-                        Duration.ofSeconds(it.requestIntervalSec.bind())
-                )
-            }
-
-    private fun determineSecurityConfiguration(partial: PartialConfiguration) =
-            partial.sslDisable.fold({ createSecurityConfiguration(partial) }, { sslDisabled ->
-                if (sslDisabled) {
-                    Some(SecurityConfiguration(None))
-                } else {
-                    createSecurityConfiguration(partial)
-                }
+    fun validatedCbsConfiguration(partial: PartialConfiguration) = ValidatedCbsConfiguration(
+            firstRequestDelaySec = getOrThrowValidationException(partial::firstRequestDelaySec),
+            requestIntervalSec = getOrThrowValidationException(partial::requestIntervalSec)
+    )
+
+    private fun cbsConfigurationValidation(partial: PartialConfiguration) = setOf(
+            validate(partial::firstRequestDelaySec),
+            validate(partial::requestIntervalSec)
+    )
+
+    private fun serverConfigurationValidation(partial: PartialConfiguration) = setOf(
+            validate(partial::listenPort),
+            validate(partial::idleTimeoutSec)
+    )
+
+    private fun securityValidation(partial: PartialConfiguration) =
+            partial.sslDisable.flatFold({
+                validatedSecurityConfiguration(partial)
+            }, {
+                setOf(Validated.Valid("sslDisable flag is set to true"))
             })
 
-    private fun createSecurityConfiguration(partial: PartialConfiguration): Option<SecurityConfiguration> =
-            partial.mapBinding {
-                SecurityConfiguration(
-                        createSecurityKeys(
-                                File(it.keyStoreFile.bind()).toPath(),
-                                File(it.keyStorePasswordFile.bind()).toPath(),
-                                File(it.trustStoreFile.bind()).toPath(),
-                                File(it.trustStorePasswordFile.bind()).toPath()
-                        ).toOption()
-                )
-            }
+    private fun validatedSecurityConfiguration(partial: PartialConfiguration) = setOf(
+            validate(partial::keyStoreFile),
+            validate(partial::keyStorePasswordFile),
+            validate(partial::trustStoreFile),
+            validate(partial::trustStorePasswordFile)
+    )
 
-    private fun createSecurityKeys(keyStorePath: Path,
-                                   keyStorePasswordPath: Path,
-                                   trustStorePath: Path,
-                                   trustStorePasswordPath: Path) =
-            ImmutableSecurityKeys.builder()
-                    .keyStore(ImmutableSecurityKeysStore.of(keyStorePath))
-                    .keyStorePassword(Passwords.fromPath(keyStorePasswordPath))
-                    .trustStore(ImmutableSecurityKeysStore.of(trustStorePath))
-                    .trustStorePassword(Passwords.fromPath(trustStorePasswordPath))
-                    .build()
+    private fun <A> validate(property: ConfigProperty<A>) =
+            Validated.fromOption(property.get(), { "- missing property: ${property.name}\n" })
 
-    private fun validatedCollectorConfig(partial: PartialConfiguration) =
-            partial.mapBinding { config ->
-                CollectorConfiguration(
-                        config.streamPublishers.bind().map { Route(it.name(), it) }
-                )
-            }
+    private fun <A> validationMessageFrom(invalidFields: List<Validated<String, A>>): String =
+            invalidFields.map { it as Invalid }
+                    .map { it.e }
+                    .fold("", String::plus)
+                    .let { "Some required configuration properties are missing: \n$it" }
 
     companion object {
-        val DEFAULT_LOG_LEVEL = LogLevel.INFO
         private val logger = Logger(ConfigurationValidator::class)
     }
 }
@@ -21,9 +21,16 @@ package org.onap.dcae.collectors.veshv.config.impl
 
 import arrow.core.None
 import arrow.core.Option
+import arrow.core.Some
+import arrow.core.getOrElse
 import com.google.gson.annotations.SerializedName
+import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
+import org.onap.dcae.collectors.veshv.utils.arrow.flatFold
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import kotlin.reflect.KProperty0
+
+internal typealias ConfigProperty<A> = KProperty0<Option<A>>
 
 /**
  * @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -56,4 +63,29 @@ internal data class PartialConfiguration(
 
         @Transient
         var streamPublishers: Option<List<KafkaSink>> = None
-)
+) {
+    fun unsafeAsValidated() = ValidatedPartialConfiguration(
+            listenPort = getOrThrowValidationException(::listenPort),
+            idleTimeoutSec = getOrThrowValidationException(::idleTimeoutSec),
+            cbsConfiguration = ValidatedCbsConfiguration(
+                    firstRequestDelaySec = getOrThrowValidationException(::firstRequestDelaySec),
+                    requestIntervalSec = getOrThrowValidationException(::requestIntervalSec)
+            ),
+            streamPublishers = getOrThrowValidationException(::streamPublishers),
+            securityConfiguration = sslDisable.flatFold({ forceValidatedSecurityPaths() }, { None }),
+            logLevel = logLevel
+    )
+
+    private fun forceValidatedSecurityPaths() =
+            Some(ValidatedSecurityPaths(
+                    keyStoreFile = getOrThrowValidationException(::keyStoreFile),
+                    keyStorePasswordFile = getOrThrowValidationException(::keyStorePasswordFile),
+                    trustStoreFile = getOrThrowValidationException(::trustStoreFile),
+                    trustStorePasswordFile = getOrThrowValidationException(::trustStorePasswordFile)
+            ))
+}
+
+internal fun <A> getOrThrowValidationException(property: ConfigProperty<A>) =
+        property().getOrElse {
+            throw ValidationException("Field `${property.name}` was not validated and is missing in configuration")
+        }
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt
new file mode 100644 (file)
index 0000000..a230bfc
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.Option
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+
+
+internal data class ValidatedPartialConfiguration(
+        val listenPort: Int,
+        val idleTimeoutSec: Long,
+        val cbsConfiguration: ValidatedCbsConfiguration,
+        val securityConfiguration: Option<ValidatedSecurityPaths>,
+        val logLevel: Option<LogLevel>,
+        val streamPublishers: List<KafkaSink>
+)
+
+internal data class ValidatedCbsConfiguration(
+        val firstRequestDelaySec: Long,
+        val requestIntervalSec: Long
+)
+
+internal data class ValidatedSecurityPaths(
+        val keyStoreFile: String,
+        val keyStorePasswordFile: String,
+        val trustStoreFile: String,
+        val trustStorePasswordFile: String
+)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt
deleted file mode 100644 (file)
index dbdf4ad..0000000
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl
-
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.whenever
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration.Companion.DEFAULT_MAX_PAYLOAD_SIZE
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal object CollectorConfigurationTest : Spek({
-
-    describe("CollectorConfiguration") {
-        describe("calculating maxPayloadSizeBytes") {
-            on("defined routes") {
-                val sampleRouting = listOf(
-                        Route(sink1.name(), sink1),
-                        Route(sink2.name(), sink2),
-                        Route(sink3.name(), sink3)
-                )
-                val configuration = CollectorConfiguration(sampleRouting)
-
-                it("should use the highest value among all routes") {
-                    assertThat(configuration.maxPayloadSizeBytes)
-                            .isEqualTo(highestMaxPayloadSize)
-                }
-            }
-
-            on("empty routing") {
-                val configuration = CollectorConfiguration(emptyList())
-
-                it("should use default value") {
-                    assertThat(configuration.maxPayloadSizeBytes)
-                            .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE)
-                }
-            }
-        }
-    }
-})
-
-private const val highestMaxPayloadSize = 3
-
-private val sink1 = mock<KafkaSink>().also {
-    whenever(it.name()).thenReturn("")
-    whenever(it.maxPayloadSizeBytes()).thenReturn(1)
-}
-
-private val sink2 = mock<KafkaSink>().also {
-    whenever(it.name()).thenReturn("")
-    whenever(it.maxPayloadSizeBytes()).thenReturn(2)
-}
-
-private val sink3 = mock<KafkaSink>().also {
-    whenever(it.name()).thenReturn("")
-    whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize)
-}
index cb8d500..ca09d84 100644 (file)
@@ -27,7 +27,6 @@ import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
 import java.io.InputStreamReader
 import java.io.Reader
-import java.time.Duration
 
 /**
  * @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -37,14 +36,14 @@ internal object ConfigurationMergerTest : Spek({
     describe("Merges partial configurations into one") {
         it("merges single parameter into empty config") {
             val actual = PartialConfiguration()
-            val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+            val diff = PartialConfiguration(logLevel = Some(LogLevel.WARN))
 
             val result = ConfigurationMerger().merge(actual, diff)
 
-            assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO))
+            assertThat(result.logLevel).isEqualTo(Some(LogLevel.WARN))
         }
 
-        val someListenPort = Some(45)
+        val someListenPort = Some(defaultListenPort)
         it("merges single embedded parameter into empty config") {
             val actual = PartialConfiguration()
             val diff = PartialConfiguration(listenPort = someListenPort)
@@ -58,11 +57,11 @@ internal object ConfigurationMergerTest : Spek({
             val actual = JsonConfigurationParser().parse(
                     InputStreamReader(
                             JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
-            val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+            val diff = PartialConfiguration(logLevel = Some(LogLevel.WARN))
 
             val result = ConfigurationMerger().merge(actual, diff)
 
-            assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO))
+            assertThat(result.logLevel).isEqualTo(Some(LogLevel.WARN))
         }
 
         it("merges single embedded parameter into full config") {
@@ -74,7 +73,6 @@ internal object ConfigurationMergerTest : Spek({
             val result = ConfigurationMerger().merge(actual, diff)
 
             assertThat(result.listenPort).isEqualTo(someListenPort)
-            assertThat(result.idleTimeoutSec.isEmpty()).isFalse()
             assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
         }
 
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt
new file mode 100644 (file)
index 0000000..42919e4
--- /dev/null
@@ -0,0 +1,218 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
+import arrow.core.getOrElse
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.fail
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
+import java.io.File
+import java.time.Duration
+
+
+internal object ConfigurationTransformerTest : Spek({
+    describe("ConfigurationTransformer") {
+        val cut = ConfigurationTransformer()
+
+        describe("transforming partial configuration to final") {
+            val config = ValidatedPartialConfiguration(
+                    listenPort = defaultListenPort,
+                    idleTimeoutSec = defaultIdleTimeoutSec,
+                    cbsConfiguration = ValidatedCbsConfiguration(
+                            firstRequestDelaySec = defaultFirstReqDelaySec,
+                            requestIntervalSec = defaultRequestIntervalSec
+                    ),
+                    securityConfiguration = Some(ValidatedSecurityPaths(
+                            keyStoreFile = KEYSTORE,
+                            keyStorePasswordFile = KEYSTORE_PASS_FILE,
+                            trustStoreFile = TRUSTSTORE,
+                            trustStorePasswordFile = TRUSTSTORE_PASS_FILE
+                    )),
+                    streamPublishers = sampleStreamsDefinition,
+                    logLevel = Some(LogLevel.TRACE)
+            )
+
+            given("transformed configuration") {
+                val result = cut.toFinalConfiguration(config)
+
+                it("should create server configuration") {
+                    assertThat(result.server.listenPort).isEqualTo(defaultListenPort)
+                    assertThat(result.server.idleTimeout)
+                            .describedAs("idleTimeout transformed from number to duration")
+                            .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
+                }
+
+                it("should create CBS configuration") {
+                    assertThat(result.cbs.firstRequestDelay)
+                            .describedAs("firstRequestDelay transformed from number to duration")
+                            .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
+                    assertThat(result.cbs.requestInterval)
+                            .describedAs("requestInterval transformed from number to duration")
+                            .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec))
+                }
+
+                it("should create collector configuration") {
+                    assertThat(result.collector.routing)
+                            .describedAs("routing transformed from kafka sinks to routes")
+                            .isEqualTo(sampleRouting)
+
+                    assertThat(result.collector.maxPayloadSizeBytes)
+                            .describedAs("maxPayloadSizeBytes calculated from kafka sinks")
+                            .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
+                }
+
+                it("should use specified log level") {
+                    assertThat(result.logLevel)
+                            .describedAs("logLevel was not transformed when present")
+                            .isEqualTo(LogLevel.TRACE)
+                }
+
+                it("should create security keys") {
+                    result.security.keys.fold({ fail("Should be Some") }, {
+                        assertThat(it.keyStore().path()).isEqualTo(File(KEYSTORE).toPath())
+                        assertThat(it.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath())
+                        it.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) }
+                        it.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) }
+                    })
+                }
+            }
+        }
+
+        describe("transforming configuration with empty log level") {
+            val config = validatedConfiguration(
+                    logLevel = None
+            )
+
+            it("should use default log level") {
+                val result = cut.toFinalConfiguration(config)
+
+                assertThat(result.logLevel).isEqualTo(DEFAULT_LOG_LEVEL)
+            }
+        }
+
+        describe("transforming configuration with security disabled") {
+            val config = validatedConfiguration(
+                    sslDisable = Some(true),
+                    keyStoreFile = "",
+                    keyStorePasswordFile = "",
+                    trustStoreFile = "",
+                    trustStorePasswordFile = ""
+            )
+
+            it("should create valid configuration with empty security keys") {
+                val result = cut.toFinalConfiguration(config)
+
+                assertThat(result.security.keys).isEqualTo(None)
+            }
+        }
+
+        describe("transforming configuration with ssl disable missing") {
+            val config = validatedConfiguration(
+                    sslDisable = None
+            )
+
+            it("should create configuration with ssl enabled") {
+                val result = cut.toFinalConfiguration(config)
+                val securityKeys = result.security.keys
+                        .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys
+                assertThat(securityKeys.keyStore().path()).isEqualTo(File(KEYSTORE).toPath())
+                assertThat(securityKeys.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath())
+                securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) }
+                securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) }
+            }
+        }
+
+        describe("calculating maxPayloadSizeBytes") {
+            on("defined routes") {
+                val highestMaxPayloadSize = 3
+                val sink1 = mock<KafkaSink>().also {
+                    whenever(it.name()).thenReturn("1")
+                    whenever(it.maxPayloadSizeBytes()).thenReturn(1)
+                }
+                val sink2 = mock<KafkaSink>().also {
+                    whenever(it.name()).thenReturn("2")
+                    whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize)
+                }
+                val config = validatedConfiguration(
+                        streamPublishers = listOf(sink1, sink2)
+                )
+
+                val result = cut.toFinalConfiguration(config)
+
+                it("should use the highest value among all routes") {
+                    assertThat(result.collector.maxPayloadSizeBytes)
+                            .isEqualTo(highestMaxPayloadSize)
+                }
+            }
+
+            on("empty routing") {
+                val config = validatedConfiguration(
+                        streamPublishers = emptyList()
+                )
+
+                val result = cut.toFinalConfiguration(config)
+
+                it("should use default value") {
+                    assertThat(result.collector.maxPayloadSizeBytes)
+                            .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
+                }
+            }
+        }
+
+    }
+})
+
+private fun validatedConfiguration(listenPort: Int = defaultListenPort,
+                                   idleTimeoutSec: Long = defaultIdleTimeoutSec,
+                                   firstReqDelaySec: Long = defaultFirstReqDelaySec,
+                                   requestIntervalSec: Long = defaultRequestIntervalSec,
+                                   sslDisable: Option<Boolean> = Some(false),
+                                   keyStoreFile: String = KEYSTORE,
+                                   keyStorePasswordFile: String = KEYSTORE_PASS_FILE,
+                                   trustStoreFile: String = TRUSTSTORE,
+                                   trustStorePasswordFile: String = TRUSTSTORE_PASS_FILE,
+                                   streamPublishers: List<KafkaSink> = sampleStreamsDefinition,
+                                   logLevel: Option<LogLevel> = Some(LogLevel.INFO)
+): ValidatedPartialConfiguration = PartialConfiguration(
+        listenPort = Some(listenPort),
+        idleTimeoutSec = Some(idleTimeoutSec),
+        firstRequestDelaySec = Some(firstReqDelaySec),
+        requestIntervalSec = Some(requestIntervalSec),
+        streamPublishers = Some(streamPublishers),
+        sslDisable = sslDisable,
+        keyStoreFile = Some(keyStoreFile),
+        keyStorePasswordFile = Some(keyStorePasswordFile),
+        trustStoreFile = Some(trustStoreFile),
+        trustStorePasswordFile = Some(trustStorePasswordFile),
+        logLevel = logLevel
+).unsafeAsValidated()
+
index 5495c86..26a9cc5 100644 (file)
@@ -22,101 +22,84 @@ package org.onap.dcae.collectors.veshv.config.impl
 import arrow.core.None
 import arrow.core.Option
 import arrow.core.Some
-import arrow.core.getOrElse
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.whenever
-import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.fail
+import org.assertj.core.api.Assertions.*
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL
+import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
-import java.io.File
-import java.nio.file.Paths
-import java.time.Duration
 
 internal object ConfigurationValidatorTest : Spek({
     describe("ConfigurationValidator") {
         val cut = ConfigurationValidator()
 
         describe("validating partial configuration with missing fields") {
-            val config = PartialConfiguration(
-                    listenPort = Some(1)
-            )
-
-            it("should return ValidationError") {
-                val result = cut.validate(config)
-                assertThat(result.isLeft()).isTrue()
-            }
-        }
+            val config = PartialConfiguration(listenPort = Some(5))
 
-        describe("validating configuration with empty log level") {
-            val config = partialConfiguration(
-                    logLevel = None
-            )
-
-            it("should use default log level") {
+            it("should return ValidationException with missing required fields description") {
                 val result = cut.validate(config)
-                result.fold(
-                        {
-                            fail("Configuration should have been created successfully")
-                        },
-                        {
-                            assertThat(it.logLevel).isEqualTo(DEFAULT_LOG_LEVEL)
-                        }
-                )
+                result.fold({
+                    assertThat(it.message).doesNotContain(PartialConfiguration::listenPort.name)
+
+                    assertThat(it.message).contains(PartialConfiguration::idleTimeoutSec.name)
+                    assertThat(it.message).contains(PartialConfiguration::firstRequestDelaySec.name)
+                    assertThat(it.message).contains(PartialConfiguration::requestIntervalSec.name)
+                    assertThat(it.message).contains(PartialConfiguration::streamPublishers.name)
+                    assertThat(it.message).contains(PartialConfiguration::keyStoreFile.name)
+                    assertThat(it.message).contains(PartialConfiguration::keyStorePasswordFile.name)
+                    assertThat(it.message).contains(PartialConfiguration::trustStoreFile.name)
+                    assertThat(it.message).contains(PartialConfiguration::trustStorePasswordFile.name)
+
+                    assertThat(it.message).doesNotContain(PartialConfiguration::logLevel.name)
+                    assertThat(it.message).doesNotContain(PartialConfiguration::sslDisable.name)
+                }, { fail("Should be ValidationException") })
             }
         }
 
-        describe("validating complete configuration") {
+        describe("validating complete valid configuration") {
             val config = PartialConfiguration(
                     listenPort = Some(defaultListenPort),
                     idleTimeoutSec = Some(defaultIdleTimeoutSec),
                     firstRequestDelaySec = Some(defaultFirstReqDelaySec),
                     requestIntervalSec = Some(defaultRequestIntervalSec),
                     sslDisable = Some(false),
-                    keyStoreFile = Some(keyStore),
-                    keyStorePasswordFile = Some(keyStorePassFile),
-                    trustStoreFile = Some(trustStore),
-                    trustStorePasswordFile = Some(trustStorePassFile),
+                    keyStoreFile = Some(KEYSTORE),
+                    keyStorePasswordFile = Some(KEYSTORE_PASSWORD),
+                    trustStoreFile = Some(TRUSTSTORE),
+                    trustStorePasswordFile = Some(TRUSTSTORE_PASSWORD),
                     streamPublishers = Some(sampleStreamsDefinition),
                     logLevel = Some(LogLevel.TRACE)
             )
 
-            it("should create valid configuration") {
+            it("should create validated configuration") {
                 val result = cut.validate(config)
                 result.fold(
                         {
                             fail("Configuration should have been created successfully")
                         },
                         {
-                            assertThat(it.server.listenPort)
+                            assertThat(it.listenPort)
                                     .isEqualTo(defaultListenPort)
-                            assertThat(it.server.idleTimeout)
-                                    .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
-
-                            val securityKeys = it.security.keys
-                                    .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys
-                            assertThat(securityKeys.keyStore().path()).isEqualTo(File(keyStore).toPath())
-                            assertThat(securityKeys.trustStore().path()).isEqualTo(File(trustStore).toPath())
-                            securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(keyStorePass.toCharArray()) }
-                            securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(trustStorePass.toCharArray()) }
-
-                            assertThat(it.cbs.firstRequestDelay)
-                                    .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
-                            assertThat(it.cbs.requestInterval)
-                                    .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec))
-
-                            assertThat(it.collector.routing)
-                                    .isEqualTo(sampleRouting)
-                            assertThat(it.collector.maxPayloadSizeBytes)
-                                    .isEqualTo(sampleMaxPayloadSize)
-
-                            assertThat(it.logLevel).isEqualTo(LogLevel.TRACE)
+                            assertThat(it.idleTimeoutSec)
+                                    .isEqualTo(defaultIdleTimeoutSec)
+
+                            it.securityConfiguration.fold({
+                                fail("Should have been validated successfully")
+                            }, {
+                                assertThat(it.keyStoreFile).isEqualTo(KEYSTORE)
+                                assertThat(it.keyStorePasswordFile).isEqualTo(KEYSTORE_PASSWORD)
+                                assertThat(it.trustStoreFile).isEqualTo(TRUSTSTORE)
+                                assertThat(it.trustStorePasswordFile).isEqualTo(TRUSTSTORE_PASSWORD)
+                            })
+
+                            assertThat(it.cbsConfiguration.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec)
+                            assertThat(it.cbsConfiguration.requestIntervalSec).isEqualTo(defaultRequestIntervalSec)
+
+                            assertThat(it.streamPublishers).isEqualTo(sampleStreamsDefinition)
+
+                            assertThat(it.logLevel).isEqualTo(Some(LogLevel.TRACE))
                         }
                 )
             }
@@ -126,29 +109,26 @@ internal object ConfigurationValidatorTest : Spek({
             val config = partialConfiguration(
                     sslDisable = Some(true),
                     keyStoreFile = Some(""),
-                    keyStorePassword = Some(""),
-                    trustStoreFile = Some(""),
-                    trustStorePassword = Some("")
+                    keyStorePasswordFile = None,
+                    trustStoreFile = None,
+                    trustStorePasswordFile = Some("")
             )
 
-            it("should create valid configuration") {
+            it("should return validated configuration regardless of security keys presence") {
                 val result = cut.validate(config)
                 result.fold(
                         {
                             fail("Configuration should have been created successfully but was $it")
                         },
                         {
-                            assertThat(it.server.idleTimeout)
-                                    .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
+                            assertThat(it.idleTimeoutSec).isEqualTo(defaultIdleTimeoutSec)
 
-                            assertThat(it.security.keys)
-                                    .isEqualTo(None)
+                            assertThat(it.securityConfiguration.isEmpty()).isTrue()
 
-                            assertThat(it.cbs.firstRequestDelay)
-                                    .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
+                            assertThat(it.cbsConfiguration.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec)
+                            assertThat(it.cbsConfiguration.requestIntervalSec).isEqualTo(defaultRequestIntervalSec)
 
-                            assertThat(it.collector.routing)
-                                    .isEqualTo(sampleRouting)
+                            assertThat(it.streamPublishers).isEqualTo(sampleStreamsDefinition)
                         }
                 )
             }
@@ -159,24 +139,81 @@ internal object ConfigurationValidatorTest : Spek({
                     sslDisable = None
             )
 
-            it("should create valid configuration with ssl enabled") {
+            it("should return validated configuration") {
                 val result = cut.validate(config)
                 result.fold(
                         {
                             fail("Configuration should have been created successfully but was $it")
                         },
                         {
-                            val securityKeys = it.security.keys
-                                    .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys
-                            assertThat(securityKeys.keyStore().path()).isEqualTo(File(keyStore).toPath())
-                            assertThat(securityKeys.trustStore().path()).isEqualTo(File(trustStore).toPath())
-                            securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(keyStorePass.toCharArray()) }
-                            securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(trustStorePass.toCharArray()) }
+                            it.securityConfiguration.fold({
+                                fail("Should have been validated successfully")
+                            }, {
+                                assertThat(it.keyStoreFile).isEqualTo(KEYSTORE)
+                                assertThat(it.keyStorePasswordFile).isEqualTo(KEYSTORE_PASSWORD)
+                                assertThat(it.trustStoreFile).isEqualTo(TRUSTSTORE)
+                                assertThat(it.trustStorePasswordFile).isEqualTo(TRUSTSTORE_PASSWORD)
+                            })
+
                         }
                 )
             }
         }
 
+        describe("validating configuration with ssl enabled, but not all required security fields set") {
+            val config = partialConfiguration(
+                    sslDisable = Some(false),
+                    keyStoreFile = Some(KEYSTORE),
+                    keyStorePasswordFile = None,
+                    trustStoreFile = None,
+                    trustStorePasswordFile = Some(TRUSTSTORE_PASSWORD)
+            )
+
+            it("should return validated configuration") {
+                val result = cut.validate(config)
+
+                assertThat(result.isLeft())
+                        .describedAs("security validation result")
+                        .isTrue()
+            }
+        }
+
+        describe("validating CBS configuration from partial") {
+            given("valid CBS configuration") {
+                val config = partialConfiguration()
+
+                it("should returned validated config") {
+                    val result = cut.validatedCbsConfiguration(config)
+
+                    assertThat(result.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec)
+                    assertThat(result.requestIntervalSec).isEqualTo(defaultRequestIntervalSec)
+                }
+
+            }
+
+            given("missing firstReqDelaySec") {
+                val config = partialConfiguration(
+                        firstReqDelaySec = None
+                )
+
+                it("should throw validation exception") {
+                    assertThatExceptionOfType(ValidationException::class.java).isThrownBy {
+                        cut.validatedCbsConfiguration(config)
+                    }.withMessageContaining(PartialConfiguration::firstRequestDelaySec.name)
+                }
+            }
+
+            given("missing requestIntervalSec") {
+                val config = partialConfiguration(
+                        requestIntervalSec = None)
+
+                it("should throw validation exception") {
+                    assertThatExceptionOfType(ValidationException::class.java).isThrownBy {
+                        cut.validatedCbsConfiguration(config)
+                    }.withMessageContaining(PartialConfiguration::requestIntervalSec.name)
+                }
+            }
+        }
     }
 })
 
@@ -185,10 +222,10 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor
                                  firstReqDelaySec: Option<Long> = Some(defaultFirstReqDelaySec),
                                  requestIntervalSec: Option<Long> = Some(defaultRequestIntervalSec),
                                  sslDisable: Option<Boolean> = Some(false),
-                                 keyStoreFile: Option<String> = Some(keyStore),
-                                 keyStorePassword: Option<String> = Some(keyStorePassFile),
-                                 trustStoreFile: Option<String> = Some(trustStore),
-                                 trustStorePassword: Option<String> = Some(trustStorePassFile),
+                                 keyStoreFile: Option<String> = Some(KEYSTORE),
+                                 keyStorePasswordFile: Option<String> = Some(KEYSTORE_PASSWORD),
+                                 trustStoreFile: Option<String> = Some(TRUSTSTORE),
+                                 trustStorePasswordFile: Option<String> = Some(TRUSTSTORE_PASSWORD),
                                  streamPublishers: Option<List<KafkaSink>> = Some(sampleStreamsDefinition),
                                  logLevel: Option<LogLevel> = Some(LogLevel.INFO)
 ) = PartialConfiguration(
@@ -198,35 +235,9 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor
         requestIntervalSec = requestIntervalSec,
         sslDisable = sslDisable,
         keyStoreFile = keyStoreFile,
-        keyStorePasswordFile = keyStorePassword,
+        keyStorePasswordFile = keyStorePasswordFile,
         trustStoreFile = trustStoreFile,
-        trustStorePasswordFile = trustStorePassword,
+        trustStorePasswordFile = trustStorePasswordFile,
         streamPublishers = streamPublishers,
         logLevel = logLevel
 )
-
-private fun resourcePathAsString(resource: String) =
-        Paths.get(ConfigurationValidatorTest::class.java.getResource(resource).toURI()).toString()
-
-private const val defaultListenPort = 1234
-private const val defaultRequestIntervalSec = 3L
-private const val defaultIdleTimeoutSec = 10L
-private const val defaultFirstReqDelaySec = 10L
-
-private const val keyStore = "test.ks.pkcs12"
-private const val trustStore = "trust.ks.pkcs12"
-private const val keyStorePass = "change.me"
-private const val trustStorePass = "change.me.too"
-private val keyStorePassFile = resourcePathAsString("/test.ks.pass")
-private val trustStorePassFile = resourcePathAsString("/trust.ks.pass")
-
-private const val sampleSinkName = "perf3gpp"
-const val sampleMaxPayloadSize = 1024
-
-private val sink = mock<KafkaSink>().also {
-    whenever(it.name()).thenReturn(sampleSinkName)
-    whenever(it.maxPayloadSizeBytes()).thenReturn(sampleMaxPayloadSize)
-}
-
-private val sampleStreamsDefinition = listOf(sink)
-private val sampleRouting = listOf(Route(sink.name(), sink))
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt
new file mode 100644 (file)
index 0000000..f07af07
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import java.nio.file.Paths
+
+private fun resourcePathAsString(resource: String) =
+        Paths.get(ConfigurationValidatorTest::class.java.getResource(resource).toURI()).toString()
+
+internal val DEFAULT_LOG_LEVEL = LogLevel.INFO
+
+internal const val defaultListenPort = 1234
+internal const val defaultRequestIntervalSec = 3L
+internal const val defaultIdleTimeoutSec = 10L
+internal const val defaultFirstReqDelaySec = 10L
+
+internal const val KEYSTORE = "test.ks.pkcs12"
+internal const val KEYSTORE_PASSWORD = "change.me"
+internal const val TRUSTSTORE = "trust.ks.pkcs12"
+internal const val TRUSTSTORE_PASSWORD = "change.me.too"
+internal val KEYSTORE_PASS_FILE = resourcePathAsString("/test.ks.pass")
+internal val TRUSTSTORE_PASS_FILE = resourcePathAsString("/trust.ks.pass")
+
+internal const val DEFAULT_MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+
+private val sampleSink = mock<KafkaSink>().also {
+    whenever(it.name()).thenReturn("perf3gpp")
+    whenever(it.maxPayloadSizeBytes()).thenReturn(DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
+}
+
+internal val sampleStreamsDefinition = listOf(sampleSink)
+internal val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
\ No newline at end of file
index 3002f33..78d2e70 100644 (file)
@@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({
     describe("VES High Volume Collector performance") {
         it("should handle multiple clients in reasonable time") {
             val sink = CountingSink()
-            val sut = Sut(CollectorConfiguration(basicRouting), sink)
+            val sut = Sut(CollectorConfiguration(basicRouting, MAX_PAYLOAD_SIZE_BYTES), sink)
 
             val numMessages: Long = 300_000
             val runs = 4
@@ -87,7 +87,7 @@ object PerformanceSpecification : Spek({
 
         it("should disconnect on transmission errors") {
             val sink = CountingSink()
-            val sut = Sut(CollectorConfiguration(basicRouting), sink)
+            val sut = Sut(CollectorConfiguration(basicRouting, MAX_PAYLOAD_SIZE_BYTES), sink)
 
             val numMessages: Long = 100_000
             val timeout = Duration.ofSeconds(30)
index 88d1567..4e9b7ef 100644 (file)
@@ -34,6 +34,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
 import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
 import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
+import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
 import org.onap.dcae.collectors.veshv.utils.Closeable
@@ -95,10 +96,10 @@ class DummySinkFactory(private val sink: Sink) : SinkFactory {
 }
 
 fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
-        Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink())
+        Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), AlwaysSuccessfulSink())
 
 fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
-        Sut(CollectorConfiguration(routing), AlwaysFailingSink())
+        Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), AlwaysFailingSink())
 
 fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
-        Sut(CollectorConfiguration(routing), DelayingSink(delay))
+        Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), DelayingSink(delay))
index f90f4bc..e74e1f6 100644 (file)
@@ -215,6 +215,6 @@ object VesHvSpecification : Spek({
 
 private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> {
     val sink = StoringSink()
-    val sut = Sut(CollectorConfiguration(routing), sink)
+    val sut = Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), sink)
     return Pair(sut, sink)
 }
index cfed7f3..ceae62d 100644 (file)
@@ -17,6 +17,9 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
+
+@file:Suppress("TooManyFunctions")
+
 package org.onap.dcae.collectors.veshv.utils.arrow
 
 import arrow.core.Either
@@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference
  * @since July 2018
  */
 
+
 object OptionUtils {
     fun <A> binding(c: suspend MonadContinuation<ForOption, *>.() -> A)
             : Option<A> = Option.monad().binding(c).fix()
@@ -78,6 +82,17 @@ fun <A> Try<A>.doOnFailure(action: (Throwable) -> Unit): Try<A> = apply {
 fun <A, B> A.mapBinding(c: suspend MonadContinuation<ForOption, *>.(A) -> B)
         : Option<B> = let { OptionUtils.binding { c(it) } }
 
+fun <T> Option<Boolean>.flatFold(ifEmptyOrFalse: () -> T, ifTrue: () -> T) =
+        fold({
+            ifEmptyOrFalse()
+        }, {
+            if (it) {
+                ifTrue()
+            } else {
+                ifEmptyOrFalse()
+            }
+        })
+