Parse whole dynamic configuration 07/84107/10
authorJakub Dudycz <jakub.dudycz@nokia.com>
Mon, 8 Apr 2019 11:48:42 +0000 (13:48 +0200)
committerJakub Dudycz <jakub.dudycz@nokia.com>
Tue, 9 Apr 2019 13:50:41 +0000 (15:50 +0200)
Change-Id: I96e4cf3ac75920ed909da9063ba0b788b55474e4
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1386

14 files changed:
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParser.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt with 72% similarity]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt [moved from sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt with 83% similarity]
sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityKeysPaths.kt [deleted file]
sources/hv-collector-utils/pom.xml
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/json.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt with 66% similarity]

index ccce62a..9338157 100644 (file)
@@ -26,8 +26,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
 import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
 import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
 import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
-import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
 import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
+import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser
 import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono
 class ConfigurationModule {
 
     private val cmd = HvVesCommandLineParser()
-    private val configReader = FileConfigurationReader()
+    private val configParser = JsonConfigurationParser()
     private val configValidator = ConfigurationValidator()
     private val merger = ConfigurationMerger()
 
@@ -51,10 +51,9 @@ class ConfigurationModule {
                                   mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
             Mono.just(cmd.getConfigurationFile(args))
                     .throwOnLeft(::MissingArgumentException)
-                    .map {
-                        logger.info { "Using base configuration file: ${it.absolutePath}" }
-                        it.reader().use(configReader::loadConfig)
-                    }
+                    .doOnNext { logger.info { "Using base configuration file: ${it.absolutePath}" } }
+                    .map { it.reader().use(configParser::parse) }
+                    .doOnNext { logger.info { "Successfully parsed json file to configuration: $it" } }
                     .cache()
                     .flatMapMany { basePartialConfig ->
                         cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
@@ -70,12 +69,13 @@ class ConfigurationModule {
             CbsConfigurationProvider(
                     CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
                     cbsConfigurationFrom(basePartialConfig),
+                    configParser,
                     configStateListener,
                     mdc)
 
-    private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
-            configValidator.validatedCbsConfiguration(basePartialConfig)
-                    .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") }
+    private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator
+            .validatedCbsConfiguration(basePartialConfig)
+            .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") }
 
     companion object {
         private val logger = Logger(ConfigurationModule::class)
index c1807be..f745d59 100644 (file)
@@ -37,8 +37,8 @@ data class HvVesConfiguration(
 
 data class ServerConfiguration(
         val listenPort: Int,
-        val idleTimeout: Duration,
-        val maxPayloadSizeBytes: Int
+        val maxPayloadSizeBytes: Int,
+        val idleTimeout: Duration
 )
 
 data class CbsConfiguration(
index b646293..4982c73 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.config.impl
 
-import arrow.core.None
-import arrow.core.Option
-import arrow.core.Some
+import arrow.core.toOption
 import com.google.gson.JsonObject
 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcae.collectors.veshv.utils.reader
 import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
@@ -50,26 +47,29 @@ import reactor.retry.Retry
  */
 internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
                                         private val cbsConfiguration: CbsConfiguration,
+                                        private val configParser: JsonConfigurationParser,
                                         private val streamParser: StreamFromGsonParser<KafkaSink>,
                                         private val configurationStateListener: ConfigurationStateListener,
-                                        retrySpec: Retry<Any>,
-                                        private val mdc: MappedDiagnosticContext
+                                        private val mdc: MappedDiagnosticContext,
+                                        retrySpec: Retry<Any>
 
 ) {
     constructor(cbsClientMono: Mono<CbsClient>,
                 cbsConfig: CbsConfiguration,
+                configParser: JsonConfigurationParser,
                 configurationStateListener: ConfigurationStateListener,
                 mdc: MappedDiagnosticContext) :
             this(
                     cbsClientMono,
                     cbsConfig,
+                    configParser,
                     StreamFromGsonParsers.kafkaSinkParser(),
                     configurationStateListener,
+                    mdc,
                     Retry.any<Any>()
                             .retryMax(MAX_RETRIES)
                             .fixedBackoff(cbsConfig.requestInterval)
-                            .jitter(Jitter.random()),
-                    mdc
+                            .jitter(Jitter.random())
             )
 
     private val retry = retrySpec.doOnRetry {
@@ -92,25 +92,22 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
                     cbsConfiguration.firstRequestDelay,
                     cbsConfiguration.requestInterval)
             .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
-            .map(::createRoutingDescription)
+            .map(::parseConfiguration)
+            .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
             .onErrorLog(logger, mdc) { "Error while creating configuration" }
             .retryWhen(retry)
-            .map { PartialConfiguration(routing = it) }
 
-    private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try {
-        val routes = DataStreams.namedSinks(configuration)
-                .filter(streamOfType(KAFKA))
-                .map(streamParser::unsafeParse)
-                .map { Route(it.name(), it) }
-                .asIterable()
-                .toList()
-        Some(routes)
-    } catch (e: NullPointerException) {
-        logger.withWarn(mdc) {
-            log("Invalid streams configuration", e)
-        }
-        None
-    }
+    private fun parseConfiguration(json: JsonObject) =
+            configParser
+                    .parse(json.reader())
+                    .apply { streamPublishers = extractStreamDefinitions(json).toOption() }
+
+    private fun extractStreamDefinitions(configuration: JsonObject): List<KafkaSink> =
+            DataStreams.namedSinks(configuration)
+                    .filter(streamOfType(KAFKA))
+                    .map(streamParser::unsafeParse)
+                    .asIterable()
+                    .toList()
 
     companion object {
         private const val MAX_RETRIES = 5L
index 8e6bafc..e782a1e 100644 (file)
@@ -21,11 +21,8 @@ package org.onap.dcae.collectors.veshv.config.impl
 
 
 import arrow.core.Option
-import arrow.core.Some
 import arrow.core.getOrElse
 import arrow.core.toOption
-import kotlin.reflect.KProperty0
-import kotlin.reflect.KProperty1
 
 /**
  * @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -47,15 +44,11 @@ internal class ConfigurationMerger {
                     trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile),
                     trustStorePassword = base.trustStorePassword.updateToGivenOrNone(update.trustStorePassword),
 
-                    routing = base.routing.updateToGivenOrNone(update.routing),
+                    streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers),
 
                     logLevel = base.logLevel.updateToGivenOrNone(update.logLevel)
             )
 
     private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) =
             update.getOrElse(this::orNull).toOption()
-
 }
-
-
-
index cfcc7d7..dddf0be 100644 (file)
@@ -23,19 +23,25 @@ import arrow.core.None
 import arrow.core.Option
 import arrow.core.Some
 import arrow.core.getOrElse
+import arrow.core.toOption
 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
 import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
 import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityKeysPaths
 import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
-import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
 import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
+import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore
+import org.onap.dcaegen2.services.sdk.security.ssl.Passwords
 import java.io.File
+import java.nio.file.Path
+import java.time.Duration
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -88,16 +94,16 @@ internal class ConfigurationValidator {
             partial.mapBinding {
                 ServerConfiguration(
                         it.listenPort.bind(),
-                        it.idleTimeoutSec.bind(),
-                        it.maxPayloadSizeBytes.bind()
+                        it.maxPayloadSizeBytes.bind(),
+                        Duration.ofSeconds(it.idleTimeoutSec.bind())
                 )
             }
 
     internal fun validatedCbsConfiguration(partial: PartialConfiguration) =
             partial.mapBinding {
                 CbsConfiguration(
-                        it.firstRequestDelaySec.bind(),
-                        it.requestIntervalSec.bind()
+                        Duration.ofSeconds(it.firstRequestDelaySec.bind()),
+                        Duration.ofSeconds(it.requestIntervalSec.bind())
                 )
             }
 
@@ -113,19 +119,31 @@ internal class ConfigurationValidator {
     private fun createSecurityConfiguration(partial: PartialConfiguration): Option<SecurityConfiguration> =
             partial.mapBinding {
                 SecurityConfiguration(
-                        Option.fromNullable(SecurityKeysPaths(
+                        createSecurityKeys(
                                 File(it.keyStoreFile.bind()).toPath(),
                                 it.keyStorePassword.bind(),
                                 File(it.trustStoreFile.bind()).toPath(),
                                 it.trustStorePassword.bind()
-                        ).asImmutableSecurityKeys())
+                        ).toOption()
                 )
             }
 
+    private fun createSecurityKeys(keyStorePath: Path,
+                                   keyStorePassword: String,
+                                   trustStorePath: Path,
+                                   trustStorePassword: String) =
+            ImmutableSecurityKeys.builder()
+                    .keyStore(ImmutableSecurityKeysStore.of(keyStorePath))
+                    .keyStorePassword(Passwords.fromString(keyStorePassword))
+                    .trustStore(ImmutableSecurityKeysStore.of(trustStorePath))
+                    .trustStorePassword(Passwords.fromString(trustStorePassword))
+                    .build()
+
+
     private fun validatedCollectorConfig(partial: PartialConfiguration) =
-            partial.mapBinding {
+            partial.mapBinding { config ->
                 CollectorConfiguration(
-                        it.routing.bind()
+                        config.streamPublishers.bind().map { Route(it.name(), it) }
                 )
             }
 
@@ -21,28 +21,18 @@ package org.onap.dcae.collectors.veshv.config.impl
 
 import arrow.core.Option
 import com.google.gson.GsonBuilder
-import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter
 import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-
 import java.io.Reader
-import java.time.Duration
 
 /**
  * @author Pawel Biniek <pawel.biniek@nokia.com>
  * @since February 2019
  */
-internal class FileConfigurationReader {
+internal class JsonConfigurationParser {
     private val gson = GsonBuilder()
             .registerTypeAdapter(Option::class.java, OptionAdapter())
-            .registerTypeAdapter(Duration::class.java, DurationOfSecondsAdapter())
             .create()
 
-    fun loadConfig(input: Reader): PartialConfiguration =
+    fun parse(input: Reader): PartialConfiguration =
             gson.fromJson(input, PartialConfiguration::class.java)
-                    .also { logger.info { "Successfully read file and parsed json to configuration: $it" } }
-
-    companion object {
-        private val logger = Logger(FileConfigurationReader::class)
-    }
 }
index 0be2572..30f6c3e 100644 (file)
@@ -22,9 +22,8 @@ package org.onap.dcae.collectors.veshv.config.impl
 import arrow.core.None
 import arrow.core.Option
 import com.google.gson.annotations.SerializedName
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import java.time.Duration
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 
 /**
  * @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -34,14 +33,14 @@ internal data class PartialConfiguration(
         @SerializedName("server.listenPort")
         val listenPort: Option<Int> = None,
         @SerializedName("server.idleTimeoutSec")
-        val idleTimeoutSec: Option<Duration> = None,
+        val idleTimeoutSec: Option<Long> = None,
         @SerializedName("server.maxPayloadSizeBytes")
         val maxPayloadSizeBytes: Option<Int> = None,
 
         @SerializedName("cbs.firstRequestDelaySec")
-        val firstRequestDelaySec: Option<Duration> = None,
+        val firstRequestDelaySec: Option<Long> = None,
         @SerializedName("cbs.requestIntervalSec")
-        val requestIntervalSec: Option<Duration> = None,
+        val requestIntervalSec: Option<Long> = None,
 
         @SerializedName("security.sslDisable")
         val sslDisable: Option<Boolean> = None,
@@ -54,9 +53,9 @@ internal data class PartialConfiguration(
         @SerializedName("security.keys.trustStorePassword")
         val trustStorePassword: Option<String> = None,
 
-        @SerializedName("collector.routing")
-        val routing: Option<Routing> = None,
-
         @SerializedName("logLevel")
-        val logLevel: Option<LogLevel> = None
+        val logLevel: Option<LogLevel> = None,
+
+        @Transient
+        var streamPublishers: Option<List<KafkaSink>> = None
 )
index d5fe588..94eb519 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.config.impl
 
+import arrow.core.Some
 import com.google.gson.JsonParser
 import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.eq
@@ -51,9 +52,9 @@ internal object CbsConfigurationProviderTest : Spek({
 
     describe("Configuration provider") {
 
-        val cbsClient: CbsClient = mock()
-        val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
-        val configStateListener: ConfigurationStateListener = mock()
+        val cbsClient = mock<CbsClient>()
+        val cbsClientMock = Mono.just(cbsClient)
+        val configStateListener = mock<ConfigurationStateListener>()
 
         given("configuration is never in cbs") {
             val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
@@ -78,29 +79,32 @@ internal object CbsConfigurationProviderTest : Spek({
 
                     StepVerifier.create(configProvider().take(1))
                             .consumeNextWith {
-                                val routes = it.routing.orNull()!!
-                                val route1 = routes.elementAt(0)
-                                val route2 = routes.elementAt(1)
-                                val receivedSink1 = route1.sink
-                                val receivedSink2 = route2.sink
-
-                                assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
-                                assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
-                                assertThat(receivedSink1.bootstrapServers())
+
+                                assertThat(it.listenPort).isEqualTo(Some(6061))
+                                assertThat(it.idleTimeoutSec).isEqualTo(Some(60L))
+                                assertThat(it.maxPayloadSizeBytes).isEqualTo(Some(1048576))
+
+
+                                val sinks = it.streamPublishers.orNull()!!
+                                val sink1 = sinks[0]
+                                val sink2 = sinks[1]
+
+                                assertThat(sink1.name()).isEqualTo(PERF3GPP_REGIONAL)
+                                assertThat(sink1.aafCredentials()).isEqualTo(aafCredentials1)
+                                assertThat(sink1.bootstrapServers())
                                         .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
-                                assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+                                assertThat(sink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
 
-                                assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
-                                assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
-                                assertThat(receivedSink2.bootstrapServers())
+                                assertThat(sink2.name()).isEqualTo(PERF3GPP_CENTRAL)
+                                assertThat(sink2.aafCredentials()).isEqualTo(aafCredentials2)
+                                assertThat(sink2.bootstrapServers())
                                         .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
-                                assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
-
+                                assertThat(sink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
                             }.verifyComplete()
                 }
             }
-
         }
+
         given("invalid configuration from cbs") {
             val iterationCount = 3L
             val configProvider = constructConfigurationProvider(
@@ -112,7 +116,8 @@ internal object CbsConfigurationProviderTest : Spek({
                         .thenReturn(Flux.just(invalidConfiguration))
 
                 it("should interrupt the flux") {
-                    StepVerifier.create(configProvider())
+                    StepVerifier
+                            .create(configProvider())
                             .verifyError()
                 }
 
@@ -126,8 +131,8 @@ internal object CbsConfigurationProviderTest : Spek({
 })
 
 
-val PERF3GPP_REGIONAL = "perf3gpp_regional"
-val PERF3GPP_CENTRAL = "perf3gpp_central"
+private const val PERF3GPP_REGIONAL = "perf3gpp_regional"
+private const val PERF3GPP_CENTRAL = "perf3gpp_central"
 
 private val aafCredentials1 = ImmutableAafCredentials.builder()
         .username("client")
@@ -141,6 +146,9 @@ private val aafCredentials2 = ImmutableAafCredentials.builder()
 
 private val validConfiguration = JsonParser().parse("""
 {
+    "server.listenPort": 6061,
+    "server.idleTimeoutSec": 60,
+    "server.maxPayloadSizeBytes": 1048576,
     "streams_publishes": {
         "$PERF3GPP_REGIONAL": {
             "type": "kafka",
@@ -173,12 +181,12 @@ private val invalidConfiguration = JsonParser().parse("""
         "$PERF3GPP_REGIONAL": {
             "type": "kafka",
             "aaf_credentials": {
-                "username": "client",
+                "user": "client",
                 "password": "very secure password"
             },
             "kafka_info": {
-                "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
-                "popic_name": "REG_HVVES_PERF3GPP"
+                "servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+                "name": "REG_HVVES_PERF3GPP"
             }
         }
     }
@@ -187,20 +195,24 @@ private val invalidConfiguration = JsonParser().parse("""
 private val firstRequestDelay = Duration.ofMillis(1)
 private val requestInterval = Duration.ofMillis(1)
 private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
+private val configParser = JsonConfigurationParser()
 
 private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
                                            configurationStateListener: ConfigurationStateListener,
                                            iterationCount: Long = 1
 ): CbsConfigurationProvider {
 
-    val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+    val retry = Retry
+            .onlyIf<Any> { it.iteration() <= iterationCount }
+            .fixedBackoff(Duration.ofNanos(1))
 
     return CbsConfigurationProvider(
             cbsClientMono,
             CbsConfiguration(firstRequestDelay, requestInterval),
+            configParser,
             streamParser,
             configurationStateListener,
-            retry,
-            { mapOf("k" to "v") }
+            { mapOf("k" to "v") },
+            retry
     )
 }
index bc61b57..4cd2ba9 100644 (file)
@@ -55,9 +55,9 @@ internal object ConfigurationMergerTest : Spek({
         }
 
         it("merges single parameter into full config") {
-            val actual = FileConfigurationReader().loadConfig(
+            val actual = JsonConfigurationParser().parse(
                     InputStreamReader(
-                            FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+                            JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
             val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
 
             val result = ConfigurationMerger().merge(actual, diff)
@@ -66,31 +66,31 @@ internal object ConfigurationMergerTest : Spek({
         }
 
         it("merges single embedded parameter into full config") {
-            val actual = FileConfigurationReader().loadConfig(
+            val actual = JsonConfigurationParser().parse(
                     InputStreamReader(
-                            FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+                            JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
             val diff = PartialConfiguration(listenPort = someListenPort)
 
             val result = ConfigurationMerger().merge(actual, diff)
 
             assertThat(result.listenPort).isEqualTo(someListenPort)
             assertThat(result.idleTimeoutSec.isEmpty()).isFalse()
-            assertThat(result.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+            assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
             assertThat(result.maxPayloadSizeBytes.isEmpty()).isFalse()
             assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576))
         }
 
         it("merges full config into single parameter") {
             val actual = PartialConfiguration(logLevel = Some(LogLevel.INFO))
-            val diff = FileConfigurationReader().loadConfig(
+            val diff = JsonConfigurationParser().parse(
                     InputStreamReader(
-                            FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+                            JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
 
             val result = ConfigurationMerger().merge(actual, diff)
 
             assertThat(result.logLevel).isEqualTo(Some(LogLevel.ERROR))
             assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576))
-            assertThat(result.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+            assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
 
             assertThat(result.keyStoreFile.isEmpty()).isFalse()
             assertThat(result.firstRequestDelaySec.isEmpty()).isFalse()
index e43acfa..5fa1fd6 100644 (file)
@@ -23,14 +23,17 @@ import arrow.core.None
 import arrow.core.Option
 import arrow.core.Some
 import arrow.core.getOrElse
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assertions.fail
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.config.api.model.Route
 import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
 import java.io.File
 import java.time.Duration
@@ -81,7 +84,7 @@ internal object ConfigurationValidatorTest : Spek({
                     keyStorePassword = Some(KEYSTORE_PASSWORD),
                     trustStoreFile = Some(TRUSTSTORE),
                     trustStorePassword = Some(TRUSTSTORE_PASSWORD),
-                    routing = Some(emptyRouting),
+                    streamPublishers = Some(sampleStreamsDefinition),
                     logLevel = Some(LogLevel.TRACE)
             )
 
@@ -92,9 +95,12 @@ internal object ConfigurationValidatorTest : Spek({
                             fail("Configuration should have been created successfully")
                         },
                         {
-                            assertThat(it.server.listenPort).isEqualTo(defaultListenPort)
-                            assertThat(it.server.idleTimeout).isEqualTo(defaultIdleTimeoutSec)
-                            assertThat(it.server.maxPayloadSizeBytes).isEqualTo(defaultMaxPayloadSizeBytes)
+                            assertThat(it.server.listenPort)
+                                    .isEqualTo(defaultListenPort)
+                            assertThat(it.server.maxPayloadSizeBytes)
+                                    .isEqualTo(defaultMaxPayloadSizeBytes)
+                            assertThat(it.server.idleTimeout)
+                                    .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
 
                             val securityKeys = it.security.keys
                                     .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys
@@ -103,10 +109,14 @@ internal object ConfigurationValidatorTest : Spek({
                             securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) }
                             securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) }
 
-                            assertThat(it.cbs.firstRequestDelay).isEqualTo(defaultFirstReqDelaySec)
-                            assertThat(it.cbs.requestInterval).isEqualTo(defaultRequestIntervalSec)
+                            assertThat(it.cbs.firstRequestDelay)
+                                    .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
+                            assertThat(it.cbs.requestInterval)
+                                    .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec))
+
+                            assertThat(it.collector.routing)
+                                    .isEqualTo(sampleRouting)
 
-                            assertThat(it.collector.routing).isEqualTo(emptyRouting)
                             assertThat(it.logLevel).isEqualTo(LogLevel.TRACE)
                         }
                 )
@@ -130,16 +140,16 @@ internal object ConfigurationValidatorTest : Spek({
                         },
                         {
                             assertThat(it.server.idleTimeout)
-                                    .isEqualTo(defaultIdleTimeoutSec)
+                                    .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
 
                             assertThat(it.security.keys)
                                     .isEqualTo(None)
 
                             assertThat(it.cbs.firstRequestDelay)
-                                    .isEqualTo(defaultFirstReqDelaySec)
+                                    .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
 
                             assertThat(it.collector.routing)
-                                    .isEqualTo(emptyRouting)
+                                    .isEqualTo(sampleRouting)
                         }
                 )
             }
@@ -172,42 +182,47 @@ internal object ConfigurationValidatorTest : Spek({
 })
 
 private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPort),
-                                 idleTimeoutSec: Option<Duration> = Some(defaultIdleTimeoutSec),
+                                 idleTimeoutSec: Option<Long> = Some(defaultIdleTimeoutSec),
                                  maxPayloadSizeBytes: Option<Int> = Some(defaultMaxPayloadSizeBytes),
-                                 firstReqDelaySec: Option<Duration> = Some(defaultFirstReqDelaySec),
-                                 requestIntervalSec: Option<Duration> = Some(defaultRequestIntervalSec),
+                                 firstReqDelaySec: Option<Long> = Some(defaultFirstReqDelaySec),
+                                 requestIntervalSec: Option<Long> = Some(defaultRequestIntervalSec),
                                  sslDisable: Option<Boolean> = Some(false),
                                  keyStoreFile: Option<String> = Some(KEYSTORE),
                                  keyStorePassword: Option<String> = Some(KEYSTORE_PASSWORD),
                                  trustStoreFile: Option<String> = Some(TRUSTSTORE),
                                  trustStorePassword: Option<String> = Some(TRUSTSTORE_PASSWORD),
-                                 routing: Option<Routing> = Some(emptyRouting),
+                                 streamPublishers: Option<List<KafkaSink>> = Some(sampleStreamsDefinition),
                                  logLevel: Option<LogLevel> = Some(LogLevel.INFO)
-) =
-        PartialConfiguration(
-                listenPort = listenPort,
-                idleTimeoutSec = idleTimeoutSec,
-                maxPayloadSizeBytes = maxPayloadSizeBytes,
-                firstRequestDelaySec = firstReqDelaySec,
-                requestIntervalSec = requestIntervalSec,
-                sslDisable = sslDisable,
-                keyStoreFile = keyStoreFile,
-                keyStorePassword = keyStorePassword,
-                trustStoreFile = trustStoreFile,
-                trustStorePassword = trustStorePassword,
-                routing = routing,
-                logLevel = logLevel
-        )
-
-val defaultListenPort = 1234
-val defaultRequestIntervalSec = Duration.ofSeconds(3)
-val defaultMaxPayloadSizeBytes = 2
-val defaultIdleTimeoutSec = Duration.ofSeconds(10L)
-val defaultFirstReqDelaySec = Duration.ofSeconds(10L)
-
-val KEYSTORE = "test.ks.pkcs12"
-val KEYSTORE_PASSWORD = "changeMe"
-val TRUSTSTORE = "trust.ks.pkcs12"
-val TRUSTSTORE_PASSWORD = "changeMeToo"
-
-val emptyRouting: Routing = emptyList()
+) = PartialConfiguration(
+        listenPort = listenPort,
+        idleTimeoutSec = idleTimeoutSec,
+        maxPayloadSizeBytes = maxPayloadSizeBytes,
+        firstRequestDelaySec = firstReqDelaySec,
+        requestIntervalSec = requestIntervalSec,
+        sslDisable = sslDisable,
+        keyStoreFile = keyStoreFile,
+        keyStorePassword = keyStorePassword,
+        trustStoreFile = trustStoreFile,
+        trustStorePassword = trustStorePassword,
+        streamPublishers = streamPublishers,
+        logLevel = logLevel
+)
+
+const val defaultListenPort = 1234
+const val defaultMaxPayloadSizeBytes = 2
+const val defaultRequestIntervalSec = 3L
+const val defaultIdleTimeoutSec = 10L
+const val defaultFirstReqDelaySec = 10L
+
+const val KEYSTORE = "test.ks.pkcs12"
+const val KEYSTORE_PASSWORD = "changeMe"
+const val TRUSTSTORE = "trust.ks.pkcs12"
+const val TRUSTSTORE_PASSWORD = "changeMeToo"
+
+const val sampleSinkName = "perf3gpp"
+
+private val sampleSink = mock<KafkaSink>().also {
+    whenever(it.name()).thenReturn(sampleSinkName)
+}
+val sampleStreamsDefinition = listOf(sampleSink)
+val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
\ No newline at end of file
@@ -35,15 +35,15 @@ import kotlin.test.fail
  * @author Pawel Biniek <pawel.biniek@nokia.com>
  * @since February 2019
  */
-internal object FileConfigurationReaderTest : Spek({
-    describe("A configuration loader utility") {
-        val cut = FileConfigurationReader()
+internal object JsonConfigurationParserTest : Spek({
+    describe("A configuration parser utility") {
+        val cut = JsonConfigurationParser()
 
-        describe("partial configuration loading") {
+        describe("partial configuration parsing") {
             it("parses enumerations") {
                 val input = """{"logLevel":"ERROR"}"""
 
-                val config = cut.loadConfig(StringReader(input))
+                val config = cut.parse(StringReader(input))
                 assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR))
             }
 
@@ -53,16 +53,16 @@ internal object FileConfigurationReaderTest : Spek({
                     "cbs.firstRequestDelaySec": 10
                 }
                 """.trimIndent()
-                val config = cut.loadConfig(StringReader(input))
+                val config = cut.parse(StringReader(input))
                 assertThat(config.listenPort).isEqualTo(Some(12003))
-                assertThat(config.firstRequestDelaySec).isEqualTo(Some(Duration.ofSeconds(10)))
+                assertThat(config.firstRequestDelaySec).isEqualTo(Some(10L))
             }
 
             it("parses disabled security configuration") {
                 val input = """{
                     "security.sslDisable": true
                 }""".trimIndent()
-                val config = cut.loadConfig(StringReader(input))
+                val config = cut.parse(StringReader(input))
 
                 assertThat(config.sslDisable.getOrElse { fail("Should be Some") }).isTrue()
             }
@@ -71,26 +71,26 @@ internal object FileConfigurationReaderTest : Spek({
                 val input = """{
                     "logLevel": something
                 }""".trimMargin()
-                val config = cut.loadConfig(input.reader())
+                val config = cut.parse(input.reader())
 
                 assertThat(config.logLevel.isEmpty())
             }
         }
 
-        describe("complete file loading") {
-            it("loads actual file") {
-                val config = cut.loadConfig(
+        describe("complete json parsing") {
+            it("parses actual json") {
+                val config = cut.parse(
                         javaClass.resourceAsStream("/sampleConfig.json"))
 
                 assertThat(config).isNotNull
                 assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR))
 
                 assertThat(config.listenPort).isEqualTo(Some(6000))
-                assertThat(config.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+                assertThat(config.idleTimeoutSec).isEqualTo(Some(1200L))
                 assertThat(config.maxPayloadSizeBytes).isEqualTo(Some(1048576))
 
-                assertThat(config.firstRequestDelaySec).isEqualTo(Some(Duration.ofSeconds(7)))
-                assertThat(config.requestIntervalSec).isEqualTo(Some(Duration.ofSeconds(900)))
+                assertThat(config.firstRequestDelaySec).isEqualTo(Some(7L))
+                assertThat(config.requestIntervalSec).isEqualTo(Some(900L))
 
                 assertThat(config.sslDisable).isEqualTo(Some(false))
                 assertThat(config.keyStoreFile).isEqualTo(Some("test.ks.pkcs12"))
@@ -101,4 +101,3 @@ internal object FileConfigurationReaderTest : Spek({
         }
     }
 })
-
diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityKeysPaths.kt b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityKeysPaths.kt
deleted file mode 100644 (file)
index 21929b0..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.ssl.boundary
-
-import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys
-import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore
-import org.onap.dcaegen2.services.sdk.security.ssl.Passwords
-import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
-import java.nio.file.Path
-
-data class SecurityKeysPaths(
-        val keyStore: Path,
-        val keyStorePassword: String,
-        val trustStore: Path,
-        val trustStorePassword: String
-) {
-    fun asImmutableSecurityKeys(): SecurityKeys = ImmutableSecurityKeys.builder()
-            .keyStore(ImmutableSecurityKeysStore.of(keyStore))
-            .keyStorePassword(Passwords.fromString(keyStorePassword))
-            .trustStore(ImmutableSecurityKeysStore.of(trustStore))
-            .trustStorePassword(Passwords.fromString(trustStorePassword))
-            .build()
-
-    override fun toString(): String {
-        return "SecurityKeysPaths(keyStore='$keyStore', " +
-                "keyStorePassword=<password>, " +
-                "trustStore='$trustStore', " +
-                "trustStorePassword=<password>)"
-    }
-
-}
index 5053cf0..9dc8c9a 100644 (file)
             <artifactId>guava</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <optional>true</optional>
+        </dependency>
         <dependency>
             <groupId>javax.json</groupId>
             <artifactId>javax.json-api</artifactId>
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.config.impl.gsonadapters
+package org.onap.dcae.collectors.veshv.utils
 
-import com.google.gson.JsonDeserializationContext
-import com.google.gson.JsonDeserializer
 import com.google.gson.JsonElement
-import java.lang.reflect.Type
-import java.time.Duration
 
-/**
- * @author Pawel Biniek <pawel.biniek@nokia.com>
- * @since March 2019
- */
-internal class DurationOfSecondsAdapter : JsonDeserializer<Duration> {
-    override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext) =
-        Duration.ofSeconds(json.asLong)
-
-}
+fun JsonElement.reader() = toString().reader()