import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
-import org.onap.dcae.collectors.veshv.impl.adapters.ConsulConfigurationProvider
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5
import java.util.concurrent.atomic.AtomicReference
/**
private val metrics: Metrics) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val initialValue = createVesHvCollector(defaultConfiguration())
- val collector: AtomicReference<Collector> = AtomicReference(initialValue)
+ val collector: AtomicReference<Collector> = AtomicReference()
configuration()
.map(this::createVesHvCollector)
.doOnNext { logger.info("Using updated configuration for new connections") }
return collector::get
}
- private fun defaultConfiguration() =
- CollectorConfiguration(
- kafkaBootstrapServers = "kafka:9092",
- routing = routing {
- defineRoute {
- fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS)
- toTopic("ves_hvRanMeas")
- withFixedPartitioning()
- }
- }.build())
-
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
*/
package org.onap.dcae.collectors.veshv.impl.socket
-import arrow.core.None
import arrow.core.Option
-import arrow.core.Some
import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.*
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE
import java.time.Duration
internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
val dummyMode = cmdLine.hasOption(DUMMY_MODE)
val security = createSecurityConfiguration(cmdLine)
- val configurationProviderParams = createConfigurationProviderParams(cmdLine)
-
+ val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
ServerConfiguration(
listenPort = listenPort,
configurationProviderParams = configurationProviderParams,
}.fix()
}
+ private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
+ ForOption extensions {
+ binding {
+ val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
+ val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY)
+ val requestInterval = cmdLine.longValue(CONSUL_REQUEST_INTERVAL, DefaultValues.CONSUL_REQUEST_INTERVAL)
- private fun createConfigurationProviderParams(cmdLine: CommandLine): ConfigurationProviderParams {
- val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
- val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY)
- val requestInterval = cmdLine.longValue(CONSUL_REQUEST_INTERVAL, DefaultValues.CONSUL_REQUEST_INTERVAL)
-
- return ConfigurationProviderParams(
- configUrl,
- Duration.ofSeconds(firstRequestDelay),
- Duration.ofSeconds(requestInterval)
- )
- }
+ ConfigurationProviderParams(
+ configUrl,
+ Duration.ofSeconds(firstRequestDelay),
+ Duration.ofSeconds(requestInterval)
+ )
+ }.fix()
+ }
private fun createSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration {
val sslDisable = cmdLine.hasOption(SSL_DISABLE)
internal object DefaultValues {
const val CONSUL_FIRST_REQUEST_DELAY = 10L
const val CONSUL_REQUEST_INTERVAL = 5L
- const val CONFIG_URL = ""
const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key"
const val CERT_FILE = "/etc/ves-hv/server.crt"
const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt"
lateinit var result: ServerConfiguration
beforeEachTest {
- result = cut.parseExpectingSuccess("-p", listenPort, "-c", configurationUrl, "-d", firstRequestDelay)
+ result = cut.parseExpectingSuccess(
+ "-p", listenPort, "-c", configurationUrl, "-d", firstRequestDelay
+ )
}
it("should set proper port") {
lateinit var result: ServerConfiguration
beforeEachTest {
- result = cut.parseExpectingSuccess("--listen-port", listenPort)
- }
-
- it("should set default config url") {
- assertThat(result.configurationProviderParams.configurationUrl)
- .isEqualTo(DefaultValues.CONFIG_URL)
+ result = cut.parseExpectingSuccess(
+ "--listen-port", listenPort, "--config-url", configurationUrl
+ )
}
it("should set default first consul request delay") {
}
describe("required parameter is absent") {
- given("listen port is missing") {
+ on("missing listen port") {
it("should throw exception") {
assertThat(cut.parseExpectingFailure(
- "--ssl-disable",
"--config-url", configurationUrl,
+ "--ssl-disable",
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval,
+ "--private-key-file", pk.toFile().absolutePath,
+ "--cert-file", cert.toFile().absolutePath,
+ "--trust-cert-file", trustCert.toFile().absolutePath)
+ ).isInstanceOf(WrongArgumentError::class.java)
+ }
+ }
+ on("missing configuration url") {
+ it("should throw exception") {
+ assertThat(cut.parseExpectingFailure(
+ "--listen-port", listenPort,
+ "--ssl-disable",
"--first-request-delay", firstRequestDelay,
"--request-interval", requestInterval,
"--private-key-file", pk.toFile().absolutePath,
}
}
}
-})
+})
\ No newline at end of file
),
CONSUL_CONFIG_URL(Option.builder("c")
.longOpt("config-url")
+ .required()
.hasArg()
.desc("URL of ves configuration on consul")
.build()