Change Consul configuration update policy 13/58613/1
authorJakub Dudycz <jakub.dudycz@nokia.com>
Fri, 15 Jun 2018 14:09:41 +0000 (16:09 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 08:27:38 +0000 (10:27 +0200)
- At startup default config is applied
- Configuration is updated in intervals given at VES-HV
service startup to allow dynamic changes
- Included consul service startup in docker-compose file
- VES-HV now exits when fails to acquire confguration from consul

Closes ONAP-229

Change-Id: I896cfd177fa45381f9822278c2dffc113dd3df72
Signed-off-by: jakub.dudycz@nokia.com
Issue-ID: DCAEGEN2-601

14 files changed:
docker-compose.yml
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
hv-collector-main/Dockerfile
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt

index 65951ed..d98971b 100644 (file)
@@ -20,6 +20,14 @@ services:
     depends_on:
       - zookeeper
 
+  consul:
+      image: progrium/consul
+      ports:
+        - "8500:8500"
+      environment:
+        - CONSUL_BIND_INTERFACE=eth0
+      command: ["-server", "-bootstrap", "-ui-dir", "/ui"]
+
   ves-hv-collector:
     image: onap/ves-hv-collector
 #    build:
@@ -29,6 +37,7 @@ services:
       - "6061:6061/tcp"
     depends_on:
       - kafka
+      - consul
     volumes:
       - ./ssl/:/etc/ves-hv/
 
index 8785180..06047fd 100644 (file)
@@ -49,7 +49,9 @@ class CollectorFactory(val configuration: ConfigurationProvider,
     }
 
     private fun createVesHvCollector(): Flux<Collector> =
-            configuration().map(this::createVesHvCollector)
+            configuration()
+                    .doOnError { System.exit(ERROR_CODE) }
+                    .map(this::createVesHvCollector)
 
     private fun createVesHvCollector(config: CollectorConfiguration): Collector {
         return VesHvCollector(
@@ -61,5 +63,8 @@ class CollectorFactory(val configuration: ConfigurationProvider,
                 metrics = metrics)
     }
 
+    companion object {
+        const val ERROR_CODE = 3
+    }
 }
 
index 358be10..d9e7432 100644 (file)
@@ -22,9 +22,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import reactor.core.publisher.Flux
 import reactor.ipc.netty.http.client.HttpClient
+import java.time.Duration
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -34,13 +33,8 @@ object AdapterFactory {
     fun kafkaSink(): SinkProvider = KafkaSinkProvider()
     fun loggingSink(): SinkProvider = LoggingSinkProvider()
 
-    fun staticConfigurationProvider(config: CollectorConfiguration) =
-            object : ConfigurationProvider {
-                override fun invoke() = Flux.just(config)
-            }
-
-    fun consulConfigurationProvider(url: String): ConfigurationProvider =
-            ConsulConfigurationProvider(url, httpAdapter())
+    fun consulConfigurationProvider(url: String, updateInterval: Duration): ConfigurationProvider =
+            ConsulConfigurationProvider(url, updateInterval, httpAdapter())
 
     fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
 }
index 04e4927..c70d128 100644 (file)
@@ -21,12 +21,14 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.ves.VesEventV5
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
 import org.slf4j.LoggerFactory
 import reactor.core.publisher.Flux
 import java.io.StringReader
 import java.time.Duration
-import java.util.*
+import java.util.Base64
 import java.util.concurrent.atomic.AtomicReference
 import javax.json.Json
 import javax.json.JsonObject
@@ -36,20 +38,39 @@ import javax.json.JsonObject
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since May 2018
  */
-internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter)
-    : ConfigurationProvider {
+internal class ConsulConfigurationProvider(private val url: String,
+                                           private val updateInterval: Duration,
+                                           private val http: HttpAdapter
+) : ConfigurationProvider {
 
 
     private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
     private var lastConfigurationHash: AtomicReference<Int> = AtomicReference()
 
     override fun invoke(): Flux<CollectorConfiguration> =
-            Flux.interval(Duration.ZERO, REFRESH_INTERVAL)
-                    .flatMap { http.getResponse(url) }
-                    .filter { body -> body.hashCode() != lastConfigurationHash.get() }
-                    .doOnNext { body -> lastConfigurationHash.set(body.hashCode()) }
-                    .map { str -> getConfigurationJson(str) }
-                    .map { json -> createCollectorConfiguration(json) }
+            Flux.concat(createDefaultConfigurationFlux(), createConsulFlux())
+
+    private fun createDefaultConfigurationFlux(): Flux<CollectorConfiguration> = Flux.just(
+            CollectorConfiguration(
+                    kafkaBootstrapServers = "kafka:9092",
+                    routing = routing {
+                        defineRoute {
+                            fromDomain(HVRANMEAS)
+                            toTopic("ves_hvRanMeas")
+                            withFixedPartitioning()
+                        }
+                    }.build())
+    ).doOnNext { logger.info("Applied default configuration") }
+
+    private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux.interval(updateInterval)
+            .flatMap { http.get(url) }
+            .doOnError { logger.error("Encountered an error when trying to acquire configuration from consul. " +
+                    "Shutting down..") }
+            .filter { it.hashCode() != lastConfigurationHash.get() }
+            .doOnNext { lastConfigurationHash.set(it.hashCode()) }
+            .map { getConfigurationJson(it) }
+            .map { createCollectorConfiguration(it) }
+
 
     private fun getConfigurationJson(str: String): JsonObject {
         val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0)
@@ -60,23 +81,21 @@ internal class ConsulConfigurationProvider(private val url: String, private val
     }
 
     private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
-
-        val routing = configuration.getJsonObject("routing")
+        val routing = configuration.getJsonArray("routing")
 
         return CollectorConfiguration(
                 kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
                 routing = org.onap.dcae.collectors.veshv.model.routing {
-                    defineRoute {
-                        fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain")))
-                        toTopic(routing.getString("toTopic"))
-                        withFixedPartitioning()
+                    for (route in routing) {
+                        val routeObj = route.asJsonObject()
+                        defineRoute {
+                            fromDomain(forNumber(routeObj.getInt("fromDomain")))
+                            toTopic(routeObj.getString("toTopic"))
+                            withFixedPartitioning()
+                        }
                     }
                 }.build()
         )
     }
-
-    companion object {
-        private const val REFRESH_INTERVAL_MINUTES: Long = 5
-        private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES)
-    }
 }
