Implement blocking consul calls 63/58663/1
authorJakub Dudycz <jdudycz@nokia.com>
Thu, 5 Jul 2018 12:35:43 +0000 (14:35 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 10:24:04 +0000 (12:24 +0200)
Replaced interval based requesting for consul configuration with blocking query calls

Closes ONAP-80

Change-Id: If70365bae9fde513d99b047209d085122a5df0dd
Signed-off-by: Jakub Dudycz <jdudycz@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt

index d9e7432..2a8a396 100644 (file)
@@ -33,8 +33,8 @@ object AdapterFactory {
     fun kafkaSink(): SinkProvider = KafkaSinkProvider()
     fun loggingSink(): SinkProvider = LoggingSinkProvider()
 
-    fun consulConfigurationProvider(url: String, updateInterval: Duration): ConfigurationProvider =
-            ConsulConfigurationProvider(url, updateInterval, httpAdapter())
+    fun consulConfigurationProvider(url: String, firstRequestDelay: Duration): ConfigurationProvider =
+            ConsulConfigurationProvider(url, httpAdapter(), firstRequestDelay)
 
     fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
 }
index c70d128..727f025 100644 (file)
@@ -26,9 +26,14 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
 import org.slf4j.LoggerFactory
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.ipc.netty.http.client.HttpClientException
+import reactor.retry.Retry
+import reactor.retry.retryExponentialBackoff
 import java.io.StringReader
 import java.time.Duration
-import java.util.Base64
+import java.util.*
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
 import javax.json.Json
 import javax.json.JsonObject
@@ -39,18 +44,17 @@ import javax.json.JsonObject
  * @since May 2018
  */
 internal class ConsulConfigurationProvider(private val url: String,
-                                           private val updateInterval: Duration,
-                                           private val http: HttpAdapter
+                                           private val http: HttpAdapter,
+                                           private val firstRequestDelay: Duration
 ) : ConfigurationProvider {
 
-
-    private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
-    private var lastConfigurationHash: AtomicReference<Int> = AtomicReference()
+    private val lastModifyIndex: AtomicReference<Int> = AtomicReference(0)
+    private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
 
     override fun invoke(): Flux<CollectorConfiguration> =
             Flux.concat(createDefaultConfigurationFlux(), createConsulFlux())
 
-    private fun createDefaultConfigurationFlux(): Flux<CollectorConfiguration> = Flux.just(
+    private fun createDefaultConfigurationFlux(): Mono<CollectorConfiguration> = Mono.just(
             CollectorConfiguration(
                     kafkaBootstrapServers = "kafka:9092",
                     routing = routing {
@@ -60,22 +64,45 @@ internal class ConsulConfigurationProvider(private val url: String,
                             withFixedPartitioning()
                         }
                     }.build())
-    ).doOnNext { logger.info("Applied default configuration") }
-
-    private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux.interval(updateInterval)
-            .flatMap { http.get(url) }
-            .doOnError { logger.error("Encountered an error when trying to acquire configuration from consul. " +
-                    "Shutting down..") }
-            .filter { it.hashCode() != lastConfigurationHash.get() }
-            .doOnNext { lastConfigurationHash.set(it.hashCode()) }
-            .map { getConfigurationJson(it) }
-            .map { createCollectorConfiguration(it) }
-
-
-    private fun getConfigurationJson(str: String): JsonObject {
-        val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0)
-        val decodedValue = String(
-                Base64.getDecoder().decode(response.getString("Value")))
+    ).doOnNext { logger.info("Applied default configuration") }.delayElement(firstRequestDelay)
+
+    private fun createConsulFlux(): Flux<CollectorConfiguration> =
+            http.get(url, mapOf(Pair("index", lastModifyIndex.get())))
+                    .doOnError {
+                        logger.error("Encountered an error " +
+                                "when trying to acquire configuration from consul. Shutting down..")
+                    }
+                    .map(::parseJsonResponse)
+                    .doOnNext(::updateModifyIndex)
+                    .map(::extractEncodedConfiguration)
+                    .flatMap(::filterDifferentValues)
+                    .map(::decodeConfiguration)
+                    .map(::createCollectorConfiguration)
+                    .repeat()
+
+    private fun parseJsonResponse(responseString: String): JsonObject =
+            Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
+
+    private fun updateModifyIndex(response: JsonObject) =
+            lastModifyIndex.set(response.getInt("ModifyIndex"))
+
+    private fun extractEncodedConfiguration(response: JsonObject): String =
+            response.getString("Value")
+
+    private fun filterDifferentValues(base64Value: String): Mono<String> {
+        val newHash = hashOf(base64Value)
+        return if (newHash == lastConfigurationHash.get()) {
+            Mono.empty()
+        } else {
+            lastConfigurationHash.set(newHash)
+            Mono.just(base64Value)
+        }
+    }
+
+    private fun hashOf(str: String) = str.hashCode()
+
+    private fun decodeConfiguration(encodedConfiguration: String): JsonObject {
+        val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration))
         logger.info("Obtained new configuration from consul:\n$decodedValue")
         return Json.createReader(StringReader(decodedValue)).readObject()
     }
