Use CBS by means of SDK in place of Consul 93/79393/12
authorFilip Krzywka <filip.krzywka@nokia.com>
Thu, 28 Feb 2019 16:33:02 +0000 (17:33 +0100)
committerFilip Krzywka <filip.krzywka@nokia.com>
Mon, 11 Mar 2019 13:51:58 +0000 (14:51 +0100)
- changed IO creation in main to fix error with too early calling
changeState method on collector HealthState
- increased coverage a little with few tests
- corrected coverage-report pom file to include new modules
- temporarily changed to 1.1.4-SNAPSHOT version of sdk due to need
of new version of CBSLookup

Change-Id: Ic73b46cf881ab4fabf52bef0327b09082aa90dc6
Issue-ID: DCAEGEN2-1302
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
24 files changed:
build/hv-collector-coverage/pom.xml
development/consul.d/cbs.json [new file with mode: 0644]
development/docker-compose.yml
pom.xml
sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
sources/hv-collector-core/pom.xml
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt [new file with mode: 0644]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt [deleted file]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt [moved from sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt with 62% similarity]
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt [new file with mode: 0644]
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt [new file with mode: 0644]
sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityUtilsTest.kt [new file with mode: 0644]
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt

index 08fc5a2..31ff78f 100644 (file)
@@ -3,7 +3,7 @@
   ~ ============LICENSE_START=======================================================
   ~ dcaegen2-collectors-veshv
   ~ ================================================================================
-  ~ Copyright (C) 2018 NOKIA
+  ~ Copyright (C) 2018-2019 NOKIA
   ~ ================================================================================
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
diff --git a/development/consul.d/cbs.json b/development/consul.d/cbs.json
new file mode 100644 (file)
index 0000000..0761c7e
--- /dev/null
@@ -0,0 +1,10 @@
+{
+  "service": {
+    "name": "cbs",
+    "tags": [
+      "cbs"
+    ],
+    "port": 10000,
+    "address": "config-binding-service"
+  }
+}
index c93100e..708c8f3 100644 (file)
@@ -8,13 +8,13 @@ services:
   message-router-zookeeper:
     image: wurstmeister/zookeeper
     ports:
-    - "2181:2181"
+      - "2181:2181"
 
   message-router-kafka:
-#    image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1
+    #    image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1
     image: wurstmeister/kafka
     ports:
-    - "9092:9092"
+      - "9092:9092"
     environment:
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
       KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181"
@@ -23,9 +23,9 @@ services:
       KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092"
       KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT"
     volumes:
-    - /var/run/docker.sock:/var/run/docker.sock
+      - /var/run/docker.sock:/var/run/docker.sock
     depends_on:
-    - message-router-zookeeper
+      - message-router-zookeeper
 
 
   #
@@ -35,23 +35,34 @@ services:
   consul-server:
     image: docker.io/consul:1.0.6
     ports:
-    - "8500:8500"
-    command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui"]
+      - "8500:8500"
+    command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui", "-config-dir=/consul/consul.d"]
+    volumes:
+      - ./consul.d/:/consul/consul.d
 
   consul-config:
     image: consul
-    depends_on:
-    - consul-server
     restart: on-failure
-    command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{
+    command: ["kv", "put", "-http-addr=http://consul-server:8500", "dcae-hv-ves-collector", '{
                                               "collector.routing": [
                                                 {
                                                   "fromDomain": "perf3gpp",
                                                   "toTopic": "HV_VES_PERF3GPP"
                                                 }
                                               ]
-                                            }']
+                                            }'
+    ]
+    depends_on:
+      - consul-server
 
+  config-binding-service:
+    image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.platform.configbinding.app-app:2.2.4
+    ports:
+      - "10000:10000"
+    environment:
+      CONSUL_HOST: "consul-server"
+    depends_on:
+      - consul-config
 
   #
   # DCAE HV VES Collector
@@ -60,30 +71,32 @@ services:
   ves-hv-collector:
     image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest
     ports:
-    - "6060:6060"
-    - "6061:6061/tcp"
+      - "6060:6060"
+      - "6061:6061/tcp"
     command: ["--listen-port", "6061",
               "--health-check-api-port", "6060",
-              "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true",
               "--kafka-bootstrap-servers", "message-router-kafka:9092",
               "--key-store-password", "onaponap",
               "--trust-store-password", "onaponap",
-              "--first-request-delay", "2",
+              "--first-request-delay", "5",
               "--log-level", "DEBUG"]
     environment:
       JAVA_OPTS:  "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml"
