Fix consul request timeout issue 45/58845/2
authorJakub Dudycz <jakub.dudycz@nokia.com>
Fri, 20 Jul 2018 14:37:02 +0000 (16:37 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 3 Aug 2018 06:31:09 +0000 (08:31 +0200)
Fix timeout issue when using consul blocking query calls
by switching to standard requests peformed in given interval

Closes ONAP-628

Change-Id: Ifaf7ddfa27045015a7a90c178e0d6d38955c0c58
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt [new file with mode: 0644]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
pom.xml

index 2a8a396..11a0e9b 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
 import reactor.ipc.netty.http.client.HttpClient
 import java.time.Duration
 
@@ -33,8 +34,13 @@ object AdapterFactory {
     fun kafkaSink(): SinkProvider = KafkaSinkProvider()
     fun loggingSink(): SinkProvider = LoggingSinkProvider()
 
-    fun consulConfigurationProvider(url: String, firstRequestDelay: Duration): ConfigurationProvider =
-            ConsulConfigurationProvider(url, httpAdapter(), firstRequestDelay)
+    fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+            ConsulConfigurationProvider(
+                    configurationProviderParams.configurationUrl,
+                    httpAdapter(),
+                    configurationProviderParams.firstRequestDelay,
+                    configurationProviderParams.requestInterval
+            )
 
     fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
 }
index 621c63f..aca0e7e 100644 (file)
@@ -41,10 +41,10 @@ import javax.json.JsonObject
  */
 internal class ConsulConfigurationProvider(private val url: String,
                                            private val http: HttpAdapter,
-                                           private val firstRequestDelay: Duration
+                                           private val firstRequestDelay: Duration,
+                                           private val requestInterval: Duration
 ) : ConfigurationProvider {
 
-    private val lastModifyIndex: AtomicReference<Int> = AtomicReference(0)
     private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
 
     override fun invoke(): Flux<CollectorConfiguration> =
@@ -62,27 +62,22 @@ internal class ConsulConfigurationProvider(private val url: String,
                     }.build())
     ).doOnNext { logger.info("Applied default configuration") }
 
-    private fun createConsulFlux(): Flux<CollectorConfiguration> =
-            http.get(url, mapOf(Pair("index", lastModifyIndex.get())))
-                    .doOnError {
-                        logger.error("Encountered an error " +
-                                "when trying to acquire configuration from consul. Shutting down..")
-                    }
-                    .map(::parseJsonResponse)
-                    .doOnNext(::updateModifyIndex)
-                    .map(::extractEncodedConfiguration)
-                    .flatMap(::filterDifferentValues)
-                    .map(::decodeConfiguration)
-                    .map(::createCollectorConfiguration)
-                    .repeat()
-                    .delaySubscription(firstRequestDelay)
+    private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux
+            .interval(firstRequestDelay, requestInterval)
+            .flatMap { http.get(url) }
+            .doOnError {
+                logger.error("Encountered an error " +
+                        "when trying to acquire configuration from consul. Shutting down..")
+            }
+            .map(::parseJsonResponse)
+            .map(::extractEncodedConfiguration)
+            .flatMap(::filterDifferentValues)
+            .map(::decodeConfiguration)
+            .map(::createCollectorConfiguration)
 
     private fun parseJsonResponse(responseString: String): JsonObject =
             Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
 
-    private fun updateModifyIndex(response: JsonObject) =
-            lastModifyIndex.set(response.getInt("ModifyIndex"))
-
     private fun extractEncodedConfiguration(response: JsonObject): String =
             response.getString("Value")
 
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
new file mode 100644 (file)
index 0000000..9de3449
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import java.time.Duration
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+data class ConfigurationProviderParams(val configurationUrl: String,
+                                       val firstRequestDelay: Duration,
+                                       val requestInterval: Duration)
index a486996..93ad719 100644 (file)
@@ -28,8 +28,7 @@ import java.time.Duration
  */
 data class ServerConfiguration(
         val port: Int,
-        val configurationUrl: String,
-        val firstRequestDelay: Duration,
+        val configurationProviderParams: ConfigurationProviderParams,
         val securityConfiguration: SecurityConfiguration,
         val idleTimeout: Duration,
         val dummyMode: Boolean = false)