+
index 236e3cb..a41cd09 100644 (file)
@@ -34,15 +34,12 @@ open class HttpAdapter(private val httpClient: HttpClient) {
 
     private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
 
-    open fun getResponse(url: String): Mono<String> =
+    open fun get(url: String): Mono<String> =
             httpClient.get(url)
-                    .onErrorResume { e -> unableToGetResource(e, url) }
-                    .flatMap { res -> res.receiveContent().toMono() }
-                    .map { content -> content.content().toString(Charset.defaultCharset()) }
-
-
-    private fun unableToGetResource(e: Throwable, url: String): Mono<HttpClientResponse> {
-        logger.info("Failed to get resource on path: $url\n${e.localizedMessage}")
-        return Mono.empty()
-    }
+                    .doOnError {
+                        logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
+                        logger.debug("Nested exception:", it)
+                    }
+                    .flatMap { it.receiveContent().toMono() }
+                    .map { it.content().toString(Charset.defaultCharset()) }
 }
index 580d36c..056e055 100644 (file)
@@ -31,10 +31,12 @@ import reactor.core.publisher.Flux
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class WireChunkDecoder(private val decoder: WireFrameDecoder, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+internal class WireChunkDecoder(private val decoder: WireFrameDecoder,
+                                alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
     private val streamBuffer = alloc.compositeBuffer()
 
-    fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(decoder, streamBuffer, byteBuf)
+    fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter
+            .createFlux(decoder, streamBuffer, byteBuf)
             .doOnSubscribe { logIncomingMessage(byteBuf) }
             .doOnNext(this::logDecodedWireMessage)
 
index 67a7d6f..025c59f 100644 (file)
@@ -29,6 +29,7 @@ import java.time.Duration
 data class ServerConfiguration(
         val port: Int,
         val configurationUrl: String,
+        val configurationUpdateInterval: Duration,
         val securityConfiguration: SecurityConfiguration,
         val idleTimeout: Duration,
         val dummyMode: Boolean = false)
index b2da430..dd19084 100644 (file)
 package org.onap.dcae.collectors.veshv.impl.adapters
 
 import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.verify
 import com.nhaarman.mockito_kotlin.whenever
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 import reactor.core.publisher.Mono
+import reactor.ipc.netty.http.client.HttpClient
+import reactor.test.StepVerifier
+import java.time.Duration
 import java.util.*
 import kotlin.test.assertEquals
 
@@ -35,20 +39,63 @@ import kotlin.test.assertEquals
  */
 internal object ConsulConfigurationProviderTest : Spek({
 
+    val updateInterval = Duration.ofMillis(1)
+    val httpAdapterMock: HttpAdapter = mock()
+
     given("valid resource url") {
-        val testUrl = "http://valid-url/"
-        val httpAdapterMock: HttpAdapter = mock()
-        val consulConfigProvider = ConsulConfigurationProvider(testUrl, httpAdapterMock)
 
-        whenever(httpAdapterMock.getResponse(testUrl)).thenReturn(Mono.just(constructConsulResponse()))
+        val validUrl = "http://valid-url/"
+        val consulConfigProvider = ConsulConfigurationProvider(validUrl, updateInterval, httpAdapterMock)
+
+        whenever(httpAdapterMock.get(validUrl)).thenReturn(Mono.just(constructConsulResponse()))
+
+        it("should use default configuration at the beginning, " +
+                "then apply received configuration") {
+
+            StepVerifier.create(consulConfigProvider().take(2))
+                    .consumeNextWith {
+
+                        assertEquals("kafka:9092", it.kafkaBootstrapServers)
+
+                        val route1 = it.routing.routes[0]
+                        assertEquals(Domain.HVRANMEAS, route1.domain)
+                        assertEquals("ves_hvRanMeas", route1.targetTopic)
+                    }
+                    .consumeNextWith {
+
+                        assertEquals("kafka:9093", it.kafkaBootstrapServers)
+
+                        val route1 = it.routing.routes[0]
+                        assertEquals(Domain.HEARTBEAT, route1.domain)
+                        assertEquals("test-topic-1", route1.targetTopic)
+
+                        val route2 = it.routing.routes[1]
+                        assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, route2.domain)
+                        assertEquals("test-topic-2", route2.targetTopic)
 
+                    }.verifyComplete()
+        }
+    }
+    given("invalid resource url") {
+
+        val invalidUrl = "http://invalid-url/"
+        val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, updateInterval, httpAdapterMock)
+
+        whenever(httpAdapterMock.get(invalidUrl)).thenReturn(Mono.error(RuntimeException("Test exception")))
+
+        it("should use default configuration at the beginning, then should interrupt the flux") {
 
-        it("should create valid collector configuration") {
-            val response = consulConfigProvider().blockFirst()
-            assertEquals("val1", response.kafkaBootstrapServers)
-            val route = response.routing.routes[0]
-            assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, route.domain)
-            assertEquals("val3", route.targetTopic)
+            StepVerifier.create(consulConfigProvider())
+                    .consumeNextWith {
+
+
+                        assertEquals("kafka:9092", it.kafkaBootstrapServers)
+
+                        val route1 = it.routing.routes[0]
+                        assertEquals(Domain.HVRANMEAS, route1.domain)
+                        assertEquals("ves_hvRanMeas", route1.targetTopic)
+                    }
+                    .verifyErrorMessage("Test exception")
         }
     }
 })