@@ -97,5 +124,9 @@ internal class ConsulConfigurationProvider(private val url: String,
                 }.build()
         )
     }
+
+    companion object {
+        private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
+    }
 }
 
index a41cd09..4503955 100644 (file)
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory
 import reactor.core.publisher.Mono
 import reactor.core.publisher.toMono
 import reactor.ipc.netty.http.client.HttpClient
-import reactor.ipc.netty.http.client.HttpClientResponse
 import java.nio.charset.Charset
 
 /**
@@ -34,12 +33,31 @@ open class HttpAdapter(private val httpClient: HttpClient) {
 
     private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
 
-    open fun get(url: String): Mono<String> =
-            httpClient.get(url)
-                    .doOnError {
-                        logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
-                        logger.debug("Nested exception:", it)
-                    }
-                    .flatMap { it.receiveContent().toMono() }
-                    .map { it.content().toString(Charset.defaultCharset()) }
+    open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
+            .get(url + createQueryString(queryParams))
+            .doOnError {
+                logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
+                logger.debug("Nested exception:", it)
+            }
+            .flatMap { it.receiveContent().toMono() }
+            .map { it.content().toString(Charset.defaultCharset()) }
+
+
+    private fun createQueryString(params: Map<String, Any>): String {
+        if (params.isEmpty())
+            return ""
+
+        val builder = StringBuilder("?")
+        params.forEach { (key, value) ->
+            builder
+                    .append(key)
+                    .append("=")
+                    .append(value)
+                    .append("&")
+
+        }
+
+        return  builder.removeSuffix("&").toString()
+    }
+
 }
index 025c59f..a486996 100644 (file)
@@ -29,7 +29,7 @@ import java.time.Duration
 data class ServerConfiguration(
         val port: Int,
         val configurationUrl: String,
-        val configurationUpdateInterval: Duration,
+        val firstRequestDelay: Duration,
         val securityConfiguration: SecurityConfiguration,
         val idleTimeout: Duration,
         val dummyMode: Boolean = false)
index dd19084..c98c97a 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.adapters
 
+import com.nhaarman.mockito_kotlin.eq
 import com.nhaarman.mockito_kotlin.mock
 import com.nhaarman.mockito_kotlin.verify
 import com.nhaarman.mockito_kotlin.whenever
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
+import org.mockito.Mockito
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 import reactor.core.publisher.Mono
 import reactor.ipc.netty.http.client.HttpClient
@@ -39,15 +41,16 @@ import kotlin.test.assertEquals
  */
 internal object ConsulConfigurationProviderTest : Spek({
 
-    val updateInterval = Duration.ofMillis(1)
     val httpAdapterMock: HttpAdapter = mock()
+    val firstRequestDelay = Duration.ofMillis(1)
 
     given("valid resource url") {
 
         val validUrl = "http://valid-url/"
-        val consulConfigProvider = ConsulConfigurationProvider(validUrl, updateInterval, httpAdapterMock)
+        val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay)
 
-        whenever(httpAdapterMock.get(validUrl)).thenReturn(Mono.just(constructConsulResponse()))
+        whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
+                .thenReturn(Mono.just(constructConsulResponse()))
 
         it("should use default configuration at the beginning, " +
                 "then apply received configuration") {
@@ -79,9 +82,10 @@ internal object ConsulConfigurationProviderTest : Spek({
     given("invalid resource url") {
 
         val invalidUrl = "http://invalid-url/"
-        val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, updateInterval, httpAdapterMock)
+        val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay)
 
-        whenever(httpAdapterMock.get(invalidUrl)).thenReturn(Mono.error(RuntimeException("Test exception")))
+        whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
+                .thenReturn(Mono.error(RuntimeException("Test exception")))
 
         it("should use default configuration at the beginning, then should interrupt the flux") {
 
index 79eda99..123d8f7 100644 (file)
 package org.onap.dcae.collectors.veshv.impl.adapters
 
 import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.verify
 import com.nhaarman.mockito_kotlin.whenever
 import io.netty.buffer.Unpooled
 import io.netty.handler.codec.http.HttpContent
 import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import reactor.core.publisher.Flux
@@ -32,44 +34,65 @@ import reactor.ipc.netty.http.client.HttpClient
 import reactor.ipc.netty.http.client.HttpClientResponse
 import reactor.test.StepVerifier
 import java.nio.charset.Charset
-import kotlin.test.assertEquals
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since May 2018
  */
 internal object HttpAdapterTest : Spek({
-
-    given("valid resource url") {
+    describe("HttpAdapter") {
 
         val httpClientMock: HttpClient = mock()
         val httpAdapter = HttpAdapter(httpClientMock)
-        val validUrl = "http://valid-url/"
-        val responseContent = """{"key1": "value1", "key2": "value2"}"""
-        val httpResponse = createHttpResponseMock(responseContent)
-        whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse))
 
-        it("should return response string") {
-            StepVerifier
-                    .create(httpAdapter.get(validUrl))
-                    .expectNext(responseContent)
+        given("url without query params") {
+            val initialUrl = "http://test-url"
+            whenever(httpClientMock.get(initialUrl)).thenReturn(Mono.empty())
+
+            it("should not append query string") {
+                httpAdapter.get(initialUrl)
+                verify(httpClientMock).get(initialUrl)
+            }
         }
-    }
 
-    given("invalid resource url") {
+        given("url with query params") {
+            val queryParams = mapOf(Pair("key", "value"))
+            val initialUrl = "http://test-url"
+            val expectedUrl = "http://test-url?key=value"
+            whenever(httpClientMock.get(expectedUrl)).thenReturn(Mono.empty())
 
-        val httpClientMock: HttpClient = mock()
-        val httpAdapter = HttpAdapter(httpClientMock)
-        val invalidUrl = "http://invalid-url/"
-        val exceptionMessage = "Test exception"
-        whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage)))
+            it("should parse them to query string and append to url") {
+                httpAdapter.get(initialUrl, queryParams)
+                verify(httpClientMock).get(expectedUrl)
+            }
+        }
 
-        it("should interrupt the flux") {
-            StepVerifier
-                    .create(httpAdapter.get(invalidUrl))
-                    .verifyErrorMessage(exceptionMessage)
+        given("valid resource url") {
+            val validUrl = "http://valid-url/"
+            val responseContent = """{"key1": "value1", "key2": "value2"}"""
+            val httpResponse = createHttpResponseMock(responseContent)
+            whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse))
+
+            it("should return response string") {
+                StepVerifier
+                        .create(httpAdapter.get(validUrl))
+                        .expectNext(responseContent)
+            }
+        }
+
+        given("invalid resource url") {
+            val invalidUrl = "http://invalid-url/"
+            val exceptionMessage = "Test exception"
+            whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage)))
+
+            it("should interrupt the flux") {
+                StepVerifier
+                        .create(httpAdapter.get(invalidUrl))
+                        .verifyErrorMessage(exceptionMessage)
+            }
         }
     }
