Various improvements 99/58599/2
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 14 Jun 2018 07:48:46 +0000 (09:48 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 07:49:02 +0000 (09:49 +0200)
* Kotlin upgrade
* Monad usage on APIs
* Idle timeout
* Simulator enhancements

Closes ONAP-390

Change-Id: I3c00fcfe38c722caf661ddaad428cf089eeefcaa
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

43 files changed:
development.sh [new file with mode: 0755]
docker-compose.yml
hv-collector-client-simulator/Dockerfile
hv-collector-client-simulator/pom.xml
hv-collector-client-simulator/sample-request.json [new file with mode: 0644]
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt
hv-collector-core/pom.xml
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
hv-collector-dcae-app-simulator/Dockerfile
hv-collector-dcae-app-simulator/pom.xml
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt
hv-collector-domain/pom.xml
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
hv-collector-main/src/main/resources/logback.xml
hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
hv-collector-utils/pom.xml
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/WrongArgumentException.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt [new file with mode: 0644]
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
pom.xml

diff --git a/development.sh b/development.sh
new file mode 100755 (executable)
index 0000000..9450007
--- /dev/null
@@ -0,0 +1,66 @@
+#!/bin/bash
+
+# Usage: source ./development.sh and use functions defined here
+# https://httpie.org/ is required for API calls
+
+export MAVEN_OPTS="-T1C"
+
+function veshv_full_rebuild() {
+  mvn clean install -Panalysis ${MAVEN_OPTS}
+}
+
+function veshv_rebuild() {
+  mvn clean install ${MAVEN_OPTS}
+}
+
+function veshv_build() {
+  mvn install ${MAVEN_OPTS}
+}
+
+function veshv_fast_build() {
+  mvn install -DskipTests ${MAVEN_OPTS}
+}
+
+function veshv_docker_start() {
+  docker-compose down
+  docker-compose rm -f
+  docker-compose up
+}
+
+function veshv_docker_clean() {
+  docker volume prune
+}
+
+function veshv_build_and_start() {
+  veshv_fast_build && veshv_docker_start
+}
+
+function veshv_fresh_restart() {
+  docker-compose down
+  docker-compose rm -f
+  veshv_docker_clean
+  veshv_fast_build && docker-compose up
+}
+
+function veshv_simul_dcaeapp_count() {
+  http --json GET http://localhost:8100/messages/count
+}
+
+function veshv_simul_dcaeapp_last_key() {
+  http --json GET http://localhost:8100/messages/last/key
+}
+
+function veshv_simul_dcaeapp_last_value() {
+  http --json GET http://localhost:8100/messages/last/value
+}
+
+function veshv_simul_client() {
+  # feed me with json file using "<"
+  http --json POST http://localhost:8000/simulator/sync
+}
+
+function veshv_simul_client_async() {
+  # feed me with json file using "<"
+  http --json POST http://localhost:8000/simulator/async
+}
+
index 8db767c..65951ed 100644 (file)
@@ -1,9 +1,11 @@
 version: "2"
 services:
+
   zookeeper:
     image: wurstmeister/zookeeper
     ports:
       - "2181:2181"
+
   kafka:
     image: wurstmeister/kafka
     ports:
@@ -17,29 +19,37 @@ services:
       - /var/run/docker.sock:/var/run/docker.sock
     depends_on:
       - zookeeper
-  veshvcollector:
-    build:
-      context: hv-collector-main
-      dockerfile: Dockerfile
+
+  ves-hv-collector:
+    image: onap/ves-hv-collector
+#    build:
+#      context: hv-collector-main
+#      dockerfile: Dockerfile
     ports:
       - "6061:6061/tcp"
     depends_on:
       - kafka
     volumes:
       - ./ssl/:/etc/ves-hv/
-  xnfsimulator:
-    build:
-      context: hv-collector-client-simulator
-      dockerfile: Dockerfile
+
+  xnf-simulator:
+    image: onap/ves-hv-collector-client-simulator
+#    build:
+#      context: hv-collector-client-simulator
+#      dockerfile: Dockerfile
+    ports:
+      - "8000:5000/tcp"
     depends_on:
-      - veshvcollector
+      - ves-hv-collector
     volumes:
       - ./ssl/:/etc/ves-hv/
+
   dcae-app-simulator:
-    build:
-      context: hv-collector-dcae-app-simulator
-      dockerfile: Dockerfile
+    image: onap/ves-hv-collector-dcae-simulator
+#    build:
+#      context: hv-collector-dcae-app-simulator
+#      dockerfile: Dockerfile
     ports:
-      - "8080:8080/tcp"
+      - "8100:5000/tcp"
     depends_on:
       - kafka
index 58cfa44..7d12c49 100644 (file)
@@ -5,9 +5,11 @@ LABEL license.name="The Apache Software License, Version 2.0"
 LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
 LABEL maintainer="Nokia Wroclaw ONAP Team"
 
+EXPOSE 5000
+
 WORKDIR /opt/ves-hv-client-simulator
 ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"]
-CMD ["--ves-host", "veshvcollector", "--ves-port", "6061"]
+CMD ["--ves-host", "ves-hv-collector", "--ves-port", "6061"]
 COPY target/libs/external/* ./
 COPY target/libs/internal/* ./
 COPY target/hv-collector-client-simulator-*.jar ./
index c1c1f2e..8cfe0a4 100644 (file)
@@ -19,8 +19,8 @@
   ~ ============LICENSE_END=========================================================
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <licenses>
             <artifactId>hv-collector-utils</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-effects</artifactId>
+        </dependency>
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
diff --git a/hv-collector-client-simulator/sample-request.json b/hv-collector-client-simulator/sample-request.json
new file mode 100644 (file)
index 0000000..ca8bd88
--- /dev/null
@@ -0,0 +1,20 @@
+{ 
+      "commonEventHeader": {
+                "version": "sample-version",
+                "domain": 10,
+                "sequence": 1,
+                "priority": 1,
+                "eventId": "sample-event-id",
+                "eventName": "sample-event-name",
+                "eventType": "sample-event-type",
+                "startEpochMicrosec": 120034455,
+                "lastEpochMicrosec": 120034455,
+                "nfNamingCode": "sample-nf-naming-code",
+                "nfcNamingCode": "sample-nfc-naming-code",
+                "reportingEntityId": "sample-reporting-entity-id",
+                "reportingEntityName": "sample-reporting-entity-name",
+                "sourceId": "sample-source-id",
+                "sourceName": "sample-source-name"
+        },
+        "messagesAmount": 25000
+}
index b8a4b88..f29b693 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf.config
 
-import org.apache.commons.cli.DefaultParser
 import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MESSAGES_TO_SEND_AMOUNT
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
 
 
 /**
@@ -40,6 +40,8 @@ internal object DefaultValues {
     const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key"
     const val CERT_FILE = "/etc/ves-hv/client.crt"
     const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt"
+    const val VES_HV_PORT = 6061
+    const val VES_HV_HOST = "veshvcollector"
 }
 
 internal class ArgBasedClientConfiguration : ArgBasedConfiguration<ClientConfiguration>(DefaultParser()) {
@@ -53,8 +55,8 @@ internal class ArgBasedClientConfiguration : ArgBasedConfiguration<ClientConfigu
     )
 
     override fun getConfiguration(cmdLine: CommandLine): ClientConfiguration {
-        val host = cmdLine.stringValue(VES_HV_HOST)
-        val port = cmdLine.intValue(VES_HV_PORT)
+        val host = cmdLine.stringValue(VES_HV_HOST, DefaultValues.VES_HV_HOST)
+        val port = cmdLine.intValue(VES_HV_PORT, DefaultValues.VES_HV_PORT)
         val messagesAmount = cmdLine.longValue(MESSAGES_TO_SEND_AMOUNT, DefaultValues.MESSAGES_AMOUNT)
         return ClientConfiguration(
                 host,
index bc7db86..3f872b5 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.domain.WireFrame
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import ratpack.exec.Promise
 import ratpack.handling.Chain
 import ratpack.handling.Context
 import ratpack.server.RatpackServer
 import ratpack.server.ServerConfig
-import reactor.core.publisher.Mono
+import reactor.core.publisher.Flux
+import reactor.core.scheduler.Schedulers
 import javax.json.Json
 import javax.json.JsonObject
 
@@ -35,30 +39,46 @@ import javax.json.JsonObject
  */
 class HttpServer(private val vesClient: VesHvClient) {
 
-    fun start(port: Int = DEFAULT_PORT): Mono<RatpackServer> = Mono.fromCallable {
-        RatpackServer.of {
-            it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers)
+    fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO {
+        RatpackServer.start { server ->
+            server.serverConfig(ServerConfig.embedded().port(port))
+                    .handlers(this::configureHandlers)
         }
-    }.doOnNext { it.start() }
+    }
 
 
     private fun configureHandlers(chain: Chain) {
-        chain.post("simulator") { ctx ->
-            ctx.request.body
-                    .map { Json.createReader(it.inputStream).readObject() }
-                    .map { extractMessageParameters(it) }
-                    .map { MessageFactory.INSTANCE.createMessageFlux(it) }
-                    .onError { handleException(it, ctx) }
-                    .then {
-                        vesClient.send(it)
-                        ctx.response
-                                .status(STATUS_OK)
-                                .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
-                                        .add("response", "Request accepted")
-                                        .build()
-                                        .toString())
-                    }
-        }
+        chain
+                .post("simulator/sync") { ctx ->
+                    createMessageFlux(ctx)
+                            .map { vesClient.sendIo(it) }
+                            .map { it.unsafeRunSync() }
+                            .onError { handleException(it, ctx) }
+                            .then { sendAcceptedResponse(ctx) }
+                }
+                .post("simulator/async") { ctx ->
+                    createMessageFlux(ctx)
+                            .map { vesClient.sendRx(it) }
+                            .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
+                            .onError { handleException(it, ctx) }
+                            .then { sendAcceptedResponse(ctx) }
+                }
+    }
+
+    private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> {
+        return ctx.request.body
+                .map { Json.createReader(it.inputStream).readObject() }
+                .map { extractMessageParameters(it) }
+                .map { MessageFactory.INSTANCE.createMessageFlux(it) }
+    }
+
+    private fun sendAcceptedResponse(ctx: Context) {
+        ctx.response
+                .status(STATUS_OK)
+                .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
+                        .add("response", "Request accepted")
+                        .build()
+                        .toString())
     }
 
     private fun handleException(t: Throwable, ctx: Context) {
index 78a72d9..be351b5 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
+import arrow.effects.IO
 import io.netty.buffer.Unpooled
 import io.netty.handler.ssl.ClientAuth
 import io.netty.handler.ssl.SslContext
 import io.netty.handler.ssl.SslContextBuilder
 import io.netty.handler.ssl.SslProvider
-import org.onap.dcae.collectors.veshv.domain.WireFrame
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.domain.WireFrame
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.reactivestreams.Publisher
+import reactor.core.publisher.EmitterProcessor
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import reactor.core.publisher.ReplayProcessor
 import reactor.ipc.netty.NettyOutbound
 import reactor.ipc.netty.tcp.TcpClient
 
@@ -50,28 +53,58 @@ class VesHvClient(private val configuration: ClientConfiguration) {
             }
             .build()
 
-    fun send(messages: Flux<WireFrame>) =
-            client
-                    .newHandler { _, out -> handler(out, messages) }
-                    .doOnError{logger.info("Failed to connect to VesHvCollector on " +
-                            "${configuration.vesHost}:${configuration.vesPort}")}
-                    .subscribe { logger.info("Connected to VesHvCollector on " +
-                            "${configuration.vesHost}:${configuration.vesPort}") }
+    fun sendIo(messages: Flux<WireFrame>) = IO<Unit> {
+        sendRx(messages).block()
+    }
 
+    fun sendRx(messages: Flux<WireFrame>): Mono<Void> {
+        val complete = ReplayProcessor.create<Void>(1)
+        client
+                .newHandler { _, output -> handler(complete, messages, output) }
+                .doOnError {
+                    logger.info("Failed to connect to VesHvCollector on " +
+                            "${configuration.vesHost}:${configuration.vesPort}")
+                }
+                .subscribe {
+                    logger.info("Connected to VesHvCollector on " +
+                            "${configuration.vesHost}:${configuration.vesPort}")
+                }
+        return complete.then()
+    }
 
-    private fun handler(nettyOutbound: NettyOutbound, messages: Flux<WireFrame>): Publisher<Void> {
+    private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound):
+            Publisher<Void> {
+        val encoder = WireFrameEncoder(nettyOutbound.alloc())
+        val context = nettyOutbound.context()
 
+        context.onClose {
+            logger.info { "Connection to ${context.address()} has been closed" }
+        }
 
-        val encoder = WireFrameEncoder(nettyOutbound.alloc())
+        // TODO: Close channel after all messages have been sent
+        // The code bellow doesn't work because it closes the channel earlier and not all are consumed...
+//        complete.subscribe {
+//            context.channel().disconnect().addListener {
+//                if (it.isSuccess)
+//                    logger.info { "Connection closed" }
+//                else
+//                    logger.warn("Failed to close the connection", it.cause())
+//            }
+//        }
 
         val frames = messages
                 .map(encoder::encode)
-                .concatWith(Mono.just(Unpooled.EMPTY_BUFFER))
+                .window(MAX_BATCH_SIZE)
 
         return nettyOutbound
-                .options { it.flushOnEach() }
-                .send(frames)
-                .then { logger.info("Messages have been sent") }
+                .options { it.flushOnBoundary() }
+                .sendGroups(frames)
+                .send(Mono.just(Unpooled.EMPTY_BUFFER))
+                .then {
+                    logger.info("Messages have been sent")
+                    complete.onComplete()
+                }
+                .then()
     }
 
     private fun createSslContext(config: SecurityConfiguration): SslContext =
@@ -83,6 +116,7 @@ class VesHvClient(private val configuration: ClientConfiguration) {
                     .build()
 
     companion object {
+        private const val MAX_BATCH_SIZE = 128
         private val logger = Logger(VesHvClient::class)
     }
 }
index 08bc6a7..f222950 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf
 
+import arrow.core.Failure
+import arrow.core.Success
+import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient
-import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
-import org.slf4j.LoggerFactory.getLogger
+import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
 
-private val logger = getLogger("Simulator :: main")
+private val logger = Logger("Simulator :: main")
+private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt"
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
 fun main(args: Array<String>) {
-    try {
-        val clientConfig = ArgBasedClientConfiguration().parse(args)
-        val vesClient = VesHvClient(clientConfig)
+    val httpServer = ArgBasedClientConfiguration().parse(args)
+            .map(::VesHvClient)
+            .map(::HttpServer)
 
-        HttpServer(vesClient)
-                .start()
-                .block()
-
-    } catch (e: WrongArgumentException) {
-        e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
-        System.exit(1)
-    } catch (e: Exception) {
-        logger.error(e.localizedMessage)
-        logger.debug("An error occurred when starting ves client", e)
-        System.exit(2)
+    when (httpServer) {
+        is Success -> httpServer.value.start().unsafeRunAsync {
+            it.fold(
+                    { ex ->
+                        logger.error("Failed to start a server", ex)
+                    },
+                    { srv ->
+                        logger.info("Started Simulator API server (listening on ${srv.bindHost}:${srv.bindPort})")
+                    }
+            )
+        }
+        is Failure -> httpServer.handleErrorsInMain(PROGRAM_NAME, logger)
     }
 }
-
index 6420d84..2746c0a 100644 (file)
@@ -19,6 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.main.config
 
+import arrow.core.Failure
+import arrow.core.Success
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
@@ -44,7 +46,13 @@ object ArgBasedClientConfigurationTest : Spek({
         cut = ArgBasedClientConfiguration()
     }
 
-    fun parse(vararg cmdLine: String) = cut.parse(cmdLine)
+    fun parse(vararg cmdLine: String): ClientConfiguration {
+        val result = cut.parse(cmdLine)
+        return when (result) {
+            is Success -> result.value
+            is Failure -> throw AssertionError("Parsing result should be present")
+        }
+    }
 
     describe("parsing arguments") {
         lateinit var result: ClientConfiguration
index a372fb2..1865731 100644 (file)
   ~ ============LICENSE_END=========================================================
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
 
-  <licenses>
-    <license>
-      <name>The Apache Software License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-    </license>
-  </licenses>
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
 
-  <parent>
-    <groupId>org.onap.dcaegen2.collectors.veshv</groupId>
-    <artifactId>ves-hv-collector</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
-    <relativePath>..</relativePath>
-  </parent>
+    <parent>
+        <groupId>org.onap.dcaegen2.collectors.veshv</groupId>
+        <artifactId>ves-hv-collector</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
 
-  <artifactId>hv-collector-core</artifactId>
-  <description>VES HighVolume Collector :: Core</description>
+    <artifactId>hv-collector-core</artifactId>
+    <description>VES HighVolume Collector :: Core</description>
 
-  <properties>
-    <skipAnalysis>false</skipAnalysis>
-  </properties>
+    <properties>
+        <skipAnalysis>false</skipAnalysis>
+    </properties>
 
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>kotlin-maven-plugin</artifactId>
-        <groupId>org.jetbrains.kotlin</groupId>
-      </plugin>
-      <plugin>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <groupId>org.apache.maven.plugins</groupId>
-      </plugin>
-      <plugin>
-        <groupId>org.jacoco</groupId>
-        <artifactId>jacoco-maven-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>kotlin-maven-plugin</artifactId>
+                <groupId>org.jetbrains.kotlin</groupId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+            </plugin>
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 
-  <dependencies>
-    <dependency>
-      <groupId>${project.parent.groupId}</groupId>
-      <artifactId>hv-collector-utils</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>${project.parent.groupId}</groupId>
-      <artifactId>hv-collector-domain</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.jetbrains.kotlin</groupId>
-      <artifactId>kotlin-reflect</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>io.projectreactor</groupId>
-      <artifactId>reactor-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>io.projectreactor.addons</groupId>
-      <artifactId>reactor-extra</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>io.projectreactor.ipc</groupId>
-      <artifactId>reactor-netty</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>io.projectreactor.kafka</groupId>
-      <artifactId>reactor-kafka</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-tcnative-boringssl-static</artifactId>
-      <scope>runtime</scope>
-      <classifier>${os.detected.classifier}</classifier>
-    </dependency>
-    <dependency>
-      <groupId>javax.json</groupId>
-      <artifactId>javax.json-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.glassfish</groupId>
-      <artifactId>javax.json</artifactId>
-      <scope>runtime</scope>
-    </dependency>
+    <dependencies>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>hv-collector-utils</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>hv-collector-domain</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>compile</scope>
+        </dependency>
 
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-reflect</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-effects</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor.addons</groupId>
+            <artifactId>reactor-extra</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor.ipc</groupId>
+            <artifactId>reactor-netty</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor.kafka</groupId>
+            <artifactId>reactor-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-tcnative-boringssl-static</artifactId>
+            <scope>runtime</scope>
+            <classifier>${os.detected.classifier}</classifier>
+        </dependency>
+        <dependency>
+            <groupId>javax.json</groupId>
+            <artifactId>javax.json-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <scope>runtime</scope>
+        </dependency>
 
-    <dependency>
-      <groupId>com.nhaarman</groupId>
-      <artifactId>mockito-kotlin</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.assertj</groupId>
-      <artifactId>assertj-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.jetbrains.kotlin</groupId>
-      <artifactId>kotlin-test</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.jetbrains.spek</groupId>
-      <artifactId>spek-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.jetbrains.spek</groupId>
-      <artifactId>spek-junit-platform-engine</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>io.projectreactor</groupId>
-      <artifactId>reactor-test</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>ch.qos.logback</groupId>
-      <artifactId>logback-classic</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
+
+        <dependency>
+            <groupId>com.nhaarman</groupId>
+            <artifactId>mockito-kotlin</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-test</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.spek</groupId>
+            <artifactId>spek-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.spek</groupId>
+            <artifactId>spek-junit-platform-engine</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 
 </project>
index ed686fe..d615848 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.boundary
 
+import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -32,9 +33,10 @@ interface Collector {
 typealias CollectorProvider = () -> Collector
 
 interface Server {
-    fun start(): Mono<Void>
+    fun start(): IO<ServerHandle>
 }
 
-interface ServerFactory {
-    fun createServer(serverConfig: ServerConfiguration, collector: CollectorProvider): Server
+abstract class ServerHandle(val host: String, val port: Int) {
+    abstract fun shutdown(): IO<Unit>
+    abstract fun await(): IO<Unit>
 }
index f3f0a89..cee658b 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl
 
+import arrow.core.Option
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.Routing
 import org.onap.dcae.collectors.veshv.model.VesMessage
 
 class Router(private val routing: Routing) {
-    fun findDestination(message: VesMessage): RoutedMessage? = routing.routeFor(message.header)?.invoke(message)
+    fun findDestination(message: VesMessage): Option<RoutedMessage> =
+            routing.routeFor(message.header).map { it(message) }
 }
index 222eaef..033095a 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
+import arrow.core.Option
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
@@ -67,7 +68,10 @@ internal class VesHvCollector(
         wireChunkDecoder.release()
     }
 
-    private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
+    private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
+            mapper(input).fold(
+                    { Mono.empty() },
+                    { Mono.just(it) })
 
     companion object {
         val logger = Logger(VesHvCollector::class)
index a5c4104..5f4bf35 100644 (file)
@@ -23,7 +23,6 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import java.util.concurrent.atomic.AtomicLong
@@ -36,7 +35,6 @@ internal class LoggingSinkProvider : SinkProvider {
 
     override fun invoke(config: CollectorConfiguration): Sink {
         return object : Sink {
-            private val logger = Logger(LoggingSinkProvider::class)
             private val totalMessages = AtomicLong()
             private val totalBytes = AtomicLong()
 
@@ -59,5 +57,6 @@ internal class LoggingSinkProvider : SinkProvider {
 
     companion object {
         const val INFO_LOGGING_FREQ = 100_000
+        private val logger = Logger(LoggingSinkProvider::class)
     }
 }
index 0a548a5..f8fa72a 100644 (file)
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 import reactor.core.publisher.Flux
 import reactor.kafka.sender.KafkaSender
 import reactor.kafka.sender.SenderRecord
 import reactor.kafka.sender.SenderResult
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
 internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+    private val sentMessages = AtomicLong(0)
 
     override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
         val records = messages.map(this::vesToKafkaRecord)
-        return sender.send(records)
+        val result = sender.send(records)
                 .doOnNext(::logException)
                 .filter(::isSuccessful)
                 .map { it.correlationMetadata() }
+
+        return if (logger.traceEnabled) {
+            result.doOnNext(::logSentMessage)
+        } else {
+            result
+        }
     }
 
     private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
@@ -59,7 +70,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
         }
     }
 
-    private fun isSuccessful(senderResult: SenderResult<out Any>)  = senderResult.exception() == null
+    private fun logSentMessage(sentMsg: RoutedMessage) {
+        logger.trace {
+            val msgNum = sentMessages.incrementAndGet()
+            "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
+        }
+    }
+
+    private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
 
     companion object {
         val logger = Logger(KafkaSink::class)
index 9753d9e..4e9932c 100644 (file)
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.Serializer
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-class ProtobufSerializer :Serializer<MessageLite> {
+class ProtobufSerializer : Serializer<MessageLite> {
     override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
         // no configuration
     }
index 65b3b29..0426ceb 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.socket
 
+import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.boundary.ServerHandle
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.reactivestreams.Publisher
@@ -28,7 +30,9 @@ import reactor.core.publisher.Mono
 import reactor.ipc.netty.NettyInbound
 import reactor.ipc.netty.NettyOutbound
 import reactor.ipc.netty.options.ServerOptions
+import reactor.ipc.netty.tcp.BlockingNettyContext
 import reactor.ipc.netty.tcp.TcpServer
+import java.time.Duration
 import java.util.function.BiFunction
 
 /**
@@ -39,17 +43,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
                               private val sslContextFactory: SslContextFactory,
                               private val collectorProvider: CollectorProvider) : Server {
 
-    override fun start(): Mono<Void> {
-        logger.info { "Listening on port ${serverConfig.port}" }
-        return Mono.defer {
-            val nettyContext = TcpServer.builder()
-                    .options(this::configureServer)
-                    .build()
-                    .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { t, u ->
-                        handleConnection(t, u)
-                    })
-            Mono.never<Void>().doFinally { _ -> nettyContext.shutdown() }
-        }
+    override fun start(): IO<ServerHandle> = IO {
+        val ctx = TcpServer.builder()
+                .options(this::configureServer)
+                .build()
+                .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ ->
+                    handleConnection(input)
+                })
+        NettyServerHandle(ctx)
     }
 
     private fun configureServer(opts: ServerOptions.Builder<*>) {
@@ -57,20 +58,50 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
         opts.sslContext(sslContextFactory.createSslContext(serverConfig.securityConfiguration))
     }
 
-    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
-        logger.debug("Got connection")
-        nettyOutbound.alloc()
+    private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> {
+        logger.info("Handling connection from ${nettyInbound.remoteAddress()}")
+
+        val dataStream = nettyInbound
+                .configureIdleTimeout(serverConfig.idleTimeout)
+                .logConnectionClosed()
+                .receive()
+                .retain()
 
-        val sendHello = nettyOutbound
-                .options { it.flushOnEach() }
-                .sendString(Mono.just("ONAP_VES_HV/0.1\n"))
-                .then()
+        return collectorProvider()
+                .handleConnection(nettyInbound.context().channel().alloc(), dataStream)
+    }
 
-        val handleIncomingMessages = collectorProvider()
-                .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain())
+    private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
+        onReadIdle(timeout.toMillis()) {
+            logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
+            context().channel().close().addListener {
 
-        return sendHello.then(handleIncomingMessages)
+                if (it.isSuccess)
+                    logger.debug { "Client disconnected because of idle timeout" }
+                else
+                    logger.warn("Channel close failed", it.cause())
+            }
+        }
+        return this
+    }
+
+    private fun NettyInbound.logConnectionClosed(): NettyInbound {
+        context().onClose {
+            logger.info("Connection from ${remoteAddress()} has been closed")
+        }
+        return this
     }
+
+    private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
+        override fun shutdown() = IO {
+            ctx.shutdown()
+        }
+
+        override fun await() = IO<Unit> {
+            ctx.context.channel().closeFuture().sync()
+        }
+    }
+
     companion object {
         private val logger = Logger(NettyTcpServer::class)
     }
index 34a8b92..b788f51 100644 (file)
@@ -56,7 +56,7 @@ internal class StreamBufferEmitter(
             else -> {
                 streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame)
                 sink.onDispose {
-                    logger.debug("Disposing read components")
+                    logger.trace { "Disposing read components" }
                     streamBuffer.discardReadComponents()
                 }
                 sink.onRequest { requestedFrameCount ->
index a576dc6..abebff3 100644 (file)
@@ -84,7 +84,7 @@ internal class WireFrameSink(
             try {
                 decoder.decodeFirst(streamBuffer)
             } catch (ex: MissingWireFrameBytesException) {
-                logger.debug { "${ex.message} - waiting for more data" }
+                logger.trace { "${ex.message} - waiting for more data" }
                 null
             }
 
index 8d01c07..67a7d6f 100644 (file)
@@ -20,6 +20,7 @@
 package org.onap.dcae.collectors.veshv.model
 
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.time.Duration
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -28,4 +29,6 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
 data class ServerConfiguration(
         val port: Int,
         val configurationUrl: String,
-        val securityConfiguration: SecurityConfiguration)
+        val securityConfiguration: SecurityConfiguration,
+        val idleTimeout: Duration,
+        val dummyMode: Boolean = false)
index bc03058..e9cd5f3 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.model
 
+import arrow.core.Option
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 
 data class Routing(val routes: List<Route>) {
 
-    fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) }
+    fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
+            Option.fromNullable(routes.find { it.applies(commonHeader) })
 }
 
 data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
index c852f5f..599a9d4 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl
 
+import arrow.core.None
+import arrow.core.Some
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
@@ -61,15 +64,15 @@ object RouterTest : Spek({
             }
 
             it("should be routed to proper partition") {
-                assertThat(result?.partition).isEqualTo(2)
+                assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
             }
 
             it("should be routed to proper topic") {
-                assertThat(result?.topic).isEqualTo("ves_rtpm")
+                assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
             }
 
             it("should be routed with a given message") {
-                assertThat(result?.message).isSameAs(message)
+                assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
             }
         }
 
@@ -82,15 +85,15 @@ object RouterTest : Spek({
             }
 
             it("should be routed to proper partition") {
-                assertThat(result?.partition).isEqualTo(0)
+                assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
             }
 
             it("should be routed to proper topic") {
-                assertThat(result?.topic).isEqualTo("ves_trace")
+                assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
             }
 
             it("should be routed with a given message") {
-                assertThat(result?.message).isSameAs(message)
+                assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
             }
         }
 
@@ -99,7 +102,7 @@ object RouterTest : Spek({
             val result = cut.findDestination(message)
 
             it("should not have route available") {
-                assertThat(result).isNull()
+                assertThat(result).isEqualTo(None)
             }
         }
     }
index a449078..68b562d 100644 (file)
@@ -5,11 +5,11 @@ LABEL license.name="The Apache Software License, Version 2.0"
 LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
 LABEL maintainer="Nokia Wroclaw ONAP Team"
 
-EXPOSE 8080
+EXPOSE 5000
 
 WORKDIR /opt/ves-hv-dcae-app-simulator
 ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt"]
-CMD ["--kafka-bootstrap-servers", "kafka:9092", "--kafka-topics", "ves_hvRanMeas", "--listen-port", "8080"]
+CMD ["--kafka-bootstrap-servers", "kafka:9092", "--kafka-topics", "ves_hvRanMeas", "--listen-port", "5000"]
 COPY target/libs/external/* ./
 COPY target/libs/internal/* ./
 COPY target/hv-collector-dcae-app-simulator-*.jar ./
index 046f5ed..a2f92e8 100644 (file)
             <artifactId>hv-collector-utils</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-effects</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.ratpack</groupId>
             <artifactId>ratpack-core</artifactId>
index 3f53930..27edde9 100644 (file)
@@ -24,12 +24,14 @@ import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DefaultValues.API_PORT
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
 
 internal object DefaultValues {
     const val API_PORT = 8080
+    const val KAFKA_SERVERS = "kafka:9092"
+    const val KAFKA_TOPICS = "ves_hvRanMeas"
 }
 
 class ArgBasedDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
@@ -41,8 +43,8 @@ class ArgBasedDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfigur
 
     override fun getConfiguration(cmdLine: CommandLine): DcaeAppSimConfiguration {
         val port = cmdLine.intValue(LISTEN_PORT, API_PORT)
-        val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS)
-        val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS).split(",").toSet()
+        val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS, DefaultValues.KAFKA_SERVERS)
+        val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS, DefaultValues.KAFKA_TOPICS).split(",").toSet()
         return DcaeAppSimConfiguration(
                 port,
                 kafkaBootstrapServers,
index f7703b8..d53609c 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
 
+import arrow.effects.IO
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Mono
 import reactor.kafka.receiver.KafkaReceiver
 import reactor.kafka.receiver.ReceiverOptions
 import java.util.*
@@ -33,10 +33,10 @@ import java.util.*
  */
 class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
 
-    fun start(): Mono<Consumer> = Mono.create { sink ->
+    fun start(): IO<Consumer> = IO {
         val consumer = Consumer()
         receiver.receive().subscribe(consumer::update)
-        sink.success(consumer)
+        consumer
     }
 
     companion object {
index 5f0fe7f..6616953 100644 (file)
@@ -19,7 +19,9 @@
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
 
-import reactor.core.publisher.Mono
+import arrow.core.Option
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.kafka.receiver.ReceiverRecord
 
 /**
@@ -27,10 +29,11 @@ import reactor.kafka.receiver.ReceiverRecord
  * @since June 2018
  */
 
-class ConsumerState(val msgCount: Long, val lastKey: ByteArray?, val lastValue: ByteArray?)
+class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>)
 
 interface ConsumerStateProvider {
-    fun currentState(): Mono<ConsumerState>
+    fun currentState(): ConsumerState
+    fun reset(): IO<Unit>
 }
 
 class Consumer : ConsumerStateProvider {
@@ -38,18 +41,28 @@ class Consumer : ConsumerStateProvider {
     private var lastKey: ByteArray? = null
     private var lastValue: ByteArray? = null
 
-    override fun currentState(): Mono<ConsumerState> = Mono.create { sink ->
-        val state = synchronized(this) {
-            ConsumerState(msgCount, lastKey, lastValue)
+    override fun currentState() =
+            ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue))
+
+    override fun reset() = IO {
+        synchronized(this) {
+            msgCount = 0
+            lastKey = null
+            lastValue = null
         }
-        sink.success(state)
     }
 
     fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+        logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
+
         synchronized(this) {
             msgCount++
             lastKey = record.key()
             lastValue = record.value()
         }
     }
+
+    companion object {
+        private val logger = Logger(Consumer::class)
+    }
 }
index c037af3..f7d44ed 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp
 
+import arrow.core.Failure
+import arrow.core.Success
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgBasedDcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer
-import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
+import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.slf4j.LoggerFactory
 
 private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main"))
 
 fun main(args: Array<String>) {
+    logger.info("Starting DCAE APP simulator")
 
-    try {
-        logger.info("Starting DCAE APP simulator")
-        val simulatorConfig = ArgBasedDcaeAppSimConfiguration().parse(args)
+    val config = ArgBasedDcaeAppSimConfiguration().parse(args)
+    when (config) {
+        is Success -> startApp(config.value).unsafeRunSync()
+        is Failure -> config.handleErrorsInMain("", logger)
+    }
+}
 
-        KafkaSource.create(simulatorConfig.kafkaBootstrapServers, simulatorConfig.kafkaTopics)
+private fun startApp(config: DcaeAppSimConfiguration) =
+        KafkaSource.create(config.kafkaBootstrapServers, config.kafkaTopics)
                 .start()
                 .map(::ApiServer)
-                .flatMap { it.start(simulatorConfig.apiPort) }
-                .block()
-    } catch (e: WrongArgumentException) {
-        e.printHelp("java org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt")
-        System.exit(1)
-    } catch (e: Exception) {
-        logger.error(e.localizedMessage)
-        logger.debug("An error occurred when starting ves dcea app simulator", e)
-        System.exit(2)
-    }
-}
+                .flatMap { it.start(config.apiPort) }
index fcb8e13..2fa8abe 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
 
+import arrow.core.Try
+import arrow.core.getOrElse
+import arrow.effects.IO
+import com.google.protobuf.MessageOrBuilder
 import com.google.protobuf.util.JsonFormat
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
+import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
 import org.onap.ves.VesEventV5.VesEvent
 import ratpack.handling.Chain
 import ratpack.server.RatpackServer
 import ratpack.server.ServerConfig
-import reactor.core.publisher.Mono
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -34,41 +38,71 @@ import reactor.core.publisher.Mono
 class ApiServer(private val consumerState: ConsumerStateProvider) {
     private val jsonPrinter = JsonFormat.printer()
 
-    fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable {
-        RatpackServer.of { server ->
+    fun start(port: Int): IO<RatpackServer> = IO {
+        RatpackServer.start { server ->
             server.serverConfig(ServerConfig.embedded().port(port))
                     .handlers(this::setupHandlers)
         }
-    }.doOnNext(RatpackServer::start)
+    }
 
     private fun setupHandlers(chain: Chain) {
         chain
                 .get("messages/count") { ctx ->
                     ctx.response.contentType(CONTENT_TEXT)
-                    consumerState.currentState()
-                            .map { it.msgCount.toString() }
-                            .subscribe(ctx.response::send)
+                    val state = consumerState.currentState()
+                    ctx.response.send(state.msgCount.toString())
                 }
 
                 .get("messages/last/key") { ctx ->
                     ctx.response.contentType(CONTENT_JSON)
-                    consumerState.currentState()
-                            .map { it.lastKey }
-                            .map { VesEvent.CommonEventHeader.parseFrom(it) }
-                            .map { jsonPrinter.print(it) }
-                            .subscribe(ctx.response::send)
+                    val state = consumerState.currentState()
+                    val resp = state.lastKey
+                            .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } }
+                            .map(this::protobufToJson)
+                            .getOrElse { "null" }
+                    ctx.response.send(resp)
                 }
 
                 .get("messages/last/value") { ctx ->
                     ctx.response.contentType(CONTENT_JSON)
-                    consumerState.currentState()
-                            .map { it.lastValue }
-                            .map { VesEvent.parseFrom(it) }
-                            .map { jsonPrinter.print(it) }
-                            .subscribe(ctx.response::send)
+                    val state = consumerState.currentState()
+                    val resp = state.lastValue
+                            .map { Try { VesEvent.parseFrom(it) } }
+                            .map(this::protobufToJson)
+                            .getOrElse { "null" }
+                    ctx.response.send(resp)
+                }
+
+                .get("messages/last/hvRanMeasFields") { ctx ->
+                    ctx.response.contentType(CONTENT_JSON)
+                    val state = consumerState.currentState()
+                    val resp = state.lastValue
+                            .flatMap { Try { VesEvent.parseFrom(it) }.toOption() }
+                            .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS }
+                            .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } }
+                            .map(this::protobufToJson)
+                            .getOrElse { "null" }
+                    ctx.response.send(resp)
+                }
+
+                .delete("messages") { ctx ->
+                    ctx.response.contentType(CONTENT_TEXT)
+                    consumerState.reset()
+                            .unsafeRunAsync {
+                                it.fold(
+                                        { ctx.response.send("NOK") },
+                                        { ctx.response.send("OK") }
+                                )
+                            }
                 }
     }
 
+    private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String =
+            parseResult.fold(
+                    { ex -> "\"Failed to parse protobuf: ${ex.message}\"" },
+                    { jsonPrinter.print(it) })
+
+
     companion object {
         private const val CONTENT_TEXT = "text/plain"
         private const val CONTENT_JSON = "application/json"
index 817df8e..d99de17 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
 
+import arrow.core.Failure
+import arrow.core.Success
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
-import kotlin.test.assertFailsWith
 
 
 internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
@@ -38,7 +39,21 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
         cut = ArgBasedDcaeAppSimConfiguration()
     }
 
-    fun parse(vararg cmdLine: String) = cut.parse(cmdLine)
+    fun parseExpectingSuccess(vararg cmdLine: String): DcaeAppSimConfiguration {
+        val result = cut.parse(cmdLine)
+        return when (result) {
+            is Success -> result.value
+            is Failure -> throw AssertionError("Parsing result should be present")
+        }
+    }
+
+    fun parseExpectingFailure(vararg cmdLine: String): Throwable {
+        val result = cut.parse(cmdLine)
+        return when (result) {
+            is Success -> throw AssertionError("parsing should have failed")
+            is Failure -> result.exception
+        }
+    }
 
     describe("parsing arguments") {
         lateinit var result: DcaeAppSimConfiguration
@@ -46,7 +61,7 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
         given("all parameters are present in the long form") {
 
             beforeEachTest {
-                result = parse("--listen-port", "6969",
+                result = parseExpectingSuccess("--listen-port", "6969",
                         "--kafka-bootstrap-servers", kafkaBootstrapServers,
                         "--kafka-topics", kafkaTopics
                 )
@@ -71,7 +86,9 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
         given("some parameters are present in the short form") {
 
             beforeEachTest {
-                result = parse("-p", "666", "--kafka-bootstrap-servers", kafkaBootstrapServers, "-f", kafkaTopics)
+                result = parseExpectingSuccess("-p", "666",
+                        "--kafka-bootstrap-servers", kafkaBootstrapServers,
+                        "-f", kafkaTopics)
             }
 
             it("should set proper port") {
@@ -92,7 +109,7 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
         given("all optional parameters are absent") {
 
             beforeEachTest {
-                result = parse("-s", kafkaBootstrapServers, "-f", kafkaTopics)
+                result = parseExpectingSuccess("-s", kafkaBootstrapServers, "-f", kafkaTopics)
             }
 
             it("should set default port") {
@@ -100,21 +117,20 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
             }
         }
 
-
         describe("required parameter is absent") {
             given("kafka topics are missing") {
                 it("should throw exception") {
-                    assertFailsWith<WrongArgumentException> { parse("-s", kafkaBootstrapServers) }
+                    assertThat(parseExpectingFailure("-s", kafkaBootstrapServers))
+                            .isInstanceOf(WrongArgumentException::class.java)
                 }
             }
 
             given("kafka bootstrap servers are missing") {
                 it("should throw exception") {
-                    assertFailsWith<WrongArgumentException> { parse("-f", kafkaTopics) }
+                    assertThat(parseExpectingFailure("-f", kafkaTopics))
+                            .isInstanceOf(WrongArgumentException::class.java)
                 }
             }
         }
     }
-
-
 })
\ No newline at end of file
index de9004a..c11510a 100644 (file)
         <dependency>
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-test</artifactId>
-            <version>${kotlin.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
index 59b91d7..f3e97be 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.main
 
-import org.apache.commons.cli.DefaultParser
 import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.time.Duration
 
 internal object DefaultValues {
     const val PORT = 6061
@@ -36,6 +39,7 @@ internal object DefaultValues {
     const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key"
     const val CERT_FILE = "/etc/ves-hv/server.crt"
     const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt"
+    const val IDLE_TIMEOUT_SEC = 60L
 }
 
 internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
@@ -44,14 +48,23 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu
             CONSUL_CONFIG_URL,
             PRIVATE_KEY_FILE,
             CERT_FILE,
-            TRUST_CERT_FILE
+            TRUST_CERT_FILE,
+            IDLE_TIMEOUT_SEC,
+            DUMMY_MODE
     )
 
     override fun getConfiguration(cmdLine: CommandLine): ServerConfiguration {
         val port = cmdLine.intValue(LISTEN_PORT, DefaultValues.PORT)
         val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
+        val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
+        val dummyMode = cmdLine.hasOption(DUMMY_MODE)
         val security = createSecurityConfiguration(cmdLine)
-        return ServerConfiguration(port, configUrl, security)
+        return ServerConfiguration(
+                port = port,
+                configurationUrl = configUrl,
+                securityConfiguration = security,
+                idleTimeout = Duration.ofSeconds(idleTimeoutSec),
+                dummyMode = dummyMode)
     }
 
     private fun createSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration {
index 1f2686b..074a75e 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.main
 
+import arrow.core.flatMap
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.factory.ServerFactory
 import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.slf4j.LoggerFactory
-import kotlin.system.exitProcess
 
-private val logger = LoggerFactory.getLogger("main")
+private val logger = Logger("org.onap.dcae.collectors.veshv.main")
+private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt"
 
 fun main(args: Array<String>) {
-    try {
-        val serverConfiguration = ArgBasedServerConfiguration().parse(args)
-
-        val collectorProvider = CollectorFactory(
-                resolveConfigurationProvider(serverConfiguration),
-                AdapterFactory.kafkaSink(),
-                MicrometerMetrics()
-        ).createVesHvCollectorProvider()
-        ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block()
-    } catch (ex: WrongArgumentException) {
-        ex.printMessage()
-        ex.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
-        exitProcess(1)
-    }
+    ArgBasedServerConfiguration().parse(args)
+            .toEither()
+            .map(::createServer)
+            .map(Server::start)
+            .flatMap { it.attempt().unsafeRunSync() }
+            .fold(
+                    { ex ->
+                        handleErrorsInMain(ex, PROGRAM_NAME, logger)
+                    },
+                    { handle ->
+                        logger.info("Server started. Listening on ${handle.host}:${handle.port}")
+                        handle.await().unsafeRunSync()
+                    }
+            )
 }
 
+private fun createServer(config: ServerConfiguration): Server {
+    val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
+    val collectorProvider = CollectorFactory(
+            resolveConfigurationProvider(config),
+            sink,
+            MicrometerMetrics()
+    ).createVesHvCollectorProvider()
+
+    return ServerFactory.createNettyTcpServer(config, collectorProvider)
+}
 
 private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguration): ConfigurationProvider {
 
index 48da3b1..5127e7e 100644 (file)
       </rollingPolicy>
     </appender>
 
-  <logger name="org.onap.dcae.collectors.veshv" level="INFO"/>
+  <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+  <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/>
+  <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/>
+  <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/>
   <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
 
   <root level="INFO">
index 923f9d5..4c2425b 100644 (file)
@@ -19,6 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.main
 
+import arrow.core.Failure
+import arrow.core.Success
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
@@ -44,7 +46,13 @@ object ArgBasedServerConfigurationTest : Spek({
         cut = ArgBasedServerConfiguration()
     }
 
-    fun parse(vararg cmdLine: String) = cut.parse(cmdLine)
+    fun parse(vararg cmdLine: String): ServerConfiguration {
+        val result = cut.parse(cmdLine)
+        return when (result) {
+            is Success -> result.value
+            is Failure -> throw AssertionError("Parsing result should be present")
+        }
+    }
 
     describe("parsing arguments") {
         given("all parameters are present in the long form") {
index 8a8a1d8..3c48280 100644 (file)
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-reflect</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-instances-data</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
index 968c340..34c0e65 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.utils.commandline
 
+import arrow.core.Option
+import arrow.core.Try
+import arrow.core.getOrElse
+import arrow.core.recoverWith
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.CommandLineParser
 import org.apache.commons.cli.Options
 import java.io.File
+import java.nio.file.Path
 import java.nio.file.Paths
 
 abstract class ArgBasedConfiguration<T>(val parser: CommandLineParser) {
     abstract val cmdLineOptionsList: List<CommandLineOption>
 
-    fun parse(args: Array<out String>): T {
+    fun parse(args: Array<out String>): Try<T> {
         val commandLineOptions = cmdLineOptionsList.map { it.option }.fold(Options(), Options::addOption)
-        try {
-            val cmdLine = parser.parse(commandLineOptions, args)
-            return getConfiguration(cmdLine)
-        } catch (ex: Exception) {
-            throw WrongArgumentException(ex, commandLineOptions)
-        }
+        return Try {
+            parser.parse(commandLineOptions, args)
+        }.recoverWith { ex ->
+            Try.raise<CommandLine>(WrongArgumentException(ex, commandLineOptions))
+        }.map (this::getConfiguration)
     }
 
     protected abstract fun getConfiguration(cmdLine: CommandLine): T
 
-    protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Int =
-            getOptionValue(cmdLineOpt.option.opt).toInt()
-
     protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption, default: Int): Int =
-            getOptionValue(cmdLineOpt.option.opt)?.toInt() ?: default
-
-    protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Long =
-            getOptionValue(cmdLineOpt.option.opt).toLong()
+            intValue(cmdLineOpt).getOrElse { default }
 
     protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long =
-            getOptionValue(cmdLineOpt.option.opt)?.toLong() ?: default
+            longValue(cmdLineOpt).getOrElse { default }
 
-    protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): String =
-            getOptionValue(cmdLineOpt.option.opt)
+    protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): Option<String> =
+            optionValue(cmdLineOpt)
 
     protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, default: String): String =
-            getOptionValue(cmdLineOpt.option.opt) ?: default
+            optionValue(cmdLineOpt).getOrElse { default }
+
+    protected fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean =
+            this.hasOption(cmdLineOpt.option.opt)
+
+    protected fun stringPathToPath(path: String): Path = Paths.get(File(path).toURI())
+
+    private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption): Option<String> =
+            Option.fromNullable(getOptionValue(cmdLineOpt.option.opt))
+
+    private fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Option<Int> =
+            optionValue(cmdLineOpt).map(String::toInt)
 
-    protected fun stringPathToPath(path: String) = Paths.get(File(path).toURI())
+    private fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> =
+            optionValue(cmdLineOpt).map(String::toLong)
 }
index 9d1f7aa..942ca31 100644 (file)
@@ -87,4 +87,16 @@ enum class CommandLineOption(val option: Option) {
             .desc("File with trusted certificate bundle for trusting connections")
             .build()
     ),
+    IDLE_TIMEOUT_SEC(Option.builder("i")
+            .longOpt("idle-timeout-sec")
+            .hasArg()
+            .desc("""Idle timeout for remote hosts. After given time without any data exchange the
+                |connection might be closed.""".trimMargin())
+            .build()
+    ),
+    DUMMY_MODE(Option.builder("d")
+            .longOpt("dummy")
+            .desc("If present will start in dummy mode (dummy external services)")
+            .build()
+    ),
 }
index 5f6a86a..083d579 100644 (file)
@@ -23,7 +23,14 @@ import org.apache.commons.cli.HelpFormatter
 import org.apache.commons.cli.Options
 
 
-class WrongArgumentException(parent: Exception, private val options: Options) : Exception(parent.message, parent) {
+class WrongArgumentException(
+        message: String,
+        private val options: Options,
+        parent: Throwable? = null
+) : Exception(message, parent) {
+
+    constructor(par: Throwable, options: Options) : this(par.message ?: "", options, par)
+
     fun printMessage() {
         println(message)
     }
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt
new file mode 100644 (file)
index 0000000..23bf165
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.commandline
+
+import arrow.core.Failure
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import kotlin.system.exitProcess
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+
+fun handleErrorsInMain(ex: Throwable, programName: String, logger: Logger) {
+    when (ex) {
+        is WrongArgumentException -> {
+            ex.printMessage()
+            ex.printHelp(programName)
+            exitProcess(1)
+        }
+
+        else -> {
+            logger.error(ex.localizedMessage)
+            logger.debug("An error occurred when starting VES HV Collector", ex)
+            System.exit(2)
+        }
+    }
+}
+
+
+fun <A> Failure<A>.handleErrorsInMain(programName: String, logger: Logger) {
+    handleErrorsInMain(this.exception, programName, logger)
+}
index f614d42..536fe93 100644 (file)
@@ -24,11 +24,15 @@ import kotlin.reflect.KClass
 
 class Logger(val logger: org.slf4j.Logger) {
     constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java))
+    constructor(name: String) : this(LoggerFactory.getLogger(name))
 
     //
     // TRACE
     //
 
+    val traceEnabled: Boolean
+        get() = logger.isTraceEnabled
+
     fun trace(messageProvider: () -> String) {
         if (logger.isTraceEnabled) {
             logger.trace(messageProvider())
diff --git a/pom.xml b/pom.xml
index f478df3..adc53a7 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,8 @@
     </modules>
 
     <properties>
-        <kotlin.version>1.2.41</kotlin.version>
+        <kotlin.version>1.2.50</kotlin.version>
+        <arrow.version>0.7.2</arrow.version>
         <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
         <build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
 
             <dependency>
                 <groupId>io.arrow-kt</groupId>
                 <artifactId>arrow-core</artifactId>
-                <version>0.7.2</version>
+                <version>${arrow.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.jetbrains.kotlin</groupId>
+                        <artifactId>kotlin-stdlib</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.jetbrains.kotlin</groupId>
+                        <artifactId>kotlin-stdlib-jdk7</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
                 <groupId>io.arrow-kt</groupId>
                 <artifactId>arrow-syntax</artifactId>
-                <version>0.7.2</version>
+                <version>${arrow.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.arrow-kt</groupId>
+                <artifactId>arrow-instances-core</artifactId>
+                <version>${arrow.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.arrow-kt</groupId>
+                <artifactId>arrow-instances-data</artifactId>
+                <version>${arrow.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.arrow-kt</groupId>
+                <artifactId>arrow-effects</artifactId>
+                <version>${arrow.version}</version>
             </dependency>
             <dependency>
                 <groupId>ch.qos.logback</groupId>