@@ -56,11 +103,17 @@ internal object ConsulConfigurationProviderTest : Spek({
 fun constructConsulResponse(): String {
 
     val config = """{
-        "kafkaBootstrapServers": "val1",
-        "routing": {
-            "fromDomain": 2,
-            "toTopic": "val3"
-        }
+       "kafkaBootstrapServers": "kafka:9093",
+       "routing": [
+                   {
+                       "fromDomain": 1,
+                       "toTopic": "test-topic-1"
+                   },
+                   {
+                       "fromDomain": 2,
+                       "toTopic": "test-topic-2"
+               }
+    ]
     }"""
 
     val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))
index 6628845..79eda99 100644 (file)
@@ -30,6 +30,7 @@ import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.ipc.netty.http.client.HttpClient
 import reactor.ipc.netty.http.client.HttpClientResponse
+import reactor.test.StepVerifier
 import java.nio.charset.Charset
 import kotlin.test.assertEquals
 
@@ -43,13 +44,15 @@ internal object HttpAdapterTest : Spek({
 
         val httpClientMock: HttpClient = mock()
         val httpAdapter = HttpAdapter(httpClientMock)
-        val testUrl = "http://valid-url/"
+        val validUrl = "http://valid-url/"
         val responseContent = """{"key1": "value1", "key2": "value2"}"""
         val httpResponse = createHttpResponseMock(responseContent)
-        whenever(httpClientMock.get(testUrl)).thenReturn(Mono.just(httpResponse))
+        whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse))
 
         it("should return response string") {
-            assertEquals(responseContent, httpAdapter.getResponse(testUrl).block())
+            StepVerifier
+                    .create(httpAdapter.get(validUrl))
+                    .expectNext(responseContent)
         }
     }
 
