*/
package org.onap.dcae.collectors.veshv.config.api
+import arrow.core.getOrElse
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException
import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
-import org.onap.dcae.collectors.veshv.utils.arrow.rightOrThrow
+import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
class ConfigurationModule {
private val cmd = HvVesCommandLineParser()
private val configReader = FileConfigurationReader()
private val configValidator = ConfigurationValidator()
+ private val merger = ConfigurationMerger()
fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
fun hvVesConfigurationUpdates(args: Array<String>,
configStateListener: ConfigurationStateListener,
mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
- Flux.just(cmd.getConfigurationFile(args))
- .throwOnLeft { MissingArgumentException(it.message, it.cause) }
- .map { it.reader().use(configReader::loadConfig) }
+ Mono.just(cmd.getConfigurationFile(args))
+ .throwOnLeft(::MissingArgumentException)
+ .map {
+ logger.info { "Using base configuration file: ${it.absolutePath}" }
+ it.reader().use(configReader::loadConfig)
+ }
.cache()
- .flatMap { basePartialConfig ->
- val baseConfig = configValidator.validate(basePartialConfig)
- .rightOrThrow { ValidationException(it.message) }
- val cbsConfigProvider = CbsConfigurationProvider(
- CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
- baseConfig.cbs,
- configStateListener,
- mdc)
- val merger = ConfigurationMerger()
- cbsConfigProvider()
+ .flatMapMany { basePartialConfig ->
+ cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
+ .invoke()
.map { merger.merge(basePartialConfig, it) }
- .map { configValidator.validate(it) }
- .throwOnLeft { ValidationException(it.message) }
+ .map(configValidator::validate)
+ .throwOnLeft()
}
+ private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration,
+ configStateListener: ConfigurationStateListener,
+ mdc: MappedDiagnosticContext): CbsConfigurationProvider =
+ CbsConfigurationProvider(
+ CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
+ cbsConfigurationFrom(basePartialConfig),
+ configStateListener,
+ mdc)
+
+ private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
+ configValidator.validatedCbsConfiguration(basePartialConfig)
+ .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") }
+
+ companion object {
+ private val logger = Logger(ConfigurationModule::class)
+ }
}