+
 })
 
 fun createHttpResponseMock(content: String): HttpClientResponse {
index 63de270..9e4c5f2 100644 (file)
@@ -24,21 +24,12 @@ import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.UPDATE_INTERVAL
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
-
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.*
 import java.time.Duration
 
 internal object DefaultValues {
     const val PORT = 6061
-    const val UPDATE_INTERVAL = 300L
+    const val CONSUL_FIRST_REQUEST_DELAY = 10L
     const val CONFIG_URL = ""
     const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key"
     const val CERT_FILE = "/etc/ves-hv/server.crt"
@@ -49,8 +40,8 @@ internal object DefaultValues {
 internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
     override val cmdLineOptionsList = listOf(
             LISTEN_PORT,
-            UPDATE_INTERVAL,
             CONSUL_CONFIG_URL,
+            CONSUL_FIRST_REQUEST_DELAY,
             PRIVATE_KEY_FILE,
             CERT_FILE,
             TRUST_CERT_FILE,
@@ -61,14 +52,14 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu
     override fun getConfiguration(cmdLine: CommandLine): ServerConfiguration {
         val port = cmdLine.intValue(LISTEN_PORT, DefaultValues.PORT)
         val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
-        val updateInterval = cmdLine.longValue(UPDATE_INTERVAL, DefaultValues.UPDATE_INTERVAL)
+        val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY)
         val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
         val dummyMode = cmdLine.hasOption(DUMMY_MODE)
         val security = createSecurityConfiguration(cmdLine)
         return ServerConfiguration(
                 port = port,
                 configurationUrl = configUrl,
-                configurationUpdateInterval = Duration.ofSeconds(updateInterval),
+                firstRequestDelay = Duration.ofSeconds(firstRequestDelay),
                 securityConfiguration = security,
                 idleTimeout = Duration.ofSeconds(idleTimeoutSec),
                 dummyMode = dummyMode)
index f5efab2..1661aea 100644 (file)
@@ -55,10 +55,7 @@ fun main(args: Array<String>) =
 private fun createServer(config: ServerConfiguration): Server {
     val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
     val collectorProvider = CollectorFactory(
-            AdapterFactory.consulConfigurationProvider(
-                    config.configurationUrl, config.configurationUpdateInterval),
-            sink,
-            MicrometerMetrics()
+            AdapterFactory.consulConfigurationProvider(config.configurationUrl, config.firstRequestDelay), sink, MicrometerMetrics()
     ).createVesHvCollectorProvider()
 
     return ServerFactory.createNettyTcpServer(config, collectorProvider)
index 2c49cf9..8f0c2af 100644 (file)
@@ -40,8 +40,8 @@ import java.time.Duration
 object ArgBasedServerConfigurationTest : Spek({
     lateinit var cut: ArgBasedServerConfiguration
     val configurationUrl = "http://test-address/test"
+    val firstRequestDelay = "10"
     val listenPort = "6969"
-    val updateInterval = "10"
     val pk = Paths.get("/", "etc", "ves", "pk.pem")
     val cert = Paths.get("/", "etc", "ssl", "certs", "ca-bundle.crt")
     val trustCert = Paths.get("/", "etc", "ves", "trusted.crt")
@@ -63,7 +63,7 @@ object ArgBasedServerConfigurationTest : Spek({
             beforeEachTest {
                 result = parse("--listen-port", listenPort,
                         "--config-url", configurationUrl,
-                        "--update-interval", updateInterval,
+                        "--first-request-delay", firstRequestDelay,
                         "--private-key-file", pk.toFile().absolutePath,
                         "--cert-file", cert.toFile().absolutePath,
                         "--trust-cert-file", trustCert.toFile().absolutePath)
@@ -73,8 +73,8 @@ object ArgBasedServerConfigurationTest : Spek({
                 assertThat(result.port).isEqualTo(6969)
             }
 
-            it("should set update interval") {
-                assertThat(result.configurationUpdateInterval).isEqualTo(Duration.ofSeconds(10))
+            it("should set proper first consul request delay") {
+                assertThat(result.firstRequestDelay).isEqualTo(Duration.ofSeconds(10))
             }
 
             it("should set proper config url") {
@@ -86,19 +86,25 @@ object ArgBasedServerConfigurationTest : Spek({
                         SecurityConfiguration(pk, cert, trustCert)
                 )
             }
+
+
         }
 
         given("some parameters are present in the short form") {
             lateinit var result: ServerConfiguration
 
             beforeEachTest {
-                result = parse("-p", "666", "-c", configurationUrl)
+                result = parse("-p", "666", "-c", configurationUrl, "-d", firstRequestDelay)
             }
 
             it("should set proper port") {
                 assertThat(result.port).isEqualTo(666)
             }
 
+            it("should set proper first consul request delay") {
+                assertThat(result.firstRequestDelay).isEqualTo(Duration.ofSeconds(10))
+            }
+
             it("should set proper config url") {
                 assertThat(result.configurationUrl).isEqualTo(configurationUrl)
             }
@@ -119,8 +125,9 @@ object ArgBasedServerConfigurationTest : Spek({
                 assertThat(result.configurationUrl).isEqualTo(DefaultValues.CONFIG_URL)
             }
 
-            it("should set default update interval") {
-                assertThat(result.configurationUpdateInterval).isEqualTo(Duration.ofSeconds(DefaultValues.UPDATE_INTERVAL))
+            it("should set default first consul request delay") {
+                assertThat(result.firstRequestDelay)
+                        .isEqualTo(Duration.ofSeconds(DefaultValues.CONSUL_FIRST_REQUEST_DELAY))
             }
 
             on("security config") {
index b20f2aa..82711d9 100644 (file)
@@ -35,10 +35,10 @@ enum class CommandLineOption(val option: Option) {
             .desc("URL of ves configuration on consul")
             .build()
     ),
-    UPDATE_INTERVAL(Option.builder("I")
-            .longOpt("update-interval")
+    CONSUL_FIRST_REQUEST_DELAY(Option.builder("d")
+            .longOpt("first-request-delay")
             .hasArg()
-            .desc("Consul configuration update interval in seconds")
+            .desc("Delay of first request to consul in seconds")
             .build()
     ),
     VES_HV_PORT(Option.builder("p")