private val configReader = FileConfigurationReader()
private val configValidator = ConfigurationValidator()
+ private lateinit var initialConfig: HvVesConfiguration
+
fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> =
Flux.just(cmd.parse(args))
.throwOnLeft { MissingArgumentException(it.message, it.cause) }
.map { it.reader().use(configReader::loadConfig) }
.map { configValidator.validate(it) }
.throwOnLeft { ValidationException(it.message) }
+ .doOnNext { initialConfig = it }
+
}
data class Routing(val routes: List<Route>)
-data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 })
\ No newline at end of file
+data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 })
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
+import arrow.core.getOrElse
+import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+
+/**
+ * @author Pawel Biniek <pawel.biniek@nokia.com>
+ * @since March 2019
+ */
+internal class ConfigurationMerger {
+ fun merge(base: PartialConfiguration, update: PartialConfiguration): PartialConfiguration =
+ PartialConfiguration(
+ mergeServerConfig(base.server, update.server),
+ mergeCbsConfig(base.cbs, update.cbs),
+ mergeSecurityConfig(base.security, update.security),
+ mergeCollectorConfig(base.collector, update.collector),
+ mergeLogLevel(base.logLevel, update.logLevel)
+ )
+
+
+ private fun mergeServerConfig(baseOption: Option<PartialServerConfig>,
+ updateOption: Option<PartialServerConfig>) =
+ applyUpdate(baseOption, updateOption) { base, update ->
+ PartialServerConfig(
+ base.listenPort.updateToGivenOrNone(update.listenPort),
+ base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec),
+ base.maxPayloadSizeBytes.updateToGivenOrNone(update.maxPayloadSizeBytes)
+ )
+ }
+
+
+ private fun mergeCbsConfig(baseOption: Option<PartialCbsConfig>,
+ updateOption: Option<PartialCbsConfig>) =
+ applyUpdate(baseOption, updateOption) { base, update ->
+ PartialCbsConfig(
+ base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec),
+ base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec)
+ )
+ }
+
+ private fun mergeSecurityConfig(baseOption: Option<PartialSecurityConfig>,
+ updateOption: Option<PartialSecurityConfig>) =
+ applyUpdate(baseOption, updateOption) { base, update ->
+ PartialSecurityConfig(
+ base.keys.updateToGivenOrNone(update.keys)
+ )
+ }
+
+ private fun mergeCollectorConfig(baseOption: Option<PartialCollectorConfig>,
+ updateOption: Option<PartialCollectorConfig>) =
+ applyUpdate(baseOption, updateOption) { base, update ->
+ PartialCollectorConfig(
+ base.maxRequestSizeBytes.updateToGivenOrNone(update.maxRequestSizeBytes),
+ base.kafkaServers.updateToGivenOrNone(update.kafkaServers),
+ base.routing.updateToGivenOrNone(update.routing)
+ )
+ }
+
+
+ private fun mergeLogLevel(base: Option<LogLevel>, update: Option<LogLevel>) =
+ base.updateToGivenOrNone(update)
+}
+
+private fun <T> applyUpdate(base: Option<T>, update: Option<T>, overrider: (base: T, update: T) -> T) =
+ when {
+ base is Some && update is Some -> overrider(base.t, update.t).toOption()
+ base is Some && update is None -> base
+ base is None && update is Some -> update
+ else -> None
+ }
+
+private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) =
+ update.getOrElse(this::orNull).toOption()
partial.mapBinding {
ServerConfiguration(
it.listenPort.bind(),
- Duration.ofSeconds(it.idleTimeoutSec.bind().toLong()),
+ it.idleTimeoutSec.bind(),
it.maxPayloadSizeBytes.bind()
)
}
private fun createCbsConfiguration(partial: PartialCbsConfig) =
partial.mapBinding {
CbsConfiguration(
- Duration.ofSeconds(it.firstRequestDelaySec.bind().toLong()),
- Duration.ofSeconds(it.requestIntervalSec.bind().toLong())
+ it.firstRequestDelaySec.bind(),
+ it.requestIntervalSec.bind()
)
}
import arrow.core.Option
import com.google.gson.GsonBuilder
+import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter
import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter
import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.SecurityAdapter
+
import java.io.Reader
-import java.net.InetSocketAddress
+import java.time.Duration
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
private val gson = GsonBuilder()
.registerTypeAdapter(Option::class.java, OptionAdapter())
.registerTypeAdapter(PartialSecurityConfig::class.java, SecurityAdapter())
+ .registerTypeAdapter(Duration::class.java, DurationOfSecondsAdapter())
.create()
fun loadConfig(input: Reader): PartialConfiguration =
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
import java.net.InetSocketAddress
+import java.time.Duration
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
val server: Option<PartialServerConfig> = None,
val cbs: Option<PartialCbsConfig> = None,
val security: Option<PartialSecurityConfig> = None,
-// TOD0: retrieve when ConfigurationMerger is implemented
-// val collector: Option<PartialCollectorConfig> = None,
+ val collector: Option<PartialCollectorConfig> = None,
val logLevel: Option<LogLevel> = None
)
internal data class PartialServerConfig(
val listenPort: Option<Int> = None,
- val idleTimeoutSec: Option<Int> = None,
+ val idleTimeoutSec: Option<Duration> = None,
val maxPayloadSizeBytes: Option<Int> = None
)
internal data class PartialCbsConfig(
- val firstRequestDelaySec: Option<Int> = None,
- val requestIntervalSec: Option<Int> = None
+ val firstRequestDelaySec: Option<Duration> = None,
+ val requestIntervalSec: Option<Duration> = None
)
internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
-// TOD0: retrieve when ConfigurationMerger is implemented
-//internal data class PartialCollectorConfig(
-// val maxRequestSizeBytes: Option<Int> = None,
-// val kafkaServers: Option<List<InetSocketAddress>> = None,
-// val routing: Option<Routing> = None
-//)
+
+internal data class PartialCollectorConfig(
+ val maxRequestSizeBytes: Option<Int> = None,
+ val kafkaServers: Option<List<InetSocketAddress>> = None,
+ val routing: Option<Routing> = None
+)
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl.gsonadapters
+
+import com.google.gson.JsonDeserializationContext
+import com.google.gson.JsonDeserializer
+import com.google.gson.JsonElement
+import java.lang.reflect.Type
+import java.time.Duration
+
+/**
+ * @author Pawel Biniek <pawel.biniek@nokia.com>
+ * @since March 2019
+ */
+class DurationOfSecondsAdapter : JsonDeserializer<Duration> {
+ override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext) =
+ Duration.ofSeconds(json.asLong)
+
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.Some
+import org.jetbrains.spek.api.Spek
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import java.io.InputStreamReader
+import java.io.Reader
+import java.time.Duration
+
+/**
+ * @author Pawel Biniek <pawel.biniek@nokia.com>
+ * @since February 2019
+ */
+internal object ConfigurationMergerTest : Spek({
+ describe("Merges partial configurations into one") {
+ it("merges single parameter into empty config") {
+ val actual = PartialConfiguration()
+ val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+
+ val result = ConfigurationMerger().merge(actual, diff)
+
+ assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO))
+ }
+
+ it("merges single embedded parameter into empty config") {
+ val actual = PartialConfiguration()
+ val serverConfig = PartialServerConfig(listenPort = Some(45))
+ val diff = PartialConfiguration(server = Some(serverConfig))
+
+ val result = ConfigurationMerger().merge(actual, diff)
+
+ assertThat(result.server).isEqualTo(Some(serverConfig))
+ }
+
+ it("merges single parameter into full config") {
+ val actual = FileConfigurationReader().loadConfig(
+ InputStreamReader(
+ FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+ val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+
+ val result = ConfigurationMerger().merge(actual, diff)
+
+ assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO))
+ }
+
+ it("merges single embedded parameter into full config") {
+ val actual = FileConfigurationReader().loadConfig(
+ InputStreamReader(
+ FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+ val serverConfig = PartialServerConfig(listenPort = Some(45))
+ val diff = PartialConfiguration(server = Some(serverConfig))
+
+ val result = ConfigurationMerger().merge(actual, diff)
+
+ assertThat(result.server.orNull()?.listenPort).isEqualTo(serverConfig.listenPort)
+ assertThat(result.server.orNull()?.idleTimeoutSec?.isEmpty()).isFalse()
+ assertThat(result.server.orNull()?.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+ assertThat(result.server.orNull()?.maxPayloadSizeBytes?.isEmpty()).isFalse()
+ assertThat(result.server.orNull()?.maxPayloadSizeBytes).isEqualTo(Some(512000))
+ }
+
+ it("merges full config into single parameter") {
+ val actual = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+ val diff = FileConfigurationReader().loadConfig(
+ InputStreamReader(
+ FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+
+ val result = ConfigurationMerger().merge(actual, diff)
+
+ assertThat(result.logLevel).isEqualTo(Some(LogLevel.ERROR))
+ assertThat(result.server.isEmpty()).isFalse()
+ assertThat(result.server.orNull()?.maxPayloadSizeBytes).isEqualTo(Some(512000))
+ assertThat(result.server.orNull()?.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+
+ assertThat(result.security.isEmpty()).isFalse()
+ assertThat(result.cbs.isEmpty()).isFalse()
+ }
+ }
+})
+
val config = PartialConfiguration(
Some(PartialServerConfig(
Some(1),
- Some(2),
+ Some(Duration.ofSeconds(2)),
Some(3)
)),
Some(PartialCbsConfig(
- Some(5),
- Some(3)
+ Some(Duration.ofSeconds(5)),
+ Some(Duration.ofSeconds(3))
)),
Some(PartialSecurityConfig(
Some(mock())
)),
-// TOD0: retrieve when ConfigurationMerger is implemented
-// Some(PartialCollectorConfig(
-// Some(4),
-// Some(emptyList()),
-// someFromEmptyRouting
-// )),
+ Some(PartialCollectorConfig(
+ Some(4),
+ Some(emptyList()),
+ someFromEmptyRouting
+ )),
None
)
}
describe("validating complete configuration") {
- val idleTimeoutSec = 10
- val firstReqDelaySec = 10
+ val idleTimeoutSec = Duration.ofSeconds(10L)
+ val firstReqDelaySec = Duration.ofSeconds(10L)
val securityKeys = Some(mock<SecurityKeys>())
val config = PartialConfiguration(
)),
Some(PartialCbsConfig(
Some(firstReqDelaySec),
- Some(3)
+ Some(Duration.ofSeconds(3))
)),
Some(PartialSecurityConfig(
securityKeys
)),
-// TOD0: retrieve when ConfigurationMerger is implemented
-// Some(PartialCollectorConfig(
-// Some(4),
-// Some(emptyList()),
-// someFromEmptyRouting
-// )),
+ Some(PartialCollectorConfig(
+ Some(4),
+ Some(emptyList()),
+ someFromEmptyRouting
+ )),
Some(LogLevel.INFO)
)
},
{
assertThat(it.server.idleTimeout)
- .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong()))
+ .isEqualTo(idleTimeoutSec)
assertThat(it.security.keys)
.isEqualTo(securityKeys)
assertThat(it.cbs.firstRequestDelay)
- .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong()))
+ .isEqualTo(firstReqDelaySec)
-// TOD0: retrieve when ConfigurationMerger is implemented
-// assertThat(it.collector.routing)
-// .isEqualTo(emptyRouting)
+ assertThat(it.collector.routing)
+ .isEqualTo(emptyRouting)
}
)
}
}
describe("validating configuration with security disabled") {
- val idleTimeoutSec = 10
- val firstReqDelaySec = 10
+ val idleTimeoutSec = Duration.ofSeconds(10)
+ val firstReqDelaySec = Duration.ofSeconds(10)
val securityKeys: Option<SecurityKeys> = None
val config = PartialConfiguration(
)),
Some(PartialCbsConfig(
Some(firstReqDelaySec),
- Some(3)
+ Some(Duration.ofSeconds(3))
)),
Some(PartialSecurityConfig(
securityKeys
)),
-// TOD0: retrieve when ConfigurationMerger is implemented
-// Some(PartialCollectorConfig(
-// Some(4),
-// Some(emptyList()),
-// someFromEmptyRouting
-// )),
+ Some(PartialCollectorConfig(
+ Some(4),
+ Some(emptyList()),
+ someFromEmptyRouting
+ )),
Some(LogLevel.INFO)
)
},
{
assertThat(it.server.idleTimeout)
- .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong()))
+ .isEqualTo(idleTimeoutSec)
assertThat(it.security.keys)
.isEqualTo(securityKeys)
assertThat(it.cbs.firstRequestDelay)
- .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong()))
+ .isEqualTo(firstReqDelaySec)
-// TOD0: retrieve when ConfigurationMerger is implemented
-// assertThat(it.collector.routing)
-// .isEqualTo(emptyRouting)
+ assertThat(it.collector.routing)
+ .isEqualTo(emptyRouting)
}
)
}
}
})
-// TOD0: retrieve when ConfigurationMerger is implemented
-//val emptyRouting = Routing(emptyList())
-//val someFromEmptyRouting = Some(emptyRouting)
+val emptyRouting = Routing(emptyList())
+val someFromEmptyRouting = Some(emptyRouting)
import org.onap.dcae.collectors.veshv.tests.utils.resourceAsStream
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import java.io.StringReader
+import java.net.InetSocketAddress
+import java.time.Duration
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
assertThat(config.cbs.nonEmpty()).isTrue()
val cbs = config.cbs.orNull() as PartialCbsConfig
- assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(7))
- assertThat(cbs.requestIntervalSec).isEqualTo(Some(900))
+ assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(Duration.ofSeconds(7)))
+ assertThat(cbs.requestIntervalSec).isEqualTo(Some(Duration.ofSeconds(900)))
assertThat(config.server.nonEmpty()).isTrue()
val server = config.server.orNull() as PartialServerConfig
server.run {
- assertThat(idleTimeoutSec).isEqualTo(Some(1200))
+ assertThat(idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
assertThat(listenPort).isEqualTo(Some(6000))
assertThat(maxPayloadSizeBytes).isEqualTo(Some(512000))
}
"trustStorePassword": "changeMeToo"
}
}
-}
\ No newline at end of file
+}