index 322ec4e..808a6fc 100644 (file)
@@ -41,11 +41,12 @@ internal object ConsulConfigurationProviderTest : Spek({
 
     val httpAdapterMock: HttpAdapter = mock()
     val firstRequestDelay = Duration.ofMillis(1)
+    val requestInterval = Duration.ofMillis(1)
 
     given("valid resource url") {
 
         val validUrl = "http://valid-url/"
-        val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay)
+        val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay, requestInterval)
 
         whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
                 .thenReturn(Mono.just(constructConsulResponse()))
@@ -80,7 +81,7 @@ internal object ConsulConfigurationProviderTest : Spek({
     given("invalid resource url") {
 
         val invalidUrl = "http://invalid-url/"
-        val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay)
+        val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay, requestInterval)
 
         whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
                 .thenReturn(Mono.error(RuntimeException("Test exception")))
index 35ca09d..a11fe3d 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.*
@@ -30,6 +31,7 @@ import java.time.Duration
 internal object DefaultValues {
     const val PORT = 6061
     const val CONSUL_FIRST_REQUEST_DELAY = 10L
+    const val CONSUL_REQUEST_INTERVAL = 5L
     const val CONFIG_URL = ""
     const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key"
     const val CERT_FILE = "/etc/ves-hv/server.crt"
@@ -42,6 +44,7 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu
             LISTEN_PORT,
             CONSUL_CONFIG_URL,
             CONSUL_FIRST_REQUEST_DELAY,
+            CONSUL_REQUEST_INTERVAL,
             SSL_DISABLE,
             PRIVATE_KEY_FILE,
             CERT_FILE,
@@ -52,20 +55,30 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu
 
     override fun getConfiguration(cmdLine: CommandLine): ServerConfiguration {
         val port = cmdLine.intValue(LISTEN_PORT, DefaultValues.PORT)
-        val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
-        val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY)
         val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
         val dummyMode = cmdLine.hasOption(DUMMY_MODE)
         val security = createSecurityConfiguration(cmdLine)
+        val configurationProviderParams = createConfigurationProviderParams(cmdLine);
         return ServerConfiguration(
                 port = port,
-                configurationUrl = configUrl,
-                firstRequestDelay = Duration.ofSeconds(firstRequestDelay),
+                configurationProviderParams = configurationProviderParams,
                 securityConfiguration = security,
                 idleTimeout = Duration.ofSeconds(idleTimeoutSec),
                 dummyMode = dummyMode)
     }
 
+    private fun createConfigurationProviderParams(cmdLine: CommandLine): ConfigurationProviderParams {
+        val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
+        val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY)
+        val requestInterval = cmdLine.longValue(CONSUL_REQUEST_INTERVAL, DefaultValues.CONSUL_REQUEST_INTERVAL)
+
+        return ConfigurationProviderParams(
+                configUrl,
+                Duration.ofSeconds(firstRequestDelay),
+                Duration.ofSeconds(requestInterval)
+        )
+    }
+
     private fun createSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration {
         val sslDisable = cmdLine.hasOption(SSL_DISABLE)
         val pkFile = cmdLine.stringValue(PRIVATE_KEY_FILE, DefaultValues.PRIVATE_KEY_FILE)
index 8418cd7..aa1f67b 100644 (file)
@@ -54,7 +54,9 @@ fun main(args: Array<String>) =
 private fun createServer(config: ServerConfiguration): Server {
     val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
     val collectorProvider = CollectorFactory(
-            AdapterFactory.consulConfigurationProvider(config.configurationUrl, config.firstRequestDelay), sink, MicrometerMetrics()
+            AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
+            sink,
+            MicrometerMetrics()
     ).createVesHvCollectorProvider()
 
     return ServerFactory.createNettyTcpServer(config, collectorProvider)
index 0498344..6da605f 100644 (file)
@@ -39,6 +39,7 @@ object ArgBasedServerConfigurationTest : Spek({
     lateinit var cut: ArgBasedServerConfiguration
     val configurationUrl = "http://test-address/test"
     val firstRequestDelay = "10"
+    val requestInterval = "5"
     val listenPort = "6969"
     val pk = Paths.get("/", "etc", "ves", "pk.pem")
     val cert = Paths.get("/", "etc", "ssl", "certs", "ca-bundle.crt")
@@ -63,6 +64,7 @@ object ArgBasedServerConfigurationTest : Spek({
                         "--listen-port", listenPort,
                         "--config-url", configurationUrl,
                         "--first-request-delay", firstRequestDelay,
+                        "--request-interval", requestInterval,
                         "--private-key-file", pk.toFile().absolutePath,
                         "--cert-file", cert.toFile().absolutePath,
                         "--trust-cert-file", trustCert.toFile().absolutePath)
@@ -73,11 +75,18 @@ object ArgBasedServerConfigurationTest : Spek({
             }
 
             it("should set proper first consul request delay") {
-                assertThat(result.firstRequestDelay).isEqualTo(Duration.ofSeconds(10))
+                assertThat(result.configurationProviderParams.firstRequestDelay)
+                        .isEqualTo(Duration.ofSeconds(10))
+            }
+
+            it("should set proper consul request interval") {
+                assertThat(result.configurationProviderParams.requestInterval)
+                        .isEqualTo(Duration.ofSeconds(5))
             }
 
             it("should set proper config url") {
-                assertThat(result.configurationUrl).isEqualTo(configurationUrl)
+                assertThat(result.configurationProviderParams.configurationUrl)
+                        .isEqualTo(configurationUrl)
             }
 
             it("should set proper security configuration") {
@@ -85,8 +94,6 @@ object ArgBasedServerConfigurationTest : Spek({
                         SecurityConfiguration(sslDisable = true, privateKey = pk, cert = cert, trustedCert = trustCert)
                 )
             }
-
-
         }
 
         given("some parameters are present in the short form") {
@@ -101,11 +108,13 @@ object ArgBasedServerConfigurationTest : Spek({
             }
 
             it("should set proper first consul request delay") {
-                assertThat(result.firstRequestDelay).isEqualTo(Duration.ofSeconds(10))
+                assertThat(result.configurationProviderParams.firstRequestDelay)
+                        .isEqualTo(Duration.ofSeconds(10))
             }
 
             it("should set proper config url") {
-                assertThat(result.configurationUrl).isEqualTo(configurationUrl)
+                assertThat(result.configurationProviderParams.configurationUrl)
+                        .isEqualTo(configurationUrl)
             }
         }
 
@@ -121,14 +130,20 @@ object ArgBasedServerConfigurationTest : Spek({
             }
 
             it("should set default config url") {
-                assertThat(result.configurationUrl).isEqualTo(DefaultValues.CONFIG_URL)
+                assertThat(result.configurationProviderParams.configurationUrl)
+                        .isEqualTo(DefaultValues.CONFIG_URL)
             }
 
             it("should set default first consul request delay") {
-                assertThat(result.firstRequestDelay)
+                assertThat(result.configurationProviderParams.firstRequestDelay)
                         .isEqualTo(Duration.ofSeconds(DefaultValues.CONSUL_FIRST_REQUEST_DELAY))
             }
 
+            it("should set default consul request interval") {
+                assertThat(result.configurationProviderParams.requestInterval)
+                        .isEqualTo(Duration.ofSeconds(DefaultValues.CONSUL_REQUEST_INTERVAL))
+            }
+
             on("security config") {
                 val securityConfiguration = result.securityConfiguration
 
index 27213c9..a654868 100644 (file)
@@ -41,6 +41,12 @@ enum class CommandLineOption(val option: Option) {
             .desc("Delay of first request to consul in seconds")
             .build()
     ),
+    CONSUL_REQUEST_INTERVAL(Option.builder("I")
+            .longOpt("request-interval")
+            .hasArg()
+            .desc("Interval of consul configuration requests in seconds")
+            .build()
+    ),
     VES_HV_PORT(Option.builder("p")
             .longOpt("ves-port")
             .required()
@@ -105,7 +111,7 @@ enum class CommandLineOption(val option: Option) {
                 |connection might be closed.""".trimMargin())
             .build()
     ),
-    DUMMY_MODE(Option.builder("d")
+    DUMMY_MODE(Option.builder("u")
             .longOpt("dummy")
             .desc("If present will start in dummy mode (dummy external services)")
             .build()
diff --git a/pom.xml b/pom.xml
index 45fa496..ef531bf 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -71,8 +71,8 @@
         <failIfMissingComponentTests>false</failIfMissingComponentTests>
         <skipAnalysis>true</skipAnalysis>
 
-        <skipDocker>true</skipDocker> <!-- TODO: unskip docker -->
         <!-- Docker -->
+        <skipDocker>true</skipDocker> <!-- TODO: unskip docker -->
         <onap.nexus.dockerregistry.daily>nexus3.onap.org:10003</onap.nexus.dockerregistry.daily>
         <onap.nexus.dockerregistry.release>nexus3.onap.org:10002</onap.nexus.dockerregistry.release>
         <docker-image.registry>${onap.nexus.dockerregistry.daily}</docker-image.registry>