+      CONSUL_HOST: "consul-server"
+      CONFIG_BINDING_SERVICE: "cbs"
+      HOSTNAME: "dcae-hv-ves-collector"
     healthcheck:
       test: ./healthcheck.sh || exit 1
       interval: 10s
       timeout: 3s
       retries: 3
-      start_period: 20s
+      start_period: 15s
     depends_on:
-    - message-router-kafka
-    - consul-config
+      - message-router-kafka
+      - config-binding-service
     volumes:
-    - ./ssl/:/etc/ves-hv/
-    - ./logs:/var/log/ONAP/dcae-hv-ves-collector
+      - ./ssl/:/etc/ves-hv/
+      - ./logs:/var/log/ONAP/dcae-hv-ves-collector
 
 
   #
@@ -93,8 +106,8 @@ services:
   xnf-simulator:
     image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator
     ports:
-    - "6062:6062/tcp"
-    - "6063:6063"
+      - "6062:6062/tcp"
+      - "6063:6063"
     command: ["--listen-port", "6062",
               "--health-check-api-port", "6063",
               "--ves-host", "ves-hv-collector",
@@ -109,19 +122,19 @@ services:
       retries: 3
       start_period: 10s
     depends_on:
-    - ves-hv-collector
+      - ves-hv-collector
     volumes:
       - ./ssl/:/etc/ves-hv/
 
   dcae-app-simulator:
     image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator
     ports:
-    - "6064:6064/tcp"
+      - "6064:6064/tcp"
     command: ["--listen-port", "6064",
               "--kafka-bootstrap-servers", "message-router-kafka:9092",
               "--kafka-topics", "HV_VES_PERF3GPP"]
     depends_on:
-    - message-router-kafka
+      - message-router-kafka
 
   #
   # Monitoring
@@ -136,16 +149,16 @@ services:
   grafana:
     image: grafana/grafana
     ports:
-    - "3000:3000"
+      - "3000:3000"
     environment:
       GF_AUTH_DISABLE_LOGIN_FORM: "true"
       GF_AUTH_DISABLE_SIGNOUT_MENU: "true"
       GF_AUTH_ANONYMOUS_ENABLED: "true"
       GF_AUTH_ANONYMOUS_ORG_ROLE: "Admin"
     volumes:
-    - ./grafana/datasources:/etc/grafana/provisioning/datasources
-    - ./grafana/dashboards-providers:/etc/grafana/provisioning/dashboards
-    # defined in ./grafana/dashboards-providers/dasboard-providers.yaml
-    - ./grafana/dashboards:/var/lib/grafana/dashboards/hv-ves
+      - ./grafana/datasources:/etc/grafana/provisioning/datasources
+      - ./grafana/dashboards-providers:/etc/grafana/provisioning/dashboards
+      # defined in ./grafana/dashboards-providers/dasboard-providers.yaml
+      - ./grafana/dashboards:/var/lib/grafana/dashboards/hv-ves
 
 
diff --git a/pom.xml b/pom.xml
index 7d11f03..9322608 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@
         <build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
         <jacoco.version>0.8.2</jacoco.version>
         <detekt.version>1.0.0-RC11</detekt.version>
-        <sdk.version>1.1.3</sdk.version>
+        <sdk.version>1.1.4-SNAPSHOT</sdk.version>
 
         <!-- Protocol buffers -->
         <protobuf.version>3.6.1</protobuf.version>
                 <artifactId>ssl</artifactId>
                 <version>${sdk.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+                <artifactId>cbs-client</artifactId>
+                <version>${sdk.version}</version>
+            </dependency>
 
             <!-- Test dependencies -->
 
index 0c3f60b..3184921 100644 (file)
@@ -38,26 +38,18 @@ enum class CommandLineOption(val option: Option, val required: Boolean = false)
             .build(),
         required = true
     ),
-    CONSUL_CONFIG_URL(
-        Option.builder("c")
-            .longOpt("config-url")
-            .hasArg()
-            .desc("URL of ves configuration on consul")
-            .build(),
-        required = true
-    ),
-    CONSUL_FIRST_REQUEST_DELAY(
+    CONFIGURATION_FIRST_REQUEST_DELAY(
         Option.builder("d")
             .longOpt("first-request-delay")
             .hasArg()
-            .desc("Delay of first request to consul in seconds")
+            .desc("Delay of first request for configuration in seconds")
             .build()
     ),
-    CONSUL_REQUEST_INTERVAL(
+    CONFIGURATION_REQUEST_INTERVAL(
         Option.builder("I")
             .longOpt("request-interval")
             .hasArg()
-            .desc("Interval of consul configuration requests in seconds")
+            .desc("Interval of configuration requests in seconds")
             .build()
     ),
     VES_HV_PORT(
index 29e1ea9..c21f2ed 100644 (file)
@@ -3,7 +3,7 @@
   ~ ============LICENSE_START=======================================================
   ~ dcaegen2-collectors-veshv
   ~ ================================================================================
-  ~ Copyright (C) 2018 NOKIA
+  ~ Copyright (C) 2018-2019 NOKIA
   ~ ================================================================================
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -19,8 +19,8 @@
   ~ ============LICENSE_END=========================================================
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <licenses>
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+            <artifactId>cbs-client</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.jetbrains.kotlin</groupId>
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
         </dependency>
-        <dependency>
-            <groupId>javax.json</groupId>
-            <artifactId>javax.json-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.glassfish</groupId>
-            <artifactId>javax.json</artifactId>
-            <scope>runtime</scope>
-        </dependency>
     </dependencies>
 
 </project>
index 535d1ba..633095d 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@ import org.onap.dcae.collectors.veshv.impl.VesHvCollector
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.arrow.getOption
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import java.util.concurrent.atomic.AtomicReference
@@ -53,18 +54,19 @@ class CollectorFactory(val configuration: ConfigurationProvider,
         val config: AtomicReference<CollectorConfiguration> = AtomicReference()
         configuration()
                 .doOnNext {
-                    logger.info { "Using updated configuration for new connections" }
+                    logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
                     healthState.changeState(HealthDescription.HEALTHY)
                 }
                 .doOnError {
-                    logger.error { "Failed to acquire configuration from consul" }
+                    logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
+                    logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
                     healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
                 }
                 .subscribe(config::set)
 
         return object : CollectorProvider {
             override fun invoke(ctx: ClientContext): Option<Collector> =
-                config.getOption().map { createVesHvCollector(it, ctx) }
+                    config.getOption().map { createVesHvCollector(it, ctx) }
 
             override fun close() = sinkProvider.close()
         }
index 75b6f0a..312d6d7 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,7 +24,8 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
 import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
-import reactor.netty.http.client.HttpClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -38,8 +39,8 @@ object AdapterFactory {
             else
                 KafkaSinkProvider(kafkaConfig)
 
-    fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
-            ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
-
-    private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
+    fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+            ConfigurationProviderImpl(
+                    CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
+                    configurationProviderParams)
 }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
new file mode 100644 (file)
index 0000000..736f474
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import com.google.gson.JsonObject
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+import java.time.Duration
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>,
+                                         private val firstRequestDelay: Duration,
+                                         private val requestInterval: Duration,
+                                         private val healthState: HealthState,
+                                         retrySpec: Retry<Any>
+
+) : ConfigurationProvider {
+    constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this(
+            cbsClientMono,
+            params.firstRequestDelay,
+            params.requestInterval,
+            HealthState.INSTANCE,
+            Retry.any<Any>()
+                    .retryMax(MAX_RETRIES)
+                    .fixedBackoff(params.requestInterval)
+                    .jitter(Jitter.random())
+    )
+
+    private val retry = retrySpec.doOnRetry {
+        logger.withWarn(ServiceContext::mdc) {
+            log("Exception from configuration provider client, retrying subscription", it.exception())
+        }
+        healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+    }
+
+    override fun invoke(): Flux<CollectorConfiguration> =
+            cbsClientMono
+                    .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
+                    .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
+                    .retryWhen(retry)
+                    .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
+                    .flatMapMany(::handleUpdates)
+
+    private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient
+            .updates(RequestDiagnosticContext.create(),
+                    firstRequestDelay,
+                    requestInterval)
+            .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
+            .map(::createCollectorConfiguration)
+            .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
+            .retryWhen(retry)
+
+
+    private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
+            try {
+                val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
+                CollectorConfiguration(
+                        routing {
+                            for (route in routingArray) {
+                                val routeObj = route.asJsonObject
+                                defineRoute {
+                                    fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
+                                    toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
+                                    withFixedPartitioning()
+                                }
+                            }
+                        }.build()
+                )
+            } catch (e: NullPointerException) {
+                throw ParsingException("Failed to parse configuration", e)
+            }
+
+    private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString
+
+
+    companion object {
+        private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
+        private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
+        private const val TOPIC_CONFIGURATION_KEY = "toTopic"
+
+        private const val MAX_RETRIES = 5L
+        private val logger = Logger(ConfigurationProviderImpl::class)
+    }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
deleted file mode 100644 (file)
index d58cc79..0000000
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
-import reactor.retry.Retry
-import java.io.StringReader
-import java.security.MessageDigest
-import java.time.Duration
-import java.util.*
-import java.util.concurrent.atomic.AtomicReference
-import javax.json.Json
-import javax.json.JsonObject
-
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal class ConsulConfigurationProvider(private val http: HttpAdapter,
-                                           private val url: String,
-                                           private val firstRequestDelay: Duration,
-                                           private val requestInterval: Duration,
-                                           private val healthState: HealthState,
-                                           retrySpec: Retry<Any>
-
-) : ConfigurationProvider {
-    private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf())
-    private val retry = retrySpec.doOnRetry {
-        logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) }
-        healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
-    }
-
-    constructor(http: HttpAdapter,
-                params: ConfigurationProviderParams) : this(
-            http,
-            params.configurationUrl,
-            params.firstRequestDelay,
-            params.requestInterval,
-            HealthState.INSTANCE,
-            Retry.any<Any>()
-                    .retryMax(MAX_RETRIES)
-                    .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
-                    .jitter(Jitter.random())
-    )
-
-    override fun invoke(): Flux<CollectorConfiguration> =
-            Flux.interval(firstRequestDelay, requestInterval)
-                    .concatMap { askForConfig() }
-                    .flatMap(::filterDifferentValues)
-                    .map(::parseJsonResponse)
-                    .map(::createCollectorConfiguration)
-                    .retryWhen(retry)
-
-    private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
-        val invocationId = UUID.randomUUID()
-        http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
-    }
-
-    private fun filterDifferentValues(configuration: BodyWithInvocationId) =
-            configuration.body.let { configurationString ->
-                configurationString.sha256().let { newHash ->
-                    if (newHash contentEquals lastConfigurationHash.get()) {
-                        logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
-                            "No change detected in consul configuration"
-                        }
-                        Mono.empty()
-                    } else {
-                        logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
-                            "Obtained new configuration from consul:\n$configurationString"
-                        }
-                        lastConfigurationHash.set(newHash)
-                        Mono.just(configurationString)
-                    }
-                }
-            }
-
-    private fun parseJsonResponse(responseString: String): JsonObject =
-            Json.createReader(StringReader(responseString)).readObject()
-
-    private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
-            try {
-                val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY)
-                CollectorConfiguration(
-                        routing {
-                            for (route in routingArray) {
-                                val routeObj = route.asJsonObject()
-                                defineRoute {
-                                    fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY))
-                                    toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY))
-                                    withFixedPartitioning()
-                                }
-                            }
-                        }.build()
-                )
-            } catch (e: NullPointerException) {
-                throw ParsingException("Failed to parse consul configuration", e)
-            }
-
-
-    companion object {
-        private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
-        private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
-        private const val TOPIC_CONFIGURATION_KEY = "toTopic"
-
-        private const val MAX_RETRIES = 5L
-        private const val BACKOFF_INTERVAL_FACTOR = 30L
-        private val logger = Logger(ConsulConfigurationProvider::class)
-        private fun String.sha256() =
-                MessageDigest
-                        .getInstance("SHA-256")
-                        .digest(toByteArray())
-
-    }
-
-    private data class BodyWithInvocationId(val body: String, val invocationId: UUID)
-}
index 91f502e..a1e5b8f 100644 (file)
@@ -78,4 +78,4 @@ internal fun populateClientContextFromInbound(clientContext: ClientContext, nett
         withConnectionFrom(nettyInbound) { connection ->
             clientContext.clientAddress = Try { connection.address().address }.toOption()
             clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() }
-        }
\ No newline at end of file
+        }
index 9de3449..ac7a9db 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,6 +25,5 @@ import java.time.Duration
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since July 2018
  */