@@ -57,12 +60,14 @@ internal object HttpAdapterTest : Spek({
 
         val httpClientMock: HttpClient = mock()
         val httpAdapter = HttpAdapter(httpClientMock)
-        val testUrl = "http://invalid-url/"
-        whenever(httpClientMock.get(testUrl)).thenReturn(Mono.error(Exception("Test exception")))
+        val invalidUrl = "http://invalid-url/"
+        val exceptionMessage = "Test exception"
+        whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage)))
 
-
-        it("should return null response") {
-            assertEquals(null, httpAdapter.getResponse(testUrl).block())
+        it("should interrupt the flux") {
+            StepVerifier
+                    .create(httpAdapter.get(invalidUrl))
+                    .verifyErrorMessage(exceptionMessage)
         }
     }
 })
index 1367ff1..cab61dc 100644 (file)
@@ -9,7 +9,7 @@ EXPOSE 6061
 
 WORKDIR /opt/ves-hv-collector
 ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
-CMD ["--listen-port", "6061"]
+CMD ["--listen-port", "6061","--config-url", "http://consul:8500/v1/kv/veshv-config"]
 COPY target/libs/external/* ./
 COPY target/libs/internal/* ./
 COPY target/hv-collector-main-*.jar ./
index f3e97be..63de270 100644 (file)
@@ -24,17 +24,21 @@ import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.UPDATE_INTERVAL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
+
 import java.time.Duration
 
 internal object DefaultValues {
     const val PORT = 6061
+    const val UPDATE_INTERVAL = 300L
     const val CONFIG_URL = ""
     const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key"
     const val CERT_FILE = "/etc/ves-hv/server.crt"
@@ -45,6 +49,7 @@ internal object DefaultValues {
 internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
     override val cmdLineOptionsList = listOf(
             LISTEN_PORT,
+            UPDATE_INTERVAL,
             CONSUL_CONFIG_URL,
             PRIVATE_KEY_FILE,
             CERT_FILE,
@@ -56,12 +61,14 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu
     override fun getConfiguration(cmdLine: CommandLine): ServerConfiguration {
         val port = cmdLine.intValue(LISTEN_PORT, DefaultValues.PORT)
         val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
+        val updateInterval = cmdLine.longValue(UPDATE_INTERVAL, DefaultValues.UPDATE_INTERVAL)
         val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
         val dummyMode = cmdLine.hasOption(DUMMY_MODE)
         val security = createSecurityConfiguration(cmdLine)
         return ServerConfiguration(
                 port = port,
                 configurationUrl = configUrl,
+                configurationUpdateInterval = Duration.ofSeconds(updateInterval),
                 securityConfiguration = security,
                 idleTimeout = Duration.ofSeconds(idleTimeoutSec),
                 dummyMode = dummyMode)
index 074a75e..d1c3b4a 100644 (file)
 package org.onap.dcae.collectors.veshv.main
 
 import arrow.core.flatMap
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.factory.ServerFactory
 import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 
 private val logger = Logger("org.onap.dcae.collectors.veshv.main")
 private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt"
@@ -55,7 +51,8 @@ fun main(args: Array<String>) {
 private fun createServer(config: ServerConfiguration): Server {
     val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
     val collectorProvider = CollectorFactory(
-            resolveConfigurationProvider(config),
+            AdapterFactory.consulConfigurationProvider(
+                    config.configurationUrl, config.configurationUpdateInterval),
             sink,
             MicrometerMetrics()
     ).createVesHvCollectorProvider()
@@ -63,23 +60,3 @@ private fun createServer(config: ServerConfiguration): Server {
     return ServerFactory.createNettyTcpServer(config, collectorProvider)
 }
 
-private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguration): ConfigurationProvider {
-
-    if (serverConfiguration.configurationUrl.isEmpty()) {
-        logger.info("Configuration url not specified - using default config")
-        val sampleConfig = CollectorConfiguration(
-                kafkaBootstrapServers = "kafka:9092",
-                routing = routing {
-                    defineRoute {
-                        fromDomain(Domain.HVRANMEAS)
-                        toTopic("ves_hvRanMeas")
-                        withFixedPartitioning()
-                    }
-                }.build()
-        )
-        return AdapterFactory.staticConfigurationProvider(sampleConfig)
-    }
-
-    logger.info("Using configuration url: ${serverConfiguration.configurationUrl}")
-    return AdapterFactory.consulConfigurationProvider(serverConfiguration.configurationUrl)
-}
index 4c2425b..a14801d 100644 (file)
@@ -30,6 +30,7 @@ import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import java.nio.file.Paths
+import java.time.Duration
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -38,6 +39,8 @@ import java.nio.file.Paths
 object ArgBasedServerConfigurationTest : Spek({
     lateinit var cut: ArgBasedServerConfiguration
     val configurationUrl = "http://test-address/test"
+    val listenPort = "6969"
+    val updateInterval = "10"
     val pk = Paths.get("/", "etc", "ves", "pk.pem")
     val cert = Paths.get("/", "etc", "ssl", "certs", "ca-bundle.crt")
     val trustCert = Paths.get("/", "etc", "ves", "trusted.crt")
@@ -59,8 +62,9 @@ object ArgBasedServerConfigurationTest : Spek({
             lateinit var result: ServerConfiguration
 
             beforeEachTest {
-                result = parse("--listen-port", "6969",
+                result = parse("--listen-port", listenPort,
                         "--config-url", configurationUrl,
+                        "--update-interval", updateInterval,
                         "--private-key-file", pk.toFile().absolutePath,
                         "--cert-file", cert.toFile().absolutePath,
                         "--trust-cert-file", trustCert.toFile().absolutePath)
@@ -70,6 +74,10 @@ object ArgBasedServerConfigurationTest : Spek({
                 assertThat(result.port).isEqualTo(6969)
             }
 
+            it("should set update interval") {
+                assertThat(result.configurationUpdateInterval).isEqualTo(Duration.ofSeconds(10))
+            }
+
             it("should set proper config url") {
                 assertThat(result.configurationUrl).isEqualTo(configurationUrl)
             }
@@ -112,6 +120,10 @@ object ArgBasedServerConfigurationTest : Spek({
                 assertThat(result.configurationUrl).isEqualTo(DefaultValues.CONFIG_URL)
             }
 
+            it("should set default update interval") {
+                assertThat(result.configurationUpdateInterval).isEqualTo(Duration.ofSeconds(DefaultValues.UPDATE_INTERVAL))
+            }
+
             on("security config") {
                 val securityConfiguration = result.securityConfiguration
 
index 942ca31..b20f2aa 100644 (file)
@@ -35,6 +35,12 @@ enum class CommandLineOption(val option: Option) {
             .desc("URL of ves configuration on consul")
             .build()
     ),
+    UPDATE_INTERVAL(Option.builder("I")
+            .longOpt("update-interval")
+            .hasArg()
+            .desc("Consul configuration update interval in seconds")
+            .build()
+    ),
     VES_HV_PORT(Option.builder("p")
             .longOpt("ves-port")
             .required()