-data class ConfigurationProviderParams(val configurationUrl: String,
-                                       val firstRequestDelay: Duration,
+data class ConfigurationProviderParams(val firstRequestDelay: Duration,
                                        val requestInterval: Duration)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.impl.adapters
 
+import com.google.gson.JsonParser
 import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.eq
 import com.nhaarman.mockitokotlin2.mock
@@ -29,11 +30,12 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.mockito.Mockito
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import reactor.core.publisher.Flux
 
 import reactor.core.publisher.Mono
 import reactor.retry.Retry
@@ -44,24 +46,36 @@ import java.time.Duration
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since May 2018
  */
-internal object ConsulConfigurationProviderTest : Spek({
+internal object ConfigurationProviderImplTest : Spek({
 
-    describe("Consul configuration provider") {
+    describe("Configuration provider") {
 
-        val httpAdapterMock: HttpAdapter = mock()
+        val cbsClient: CbsClient = mock()
+        val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
         val healthStateProvider = HealthState.INSTANCE
 
-        given("valid resource url") {
-            val validUrl = "http://valid-url/"
-            val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
+        given("configuration is never in cbs") {
+            val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
 
-            on("call to consul") {
-                whenever(httpAdapterMock.get(eq(validUrl), any(), Mockito.anyMap()))
-                        .thenReturn(Mono.just(constructConsulResponse()))
+            on("waiting for configuration") {
+                val waitTime = Duration.ofMillis(100)
 
+                it("should not get it") {
+                    StepVerifier.create(configProvider().take(1))
+                            .expectNoEvent(waitTime)
+                }
+            }
+
+        }
+        given("valid configuration from cbs") {
+            val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
+
+            on("new configuration") {
+                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+                        .thenReturn(Flux.just(validConfiguration))
                 it("should use received configuration") {
 
-                    StepVerifier.create(consulConfigProvider().take(1))
+                    StepVerifier.create(configProvider().take(1))
                             .consumeNextWith {
 
                                 val route1 = it.routing.routes[0]
@@ -85,22 +99,19 @@ internal object ConsulConfigurationProviderTest : Spek({
             }
 
         }
-        given("invalid resource url") {
-            val invalidUrl = "http://invalid-url/"
-
+        given("invalid configuration from cbs") {
             val iterationCount = 3L
-            val consulConfigProvider = constructConsulConfigProvider(
-                    invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
+            val configProvider = constructConfigurationProvider(
+                    cbsClientMock, healthStateProvider, iterationCount
             )
 
-            on("call to consul") {
-                whenever(httpAdapterMock.get(eq(invalidUrl), any(), Mockito.anyMap()))
-                        .thenReturn(Mono.error(RuntimeException("Test exception")))
+            on("new configuration") {
+                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+                        .thenReturn(Flux.just(invalidConfiguration))
 
                 it("should interrupt the flux") {
-
-                    StepVerifier.create(consulConfigProvider())
-                            .verifyErrorMessage("Test exception")
+                    StepVerifier.create(configProvider())
+                            .verifyError()
                 }
 
                 it("should update the health state") {
@@ -115,28 +126,9 @@ internal object ConsulConfigurationProviderTest : Spek({
 
 })
 
-private fun constructConsulConfigProvider(url: String,
-                                          httpAdapter: HttpAdapter,
-                                          healthState: HealthState,
-                                          iterationCount: Long = 1
-): ConsulConfigurationProvider {
-
-    val firstRequestDelay = Duration.ofMillis(1)
-    val requestInterval = Duration.ofMillis(1)
-    val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
 
-    return ConsulConfigurationProvider(
-            httpAdapter,
-            url,
-            firstRequestDelay,
-            requestInterval,
-            healthState,
-            retry
-    )
-}
-
-fun constructConsulResponse(): String =
-    """{
+private val validConfiguration = JsonParser().parse("""
+{
     "whatever": "garbage",
     "collector.routing": [
             {
@@ -148,4 +140,34 @@ fun constructConsulResponse(): String =
                 "toTopic": "test-topic-2"
             }
     ]
-    }"""
+}""").asJsonObject
+
+private val invalidConfiguration = JsonParser().parse("""
+{
+    "whatever": "garbage",
+    "collector.routing": [
+            {
+                "fromDomain": "garbage",
+                "meaningful": "garbage"
+            }
+    ]
+}""").asJsonObject
+
+private val firstRequestDelay = Duration.ofMillis(1)
+private val requestInterval = Duration.ofMillis(1)
+
+private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
+                                           healthState: HealthState,
+                                           iterationCount: Long = 1
+): ConfigurationProviderImpl {
+
+    val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+
+    return ConfigurationProviderImpl(
+            cbsClientMono,
+            firstRequestDelay,
+            requestInterval,
+            healthState,
+            retry
+    )
+}
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt
new file mode 100644 (file)
index 0000000..63caaf0
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import com.google.protobuf.MessageLite
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
+
+
+class ProtobufSerializerTest : Spek({
+
+    describe("ProtobufSerializerTest") {
+        val serializer = ProtobufSerializer()
+
+        on("serialize") {
+            it("should return byte array from WTP Frame paylaod") {
+                val header = getDefaultInstance()
+                val payload = header.toByteArray()
+                val msg: MessageLite = mock()
+
+                serializer.serialize("", msg)
+
+                verify(msg).toByteArray()
+            }
+        }
+
+        on("configuring") {
+            it("should do nothing") {
+                // increase code coverage
+                serializer.configure(hashMapOf<String, String>(), false)
+            }
+        }
+
+        on("closing") {
+            it("should do nothing") {
+                // increase code coverage
+                serializer.close()
+            }
+        }
+    }
+
+
+})
\ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt
new file mode 100644 (file)
index 0000000..3a194b4
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
+
+
+class VesMessageSerializerTest : Spek({
+
+    describe("VesMessageSerializer") {
+        val serializer = VesMessageSerializer()
+
+        on("serialize") {
+            it("should return byte array from WTP Frame paylaod") {
+                val header = getDefaultInstance()
+                val payload = header.toByteArray()
+                val msg = VesMessage(header, WireFrameMessage(payload))
+
+                val serialized = serializer.serialize("", msg)
+
+                assertThat(serialized).isEqualTo(payload)
+            }
+        }
+
+        on("configuring") {
+          it("should do nothing") {
+                // increase code coverage
+              serializer.configure(hashMapOf<String, String>(), false)
+          }
+        }
+
+        on("closing") {
+            it("should do nothing") {
+                // increase code coverage
+                serializer.close()
+            }
+        }
+    }
+
+
+
+})
\ No newline at end of file
index 3248600..fb5bb9a 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,7 +29,6 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
-import reactor.netty.DisposableServer
 import reactor.netty.http.server.HttpServer
 import reactor.netty.http.server.HttpServerRequest
 import reactor.netty.http.server.HttpServerResponse
index 99ec5e1..bb484cf 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,9 +27,8 @@ import arrow.typeclasses.binding
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_CONFIG_URL
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_REQUEST_INTERVAL
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FIRST_REQUEST_DELAY
 import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE
 import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
 import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
@@ -64,9 +63,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
             KAFKA_SERVERS,
             HEALTH_CHECK_API_PORT,
             LISTEN_PORT,
-            CONSUL_CONFIG_URL,
-            CONSUL_FIRST_REQUEST_DELAY,
-            CONSUL_REQUEST_INTERVAL,
+            CONFIGURATION_FIRST_REQUEST_DELAY,
+            CONFIGURATION_REQUEST_INTERVAL,
             SSL_DISABLE,
             KEY_STORE_FILE,
             KEY_STORE_PASSWORD,
@@ -117,17 +115,15 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
 
     private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
             Option.monad().binding {
-                val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
                 val firstRequestDelay = cmdLine.longValue(
-                        CONSUL_FIRST_REQUEST_DELAY,
-                        DefaultValues.CONSUL_FIRST_REQUEST_DELAY
+                        CONFIGURATION_FIRST_REQUEST_DELAY,
+                        DefaultValues.CONFIGURATION_FIRST_REQUEST_DELAY
                 )
                 val requestInterval = cmdLine.longValue(
-                        CONSUL_REQUEST_INTERVAL,
-                        DefaultValues.CONSUL_REQUEST_INTERVAL
+                        CONFIGURATION_REQUEST_INTERVAL,
+                        DefaultValues.CONFIGURATION_REQUEST_INTERVAL
                 )
                 ConfigurationProviderParams(
-                        configUrl,
                         Duration.ofSeconds(firstRequestDelay),
                         Duration.ofSeconds(requestInterval)
                 )
@@ -145,8 +141,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
 
     internal object DefaultValues {
         const val HEALTH_CHECK_API_PORT = 6060
-        const val CONSUL_FIRST_REQUEST_DELAY = 10L
-        const val CONSUL_REQUEST_INTERVAL = 5L
+        const val CONFIGURATION_FIRST_REQUEST_DELAY = 10L
+        const val CONFIGURATION_REQUEST_INTERVAL = 5L
         const val IDLE_TIMEOUT_SEC = 60L
         const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
         val LOG_LEVEL = LogLevel.INFO.name
index f00b9ee..d21b490 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.main
 import arrow.effects.IO
 import arrow.effects.fix
 import arrow.effects.instances.io.monad.monad
-import arrow.effects.instances.io.monadError.monadError
 import arrow.typeclasses.binding
 import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
@@ -51,23 +50,25 @@ fun main(args: Array<String>) =
                             logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
                             ExitFailure(1)
                         },
-                        { logger.debug(ServiceContext::mdc) { "Finished" } }
+                        { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } }
                 )
 
 private fun startAndAwaitServers(config: ServerConfiguration) =
         IO.monad().binding {
             Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
             logger.info { "Using configuration: $config" }
+
             val healthCheckServerHandle = HealthCheckServer.start(config).bind()
-            VesServer.start(config).bind().let { handle ->
-                registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind()
-                handle.await().bind()
-            }
-        }.fix()
+            val hvVesHandle = VesServer.start(config).bind()
 
-internal fun closeServers(vararg handles: ServerHandle, healthState: HealthState = HealthState.INSTANCE): IO<Unit> =
-        IO.monadError().binding {
-            healthState.changeState(HealthDescription.SHUTTING_DOWN)
-            Closeable.closeAll(handles.asIterable()).bind()
-            logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
+            registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle))
+            hvVesHandle.await().bind()
         }.fix()
+
+internal fun closeServers(vararg handles: ServerHandle,
+                          healthState: HealthState = HealthState.INSTANCE) = {
+    logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
+    healthState.changeState(HealthDescription.SHUTTING_DOWN)
+    Closeable.closeAll(handles.asIterable()).unsafeRunSync()
+    logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
+}
index 4e2e6d8..62c2430 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -37,7 +37,7 @@ object VesServer : ServerStarter() {
 
     private fun createVesServer(config: ServerConfiguration): Server {
         val collectorProvider = CollectorFactory(
-                AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
+                AdapterFactory.configurationProvider(config.configurationProviderParams),
                 AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration),
                 MicrometerMetrics.INSTANCE,
                 config.maximumPayloadSizeBytes
index da2bfb4..6d21910 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -42,7 +42,6 @@ object ArgVesHvConfigurationTest : Spek({
     lateinit var cut: ArgVesHvConfiguration
     val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666"
     val healthCheckApiPort = "6070"
-    val configurationUrl = "http://test-address/test"
     val firstRequestDelay = "10"
     val requestInterval = "5"
     val listenPort = "6969"
@@ -63,7 +62,6 @@ object ArgVesHvConfigurationTest : Spek({
                         "--kafka-bootstrap-servers", kafkaBootstrapServers,
                         "--health-check-api-port", healthCheckApiPort,
                         "--listen-port", listenPort,
-                        "--config-url", configurationUrl,
                         "--first-request-delay", firstRequestDelay,
                         "--request-interval", requestInterval,
                         "--key-store", "/tmp/keys.p12",
@@ -95,21 +93,16 @@ object ArgVesHvConfigurationTest : Spek({
                 assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
             }
 
-            it("should set proper first consul request delay") {
+            it("should set proper first request delay") {
                 assertThat(result.configurationProviderParams.firstRequestDelay)
                         .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
             }
 
-            it("should set proper consul request interval") {
+            it("should set proper request interval") {
                 assertThat(result.configurationProviderParams.requestInterval)
                         .isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
             }
 
-            it("should set proper config url") {
-                assertThat(result.configurationProviderParams.configurationUrl)
-                        .isEqualTo(configurationUrl)
-            }
-
             it("should set proper security configuration") {
                 assertThat(result.securityConfiguration.keys.isEmpty()).isFalse()
 
@@ -135,7 +128,6 @@ object ArgVesHvConfigurationTest : Spek({
                 it("should throw exception") {
                     assertThat(
                             cut.parseExpectingFailure(
-                                    "--config-url", configurationUrl,
                                     "--ssl-disable",
                                     "--first-request-delay", firstRequestDelay,
                                     "--request-interval", requestInterval
@@ -164,7 +156,6 @@ object ArgVesHvConfigurationTest : Spek({
                             "--kafka-bootstrap-servers", kafkaBootstrapServers,
                             "--health-check-api-port", healthCheckApiPort,
                             "--listen-port", listenPort,
-                            "--config-url", configurationUrl,
                             "--first-request-delay", firstRequestDelay,
                             "--request-interval", requestInterval,
                             "--key-store", "/tmp/keys.p12",
@@ -183,7 +174,6 @@ object ArgVesHvConfigurationTest : Spek({
                             "--kafka-bootstrap-servers", kafkaBootstrapServers,
                             "--health-check-api-port", healthCheckApiPort,
                             "--listen-port", listenPort,
-                            "--config-url", configurationUrl,
                             "--first-request-delay", firstRequestDelay,
                             "--request-interval", requestInterval,
                             "--key-store", "/tmp/keys.p12",
index e032f00..e18b0b1 100644 (file)
@@ -51,7 +51,7 @@ internal object MainTest : Spek({
             val healthState: HealthState = mock()
 
             on("closeServers") {
-                closeServers(handle, healthState = healthState).unsafeRunSync()
+                closeServers(handle, healthState = healthState).invoke()
 
                 it("should close all handles") {
                     assertThat(closed).isTrue()
diff --git a/sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityUtilsTest.kt b/sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityUtilsTest.kt
new file mode 100644 (file)
index 0000000..ddb3e35
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ssl.boundary
+
+import com.nhaarman.mockitokotlin2.doReturn
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
+import org.apache.commons.cli.CommandLine
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption
+import org.onap.dcae.collectors.veshv.commandline.hasOption
+
+
+internal object SecurityUtilsTest : Spek({
+
+    describe("creating securty configuration provider") {
+
+        on("command line without ssl disable") {
+            val commandLine: CommandLine = mock()
+            whenever(commandLine.hasOption(CommandLineOption.SSL_DISABLE)).doReturn(false)
+
+            it("should create configuration with some keys") {
+                val configuration = createSecurityConfiguration(commandLine)
+
+                verify(commandLine).hasOption(CommandLineOption.SSL_DISABLE)
+                assertThat(configuration.isSuccess()).isTrue()
+                configuration.map { assertThat(it.keys.isDefined()).isTrue() }
+            }
+        }
+        on("command line with ssl disabled") {
+            val commandLine: CommandLine = mock()
+            whenever(commandLine.hasOption(CommandLineOption.SSL_DISABLE)).doReturn(true)
+
+            it("should create configuration without keys") {
+                val configuration = createSecurityConfiguration(commandLine)
+
+                verify(commandLine).hasOption(CommandLineOption.SSL_DISABLE)
+                assertThat(configuration.isSuccess()).isTrue()
+                configuration.map { assertThat(it.keys.isEmpty()).isTrue() }
+            }
+        }
+    }
+})
index 99ecfd7..7d92dda 100644 (file)
@@ -72,3 +72,19 @@ fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
                 Mono.just<T>(t)
             })
         }
+
+
+fun <T> Mono<T>.onErrorLog(logger: Logger,
+                           mdc: () -> Map<String, String>,
+                           msg: () -> String) =
+        doOnError { logException(logger, mdc, msg, it) }
+
+fun <T> Flux<T>.onErrorLog(logger: Logger,
+                           mdc: () -> Map<String, String>,
+                           msg: () -> String) =
+        doOnError { logException(logger, mdc, msg, it) }
+
+private fun logException(logger: Logger, mdc: () -> Map<String, String>, msg: () -> String, it: Throwable) {
+    logger.error(mdc) { "${msg()}: ${it.message}" }
+    logger.debug(mdc) { "Detailed stack trace: ${it}" }
+}
index 2678a8d..87aea41 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.utils
 
-import arrow.effects.IO
-
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since January 2019
  */
-
-fun registerShutdownHook(job: () -> Unit) {
-    Runtime.getRuntime().addShutdownHook(object : Thread() {
-        override fun run() {
-            job()
-        }
-    })
-}
-
-fun registerShutdownHook(job: IO<Unit>) = IO {
-    registerShutdownHook {
-        job.unsafeRunSync()
-    }
-}
+fun registerShutdownHook(job: () -> Unit) =
+        Runtime.getRuntime()
+                .addShutdownHook(Thread({ job() }, "GracefulShutdownThread"))