Improve coverage of xNF simulator 53/63453/13
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 29 Aug 2018 11:24:59 +0000 (13:24 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 6 Sep 2018 05:43:30 +0000 (07:43 +0200)
Also refactor to make it possible.

Change-Id: I6da6d3f33e57c524a7e353ecebd3e045d8ceed2a
Issue-ID: DCAEGEN2-739
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
38 files changed:
docker-compose.yml
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt [moved from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt with 53% similarity]
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
hv-collector-main/src/main/resources/logback.xml
hv-collector-test-utils/pom.xml
hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt [new file with mode: 0644]
hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt [moved from hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt with 50% similarity]
hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/configurations.kt [moved from hv-collector-test-utils/src/main/kotlin/configurations.kt with 100% similarity]
hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt [moved from hv-collector-test-utils/src/main/kotlin/messages.kt with 100% similarity]
hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt [moved from hv-collector-test-utils/src/main/kotlin/vesEvents.kt with 100% similarity]
hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker [moved from hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker with 100% similarity]
hv-collector-utils/pom.xml
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt [new file with mode: 0644]
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt [new file with mode: 0644]
hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt [new file with mode: 0644]
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt
hv-collector-xnf-simulator/pom.xml
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt [deleted file]
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt [new file with mode: 0644]
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt [new file with mode: 0644]
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt [moved from hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt with 98% similarity]
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt [moved from hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt with 92% similarity]
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt [new file with mode: 0644]
hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt [new file with mode: 0644]
hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt [new file with mode: 0644]
hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt
pom.xml

index 33aedec..f9f52b4 100644 (file)
@@ -28,7 +28,7 @@ services:
       command: ["-server", "-bootstrap", "-ui-dir", "/ui"]
 
   ves-hv-collector:
-    image: nexus3.onap.org:10003/onap/ves-hv-collector:latest
+    image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest
 #    build:
 #      context: hv-collector-main
 #      dockerfile: Dockerfile
@@ -51,7 +51,7 @@ services:
       - ./ssl/:/etc/ves-hv/
 
   xnf-simulator:
-    image: nexus3.onap.org:10003/onap/ves-hv-collector-xnf-simulator
+    image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator
 #    build:
 #      context: hv-collector-xnf-simulator
 #      dockerfile: Dockerfile
@@ -64,7 +64,7 @@ services:
       - ./ssl/:/etc/ves-hv/
 
   dcae-app-simulator:
-    image: nexus3.onap.org:10003/onap/ves-hv-collector-dcae-simulator
+    image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator
 #    build:
 #      context: hv-collector-dcae-app-simulator
 #      dockerfile: Dockerfile
index 1e22d4c..ba29844 100644 (file)
@@ -149,7 +149,7 @@ object PerformanceSpecification : Spek({
 
             val outputDigest = digest.digest()
 
-            assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize)
+            assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
             assertThat(outputDigest).isEqualTo(inputDigest)
 
         }
index 239f710..354edae 100644 (file)
@@ -54,10 +54,14 @@ class MessageStreamValidation(
         val expectations = Json.createReader(input).readArray()
         val messageParams = messageParametersParser.parse(expectations)
 
-        if (messageParams.isEmpty())
-            throw IllegalArgumentException("Message param list cannot be empty")
-
-        return messageParams
+        return messageParams.fold(
+                { throw IllegalArgumentException("Parsing error: " + it.message) },
+                {
+                    if (it.isEmpty())
+                        throw IllegalArgumentException("Message param list cannot be empty")
+                    it
+                }
+        )
     }
 
     private fun shouldValidatePayloads(parameters: List<MessageParameters>) =
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
 
-import arrow.core.Left
-import arrow.core.Right
 import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.monad
-import arrow.typeclasses.binding
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import ratpack.exec.Promise
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendOrError
 import ratpack.handling.Chain
-import ratpack.handling.Context
-import ratpack.http.Response
 import ratpack.server.RatpackServer
 import ratpack.server.ServerConfig
 
@@ -38,7 +34,21 @@ import ratpack.server.ServerConfig
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class ApiServer(private val simulator: DcaeAppSimulator) {
+class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
+    private val responseValid by lazy {
+        Responses.statusResponse(
+                name = "valid",
+                message = "validation succeeded"
+        )
+    }
+
+    private val responseInvalid by lazy {
+        Responses.statusResponse(
+                name = "invalid",
+                message = "validation failed",
+                httpStatus = HttpStatus.BAD_REQUEST
+        )
+    }
 
 
     fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
@@ -52,10 +62,10 @@ class ApiServer(private val simulator: DcaeAppSimulator) {
     private fun setupHandlers(chain: Chain) {
         chain
                 .put("configuration/topics") { ctx ->
-                    val operation = ctx.bodyIo().flatMap { body ->
-                        simulator.listenToTopics(body.text)
+                    ctx.request.body.then { body ->
+                        val operation = simulator.listenToTopics(body.text)
+                        ctx.response.sendOrError(operation)
                     }
-                    ctx.response.sendOrError(operation)
 
                 }
                 .delete("messages") { ctx ->
@@ -64,7 +74,7 @@ class ApiServer(private val simulator: DcaeAppSimulator) {
                 }
                 .get("messages/all/count") { ctx ->
                     simulator.state().fold(
-                            { ctx.response.status(STATUS_NOT_FOUND) },
+                            { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) },
                             {
                                 ctx.response
                                         .contentType(CONTENT_TEXT)
@@ -72,58 +82,20 @@ class ApiServer(private val simulator: DcaeAppSimulator) {
                             })
                 }
                 .post("messages/all/validate") { ctx ->
-                    val responseStatus = IO.monad().binding {
-                        val body = ctx.bodyIo().bind()
-                        val isValid = simulator.validate(body.inputStream).bind()
-                        if (isValid)
-                            STATUS_OK
-                        else
-                            STATUS_BAD_REQUEST
-                    }.fix()
-
-                    ctx.response.sendStatusOrError(responseStatus)
+                    ctx.request.body.then { body ->
+                        val response = simulator.validate(body.inputStream)
+                                .map { isValid ->
+                                    if (isValid) responseValid else responseInvalid
+                                }
+                        ctx.response.sendAndHandleErrors(response)
+                    }
                 }
                 .get("healthcheck") { ctx ->
-                    ctx.response.status(STATUS_OK).send()
+                    ctx.response.status(HttpConstants.STATUS_OK).send()
                 }
     }
 
-    private fun Context.bodyIo() = request.body.asIo()
-
-    private fun <T> Promise<T>.asIo(): IO<T> = IO.async { emitResult ->
-        onError {
-            emitResult(Left(it))
-        }.then { result ->
-            emitResult(Right(result))
-        }
-    }
-
-    private fun Response.sendOrError(responseStatus: IO<Unit>) {
-        sendStatusOrError(responseStatus.map { STATUS_OK })
-    }
-
-    private fun Response.sendStatusOrError(responseStatus: IO<Int>) {
-        responseStatus.unsafeRunAsync { cb ->
-            cb.fold(
-                    { err ->
-                        logger.warn("Error occurred. Sending HTTP$STATUS_INTERNAL_SERVER_ERROR.", err)
-                        status(ApiServer.STATUS_INTERNAL_SERVER_ERROR)
-                                .send(CONTENT_TEXT, err.message)
-                    },
-                    {
-                        status(it).send()
-                    }
-            )
-        }
-    }
-
     companion object {
-        private val logger = Logger(ApiServer::class)
         private const val CONTENT_TEXT = "text/plain"
-
-        private const val STATUS_OK = 200
-        private const val STATUS_BAD_REQUEST = 400
-        private const val STATUS_NOT_FOUND = 404
-        private const val STATUS_INTERNAL_SERVER_ERROR = 500
     }
 }
index a65a268..c0f8b34 100644 (file)
@@ -24,11 +24,10 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppS
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.ApiServer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.void
+import org.onap.dcae.collectors.veshv.utils.arrow.unit
 import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
@@ -52,7 +51,7 @@ fun main(args: Array<String>) =
 
 
 private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
-    return ApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers)))
+    return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers)))
             .start(config.apiPort, config.kafkaTopics)
-            .void()
+            .unit()
 }
index 0bdd115..2932367 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 import arrow.core.Either
 import arrow.core.Left
 import arrow.core.None
+import arrow.core.Right
 import arrow.core.Some
 import arrow.effects.IO
 import javax.json.stream.JsonParsingException
@@ -67,7 +68,7 @@ internal class MessageStreamValidationTest : Spek({
     }
 
     fun givenParsedMessageParameters(vararg params: MessageParameters) {
-        whenever(messageParametersParser.parse(any())).thenReturn(params.toList())
+        whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
     }
 
     describe("validate") {
index 79fc932..1adf0ca 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.healthcheck.api
 
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since August 2018
  */
-enum class HealthStatus(val httpResponseStatus: Int) {
-    UP(OK),
-    DOWN(SERVICE_UNAVAILABLE),
-    OUT_OF_SERVICE(SERVICE_UNAVAILABLE),
-    UNKNOWN(SERVICE_UNAVAILABLE)
+enum class HealthStatus(val httpResponseStatus: HttpStatus) {
+    UP(HttpStatus.OK),
+    DOWN(HttpStatus.SERVICE_UNAVAILABLE),
+    OUT_OF_SERVICE(HttpStatus.SERVICE_UNAVAILABLE),
+    UNKNOWN(HttpStatus.SERVICE_UNAVAILABLE)
 }
index 7e9efac..753f73e 100644 (file)
@@ -51,7 +51,7 @@ class HealthCheckApiServer(private val healthState: HealthState, private val por
 
     private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
             healthDescription.get().run {
-                resp.status(status.httpResponseStatus).sendString(Flux.just(status.toString(), "\n", message))
+                resp.status(status.httpResponseStatus.number).sendString(Flux.just(status.toString(), "\n", message))
             }
 
     private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
index 5127e7e..a0235e1 100644 (file)
@@ -27,9 +27,9 @@
     </appender>
 
   <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
-  <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/>
-  <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/>
-  <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/>
+  <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/>
+  <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/>
+  <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/>
   <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
 
   <root level="INFO">
index 3960e39..3b6c0e8 100644 (file)
             <version>${project.parent.version}</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt
new file mode 100644 (file)
index 0000000..5491374
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.utils
+
+import arrow.core.Either
+import arrow.core.identity
+import org.assertj.core.api.AbstractAssert
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.ObjectAssert
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+class EitherAssert<A, B>(actual: Either<A, B>)
+    : AbstractAssert<EitherAssert<A, B>, Either<A, B>>(actual, EitherAssert::class.java) {
+
+    fun isLeft(): EitherAssert<A, B> {
+        isNotNull()
+        isInstanceOf(Either.Left::class.java)
+        return myself
+    }
+
+    fun left(): ObjectAssert<A> {
+        isLeft()
+        val left =  actual.fold(
+                ::identity,
+                { throw AssertionError("should be left") })
+        return assertThat(left)
+    }
+
+    fun isRight(): EitherAssert<A, B> {
+        isNotNull()
+        isInstanceOf(Either.Right::class.java)
+        return myself
+    }
+
+    fun right(): ObjectAssert<B> {
+        isRight()
+        val right =  actual.fold(
+                { throw AssertionError("should be right") },
+                ::identity)
+        return assertThat(right)
+    }
+}
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.utils.http
+package org.onap.dcae.collectors.veshv.tests.utils
+
+import arrow.core.Either
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.time.Duration
 
 /**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since August 2018
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
  */
-class Status {
-    companion object {
-        const val OK = 200
-        const val SERVICE_UNAVAILABLE = 503
+
+private val logger = Logger("org.onap.dcae.collectors.veshv.tests.utils")
+
+object Assertions : org.assertj.core.api.Assertions() {
+    fun <A,B> assertThat(actual: Either<A, B>) = EitherAssert(actual)
+}
+
+
+fun waitUntilSucceeds(action: () -> Unit) = waitUntilSucceeds(50, Duration.ofMillis(10), action)
+
+fun waitUntilSucceeds(retries: Int, sleepTime: Duration, action: () -> Unit) {
+    var tryNum = 0
+    while (tryNum <= retries) {
+        tryNum++
+        try {
+            logger.debug("Try number $tryNum")
+            action()
+            break
+        } catch (ex: Throwable) {
+            if (tryNum >= retries)
+                throw ex
+            else
+                Thread.sleep(sleepTime.toMillis())
+        }
     }
 }
index f1b7f06..81daf9b 100644 (file)
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-syntax</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlinx</groupId>
+            <artifactId>kotlinx-coroutines-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.ratpack</groupId>
+            <artifactId>ratpack-core</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>javax.json</groupId>
+            <artifactId>javax.json-api</artifactId>
+            <optional>true</optional>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <artifactId>logback-classic</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
-
-
 </project>
\ No newline at end of file
index 2d538b7..a99fef5 100644 (file)
@@ -35,6 +35,10 @@ import java.util.concurrent.atomic.AtomicReference
 
 fun <A> Either<A, A>.flatten() = fold(::identity, ::identity)
 
+fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity)
+
+fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity)
+
 fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get())
 
 fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> =
index cef537e..05d1309 100644 (file)
@@ -50,10 +50,14 @@ fun Either<IO<Unit>, IO<Unit>>.unsafeRunEitherSync(onError: (Throwable) -> ExitC
         flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() })
 
 
-fun IO<Any>.void() = map { Unit }
+fun IO<Any>.unit() = map { Unit }
 
-fun <T> Mono<T>.asIo() = IO.async<T> { proc ->
-    subscribe({ proc(Right(it)) }, { proc(Left(it)) })
+fun <T> Mono<T>.asIo() = IO.async<T> { callback ->
+    subscribe({
+        callback(Right(it))
+    }, {
+        callback(Left(it))
+    })
 }
 
 fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> =
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt
new file mode 100644 (file)
index 0000000..c5c4639
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import arrow.typeclasses.Show
+import java.util.*
+import javax.json.Json
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+object HttpConstants {
+    const val STATUS_OK = 200
+    const val STATUS_ACCEPTED = 202
+    const val STATUS_BAD_REQUEST = 400
+    const val STATUS_NOT_FOUND = 404
+    const val STATUS_INTERNAL_SERVER_ERROR = 500
+    const val STATUS_SERVICE_UNAVAILABLE = 503
+
+    const val CONTENT_TYPE_JSON = "application/json"
+    const val CONTENT_TYPE_TEXT = "text/plain"
+}
+
+enum class HttpStatus(val number: Int) {
+    OK(HttpConstants.STATUS_OK),
+    ACCEPTED(HttpConstants.STATUS_ACCEPTED),
+    BAD_REQUEST(HttpConstants.STATUS_BAD_REQUEST),
+    NOT_FOUND(HttpConstants.STATUS_NOT_FOUND),
+    INTERNAL_SERVER_ERROR(HttpConstants.STATUS_INTERNAL_SERVER_ERROR),
+    SERVICE_UNAVAILABLE(HttpConstants.STATUS_SERVICE_UNAVAILABLE)
+}
+
+
+enum class ContentType(val value: String) {
+    JSON(HttpConstants.CONTENT_TYPE_JSON),
+    TEXT(HttpConstants.CONTENT_TYPE_TEXT)
+}
+
+data class Response(val status: HttpStatus, val content: Content<Any>)
+data class Content<T>(val type: ContentType, val value: T, val serializer: Show<T> = Show.any())
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+object Responses {
+
+    fun acceptedResponse(id: UUID): Response {
+        return Response(
+                HttpStatus.ACCEPTED,
+                Content(ContentType.TEXT, id)
+        )
+    }
+
+    fun statusResponse(name: String, message: String, httpStatus: HttpStatus = HttpStatus.OK): Response {
+        return Response(httpStatus,
+                Content(ContentType.JSON,
+                        Json.createObjectBuilder()
+                                .add("status", name)
+                                .add("message", message)
+                                .build()))
+    }
+}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
new file mode 100644 (file)
index 0000000..0282d0c
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import arrow.core.Either
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import javax.json.Json
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+
+private val logger = Logger("org.onap.dcae.collectors.veshv.utils.arrow.ratpack")
+
+fun ratpack.http.Response.sendOrError(action: IO<Unit>) {
+    sendAndHandleErrors(action.map {
+        Response(
+                HttpStatus.OK,
+                Content(
+                        ContentType.JSON,
+                        Json.createObjectBuilder().add("response", "Request accepted").build()))
+    })
+}
+
+fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Response>) {
+    when(response) {
+        is Either.Left -> send(errorResponse(response.a.toString()))
+        is Either.Right -> sendAndHandleErrors(IO.just(response.b))
+    }
+}
+
+fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) {
+    response.attempt().unsafeRunSync().fold(
+            { err ->
+                logger.warn("Error occurred. Sending .", err)
+                val message = err.message
+                send(errorResponse(message))
+            },
+            ::send
+    )
+}
+
+private fun errorResponse(message: String?): Response {
+    return Response(
+            HttpStatus.INTERNAL_SERVER_ERROR,
+            Content(
+                    ContentType.JSON,
+                    Json.createObjectBuilder().add("error", message).build()))
+}
+
+fun ratpack.http.Response.send(response: Response) {
+    val respWithStatus = status(response.status.number)
+    response.content.apply {
+        respWithStatus.send(
+                type.value,
+                serializer.run { value.show() })
+    }
+}
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt
new file mode 100644 (file)
index 0000000..f9f716a
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import java.util.*
+import javax.json.JsonObject
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class ResponsesTest : Spek({
+    describe("response factory") {
+        describe("accepted response") {
+            given("uuid") {
+                val uuid = UUID.randomUUID()
+
+                on("calling acceptedResponse") {
+                    val result = Responses.acceptedResponse(uuid)
+
+                    it ("should have ACCEPTED status") {
+                        assertThat(result.status).isEqualTo(HttpStatus.ACCEPTED)
+                    }
+
+                    it ("should have text body") {
+                        assertThat(result.content.type).isEqualTo(ContentType.TEXT)
+                    }
+
+                    it ("should contain UUID text in the body") {
+                        val serialized = result.content.serializer.run { result.content.value.show() }
+                        assertThat(serialized).isEqualTo(uuid.toString())
+                    }
+                }
+            }
+        }
+        describe("status response") {
+            given("all params are specified") {
+                val status = "ok"
+                val message = "good job"
+                val httpStatus = HttpStatus.OK
+
+                on("calling statusResponse") {
+                    val result = Responses.statusResponse(status, message, httpStatus)
+                    val json = result.content.value as JsonObject
+
+                    it ("should have OK status") {
+                        assertThat(result.status).isEqualTo(HttpStatus.OK)
+                    }
+
+                    it ("should have json body") {
+                        assertThat(result.content.type).isEqualTo(ContentType.JSON)
+                    }
+
+                    it ("should contain status as string") {
+                        assertThat(json.getString("status")).isEqualTo(status)
+                    }
+
+                    it ("should contain message") {
+                        assertThat(json.getString("message")).isEqualTo(message)
+                    }
+                }
+            }
+
+            given("default params are omitted") {
+                val status = "ok"
+                val message = "good job"
+
+                on("calling statusResponse") {
+                    val result = Responses.statusResponse(status, message)
+
+                    it ("should have OK status") {
+                        assertThat(result.status).isEqualTo(HttpStatus.OK)
+                    }
+                }
+            }
+        }
+    }
+})
index 060f28a..754fa31 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.ves.message.generator.api
 
+import arrow.core.Either
+import arrow.core.Option
 import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl
 import javax.json.JsonArray
 
 interface MessageParametersParser {
-    fun parse(request: JsonArray): List<MessageParameters>
+    fun parse(request: JsonArray): Either<ParsingError, List<MessageParameters>>
 
     companion object {
         val INSTANCE: MessageParametersParser by lazy {
@@ -31,3 +33,5 @@ interface MessageParametersParser {
         }
     }
 }
+
+data class ParsingError(val message: String, val cause: Option<Throwable>)
index 5b328f1..f309561 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.ves.message.generator.impl
 
+import arrow.core.Option
+import arrow.core.Try
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
 import javax.json.JsonArray
 
 /**
@@ -32,8 +35,8 @@ internal class MessageParametersParserImpl(
         private val commonEventHeaderParser: CommonEventHeaderParser = CommonEventHeaderParser()
 ) : MessageParametersParser {
 
-    override fun parse(request: JsonArray): List<MessageParameters> =
-            try {
+    override fun parse(request: JsonArray) =
+            Try {
                 request
                         .map { it.asJsonObject() }
                         .map {
@@ -41,13 +44,13 @@ internal class MessageParametersParserImpl(
                                     .parse(it.getJsonObject("commonEventHeader"))
                             val messageType = MessageType.valueOf(it.getString("messageType"))
                             val messagesAmount = it.getJsonNumber("messagesAmount")?.longValue()
-                                    ?: throw ParsingException("\"messagesAmount\" could not be parsed from message.",
-                                            NullPointerException())
+                                    ?: throw NullPointerException("\"messagesAmount\" could not be parsed from message.")
                             MessageParameters(commonEventHeader, messageType, messagesAmount)
                         }
-            } catch (e: Exception) {
-                throw ParsingException("Parsing request body failed", e)
+            }.toEither().mapLeft { ex ->
+                ParsingError(
+                        ex.message ?: "Unable to parse message parameters",
+                        Option.fromNullable(ex))
             }
 
-    internal class ParsingException(message: String, cause: Exception) : Exception(message, cause)
 }
index 9256199..3b1a48b 100644 (file)
 package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
 
 import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.assertj.core.api.Assertions.fail
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl.ParsingException
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
 import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl
 
@@ -45,18 +44,20 @@ object MessageParametersParserTest : Spek({
                 it("should parse MessagesParameters object successfully") {
                     val result = messageParametersParser.parse(validMessagesParametesJson())
 
-                    assertThat(result).isNotNull
-                    assertThat(result).hasSize(2)
-                    val firstMessage = result.first()
-                    assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID)
-                    assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT)
+                    result.fold({ fail("should have succeeded") }) { rightResult ->
+                        assertThat(rightResult).hasSize(2)
+                        val firstMessage = rightResult.first()
+                        assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID)
+                        assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT)
+
+                    }
                 }
             }
+
             on("invalid parameters json") {
                 it("should throw exception") {
-                    assertThatExceptionOfType(ParsingException::class.java).isThrownBy {
-                        messageParametersParser.parse(invalidMessagesParametesJson())
-                    }
+                    val result = messageParametersParser.parse(invalidMessagesParametesJson())
+                    assertThat(result.isLeft()).describedAs("is left").isTrue()
                 }
             }
         }
index d44e251..cfe1dc1 100644 (file)
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-effects</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlinx</groupId>
+            <artifactId>kotlinx-coroutines-core</artifactId>
+        </dependency>
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
deleted file mode 100644 (file)
index 02e6ee7..0000000
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.xnf.impl
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser.Companion.INSTANCE
-import ratpack.handling.Chain
-import ratpack.handling.Context
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-import reactor.core.scheduler.Schedulers
-import javax.json.Json
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
- */
-internal class HttpServer(private val vesClient: XnfSimulator,
-                          private val messageParametersParser: MessageParametersParser = INSTANCE) {
-
-    fun start(port: Int): IO<RatpackServer> = IO {
-        RatpackServer.start { server ->
-            server.serverConfig(ServerConfig.embedded().port(port))
-                    .handlers(this::configureHandlers)
-        }
-    }
-
-    private fun configureHandlers(chain: Chain) {
-        chain
-                .post("simulator/sync") { ctx ->
-                    ctx.request.body
-                            .map { Json.createReader(it.inputStream).readArray() }
-                            .map { messageParametersParser.parse(it) }
-                            .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
-                            .map { vesClient.sendIo(it) }
-                            .map { it.unsafeRunSync() }
-                            .onError { handleException(it, ctx) }
-                            .then { sendAcceptedResponse(ctx) }
-                }
-                .post("simulator/async") { ctx ->
-                    ctx.request.body
-                            .map { Json.createReader(it.inputStream).readArray() }
-                            .map { messageParametersParser.parse(it) }
-                            .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
-                            .map { vesClient.sendRx(it) }
-                            .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
-                            .onError { handleException(it, ctx) }
-                            .then { sendAcceptedResponse(ctx) }
-                }
-                .get("healthcheck") { ctx ->
-                    ctx.response.status(STATUS_OK).send()
-                }
-    }
-
-    private fun sendAcceptedResponse(ctx: Context) {
-        ctx.response
-                .status(STATUS_OK)
-                .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
-                        .add("response", "Request accepted")
-                        .build()
-                        .toString())
-    }
-
-    private fun handleException(t: Throwable, ctx: Context) {
-        logger.warn("Failed to process the request - ${t.localizedMessage}")
-        logger.debug("Exception thrown when processing the request", t)
-        ctx.response
-                .status(STATUS_BAD_REQUEST)
-                .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
-                        .add("response", "Request was not accepted")
-                        .add("exception", t.localizedMessage)
-                        .build()
-                        .toString())
-    }
-
-    companion object {
-        private val logger = Logger(HttpServer::class)
-        const val STATUS_OK = 200
-        const val STATUS_BAD_REQUEST = 400
-        const val CONTENT_TYPE_APPLICATION_JSON = "application/json"
-    }
-}
index e8a474d..558bd1c 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
+import arrow.core.Either
+import arrow.core.Some
+import arrow.core.Try
+import arrow.core.fix
+import arrow.core.flatMap
+import arrow.core.monad
 import arrow.effects.IO
-import io.netty.handler.ssl.ClientAuth
-import io.netty.handler.ssl.SslContext
-import io.netty.handler.ssl.SslContextBuilder
-import io.netty.handler.ssl.SslProvider
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.core.publisher.ReplayProcessor
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.tcp.TcpClient
-
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import java.io.InputStream
+import javax.json.Json
 
 /**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
  */
-internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
-
-    private val client: TcpClient = TcpClient.builder()
-            .options { opts ->
-                opts.host(configuration.vesHost)
-                        .port(configuration.vesPort)
-                        .sslContext(createSslContext(configuration.security))
-            }
-            .build()
-
-    fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> {
-        sendRx(messages).block()
-    }
-
-    fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
-        val complete = ReplayProcessor.create<Void>(1)
-        client
-                .newHandler { _, output -> handler(complete, messages, output) }
-                .doOnError {
-                    logger.info("Failed to connect to VesHvCollector on " +
-                            "${configuration.vesHost}:${configuration.vesPort}")
-                }
-                .subscribe {
-                    logger.info("Connected to VesHvCollector on " +
-                            "${configuration.vesHost}:${configuration.vesPort}")
-                }
-        return complete.then()
-    }
-
-    private fun handler(complete: ReplayProcessor<Void>,
-                        messages: Flux<PayloadWireFrameMessage>,
-                        nettyOutbound: NettyOutbound): Publisher<Void> {
-
-        val allocator = nettyOutbound.alloc()
-        val encoder = WireFrameEncoder(allocator)
-        val frames = messages
-                .map(encoder::encode)
-                .window(MAX_BATCH_SIZE)
-
-        return nettyOutbound
-                .logConnectionClosed()
-                .options { it.flushOnBoundary() }
-                .sendGroups(frames)
-                .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
-                .then {
-                    logger.info("Messages have been sent")
-                    complete.onComplete()
-                }
-                .then()
-    }
-
-    private fun createSslContext(config: SecurityConfiguration): SslContext =
-            SslContextBuilder.forClient()
-                    .keyManager(config.cert.toFile(), config.privateKey.toFile())
-                    .trustManager(config.trustedCert.toFile())
-                    .sslProvider(SslProvider.OPENSSL)
-                    .clientAuth(ClientAuth.REQUIRE)
-                    .build()
-
-    private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
-        context().onClose {
-            logger.info { "Connection to ${context().address()} has been closed" }
-        }
-        return this
-    }
-
-    companion object {
-        private val logger = Logger(XnfSimulator::class)
-        private const val MAX_BATCH_SIZE = 128
-        private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
-    }
+class XnfSimulator(
+        private val vesClient: VesHvClient,
+        private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
+        private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+
+    fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
+            Either.monad<ParsingError>().binding {
+                val json = parseJsonArray(messageParameters).bind()
+                val parsed = messageParametersParser.parse(json).bind()
+                val generatedMessages = messageGenerator.createMessageFlux(parsed)
+                vesClient.sendIo(generatedMessages)
+            }.fix()
+
+    private fun parseJsonArray(jsonStream: InputStream) =
+            Try {
+                Json.createReader(jsonStream).readArray()
+            }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) }
 }
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
new file mode 100644 (file)
index 0000000..22e47d7
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import io.netty.handler.ssl.ClientAuth
+import io.netty.handler.ssl.SslContext
+import io.netty.handler.ssl.SslContextBuilder
+import io.netty.handler.ssl.SslProvider
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.core.publisher.ReplayProcessor
+import reactor.ipc.netty.NettyOutbound
+import reactor.ipc.netty.tcp.TcpClient
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class VesHvClient(private val configuration: SimulatorConfiguration) {
+
+    private val client: TcpClient = TcpClient.builder()
+            .options { opts ->
+                opts.host(configuration.vesHost)
+                        .port(configuration.vesPort)
+                        .sslContext(createSslContext(configuration.security))
+            }
+            .build()
+
+    fun sendIo(messages: Flux<PayloadWireFrameMessage>) =
+            sendRx(messages).then(Mono.just(Unit)).asIo()
+
+    private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
+        val complete = ReplayProcessor.create<Void>(1)
+        client
+                .newHandler { _, output -> handler(complete, messages, output) }
+                .doOnError {
+                    logger.info("Failed to connect to VesHvCollector on " +
+                            "${configuration.vesHost}:${configuration.vesPort}")
+                }
+                .subscribe {
+                    logger.info("Connected to VesHvCollector on " +
+                            "${configuration.vesHost}:${configuration.vesPort}")
+                }
+        return complete.then()
+    }
+
+    private fun handler(complete: ReplayProcessor<Void>,
+                        messages: Flux<PayloadWireFrameMessage>,
+                        nettyOutbound: NettyOutbound): Publisher<Void> {
+
+        val allocator = nettyOutbound.alloc()
+        val encoder = WireFrameEncoder(allocator)
+        val frames = messages
+                .map(encoder::encode)
+                .window(MAX_BATCH_SIZE)
+
+        return nettyOutbound
+                .logConnectionClosed()
+                .options { it.flushOnBoundary() }
+                .sendGroups(frames)
+                .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
+                .then {
+                    logger.info("Messages have been sent")
+                    complete.onComplete()
+                }
+                .then()
+    }
+
+    private fun createSslContext(config: SecurityConfiguration): SslContext =
+            SslContextBuilder.forClient()
+                    .keyManager(config.cert.toFile(), config.privateKey.toFile())
+                    .trustManager(config.trustedCert.toFile())
+                    .sslProvider(SslProvider.OPENSSL)
+                    .clientAuth(ClientAuth.REQUIRE)
+                    .build()
+
+    private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
+        context().onClose {
+            logger.info { "Connection to ${context().address()} has been closed" }
+        }
+        return this
+    }
+
+    companion object {
+        private val logger = Logger(VesHvClient::class)
+        private const val MAX_BATCH_SIZE = 128
+        private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
+    }
+}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
new file mode 100644 (file)
index 0000000..54ead6f
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import arrow.core.Either
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.utils.http.Content
+import org.onap.dcae.collectors.veshv.utils.http.ContentType
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
+import org.onap.dcae.collectors.veshv.utils.http.Response
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendEitherErrorOrResponse
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import ratpack.handling.Chain
+import ratpack.handling.Context
+import ratpack.http.TypedData
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+import java.util.*
+import javax.json.Json
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+internal class XnfApiServer(
+        private val xnfSimulator: XnfSimulator,
+        private val ongoingSimulations: OngoingSimulations) {
+
+    fun start(port: Int): IO<RatpackServer> = IO {
+        RatpackServer.start { server ->
+            server.serverConfig(ServerConfig.embedded().port(port))
+                    .handlers(this::configureHandlers)
+        }
+    }
+
+    private fun configureHandlers(chain: Chain) {
+        chain
+                .post("simulator", ::startSimulationHandler)
+                .post("simulator/async", ::startSimulationHandler)
+                .get("simulator/:id", ::simulatorStatusHandler)
+                .get("healthcheck") { ctx ->
+                    logger.info("Checking health")
+                    ctx.response.status(HttpConstants.STATUS_OK).send()
+                }
+    }
+
+    private fun startSimulationHandler(ctx: Context) {
+        logger.info("Starting asynchronous scenario")
+        ctx.request.body.then { body ->
+            val id = startSimulation(body)
+            ctx.response.sendEitherErrorOrResponse(id)
+        }
+    }
+
+    private fun startSimulation(body: TypedData): Either<ParsingError, Response> {
+        return xnfSimulator.startSimulation(body.inputStream)
+                .map(ongoingSimulations::startAsynchronousSimulation)
+                .map(Responses::acceptedResponse)
+    }
+
+    private fun simulatorStatusHandler(ctx: Context) {
+        val id = UUID.fromString(ctx.pathTokens["id"])
+        val status = ongoingSimulations.status(id)
+        val response = Responses.statusResponse(status.toString(), status.message)
+        ctx.response.sendAndHandleErrors(IO.just(response))
+    }
+
+    companion object {
+        private val logger = Logger(XnfApiServer::class)
+    }
+}
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.simulators.xnf.config
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
 
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
 
@@ -25,7 +25,7 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-internal data class SimulatorConfiguration(
+data class SimulatorConfiguration(
         val listenPort: Int,
         val vesHost: String,
         val vesPort: Int,
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
new file mode 100644 (file)
index 0000000..95bb489
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+
+import arrow.effects.IO
+import kotlinx.coroutines.experimental.asCoroutineDispatcher
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.Executor
+import java.util.concurrent.Executors
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
+    private val asyncSimulationContext = executor.asCoroutineDispatcher()
+    private val simulations = ConcurrentHashMap<UUID, Status>()
+
+    fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
+        val id = UUID.randomUUID()
+        simulations[id] = StatusOngoing
+
+        simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
+            result.fold(
+                    { err ->
+                        logger.warn("Error", err)
+                        simulations[id] = StatusFailure(err)
+                    },
+                    {
+                        logger.info("Finished sending messages")
+                        simulations[id] = StatusSuccess
+                    }
+            )
+        }
+        return id
+    }
+
+    fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
+
+    internal fun clear() {
+        simulations.clear()
+    }
+
+    companion object {
+        private val logger = Logger(XnfApiServer::class)
+    }
+}
+
+sealed class Status(val message: String) {
+    override fun toString() = this::class.simpleName ?: "null"
+}
+
+object StatusNotFound : Status("not found")
+object StatusOngoing : Status("ongoing")
+object StatusSuccess : Status("success")
+data class StatusFailure(val cause: Throwable) : Status("Error ${cause.message}")
index fa6d626..c9e900a 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf
 
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.void
+import org.onap.dcae.collectors.veshv.utils.arrow.unit
 import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
@@ -38,11 +40,10 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
  */
 fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
         .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
-        .map {config ->
-            XnfSimulator(config)
-                    .let { HttpServer(it) }
+        .map { config ->
+            XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations())
                     .start(config.listenPort)
-                    .void()
+                    .unit()
         }
         .unsafeRunEitherSync(
                 { ex ->
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
new file mode 100644 (file)
index 0000000..70d8ba8
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.effects.IO
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusFailure
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess
+import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds
+import java.time.Duration
+import java.util.*
+import java.util.concurrent.Executors
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class OngoingSimulationsTest : Spek({
+    val executor = Executors.newSingleThreadExecutor()
+    val cut = OngoingSimulations(executor)
+
+    describe("simulations repository") {
+        given("not existing task task id") {
+            val id = UUID.randomUUID()
+
+            on("status") {
+                val result = cut.status(id)
+
+                it("should have 'not found' status") {
+                    assertThat(result).isEqualTo(StatusNotFound)
+                }
+            }
+        }
+
+        given("never ending task") {
+            val task = IO.async<Unit> { }
+
+            on("startAsynchronousSimulation") {
+                val result = cut.startAsynchronousSimulation(task)
+
+                it("should have ongoing status") {
+                    assertThat(cut.status(result)).isEqualTo(StatusOngoing)
+                }
+            }
+        }
+
+        given("failing task") {
+            val cause = RuntimeException("facepalm")
+            val task = IO.raiseError<Unit>(cause)
+
+            on("startAsynchronousSimulation") {
+                val result = cut.startAsynchronousSimulation(task)
+
+                it("should have failing status") {
+                    waitUntilSucceeds {
+                        assertThat(cut.status(result)).isEqualTo(StatusFailure(cause))
+                    }
+                }
+            }
+        }
+
+        given("successful task") {
+            val task = IO { println("great success!") }
+
+            on("startAsynchronousSimulation") {
+                val result = cut.startAsynchronousSimulation(task)
+
+                it("should have successful status") {
+                    waitUntilSucceeds {
+                        assertThat(cut.status(result)).isEqualTo(StatusSuccess)
+                    }
+                }
+            }
+        }
+
+        afterGroup {
+            executor.shutdown()
+        }
+    }
+
+    afterEachTest { cut.clear() }
+})
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
new file mode 100644 (file)
index 0000000..80f3957
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.core.Left
+import arrow.core.None
+import arrow.core.Right
+import arrow.effects.IO
+import com.nhaarman.mockito_kotlin.any
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.whenever
+import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import reactor.core.publisher.Flux
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class XnfSimulatorTest : Spek({
+    lateinit var cut: XnfSimulator
+    lateinit var vesClient: VesHvClient
+    lateinit var messageParametersParser: MessageParametersParser
+    lateinit var messageGenerator: MessageGenerator
+
+    beforeEachTest {
+        vesClient = mock()
+        messageParametersParser = mock()
+        messageGenerator = mock()
+        cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator)
+    }
+
+    describe("startSimulation") {
+        it("should fail when empty input stream") {
+            // given
+            val emptyInputStream = ByteInputStream()
+
+            // when
+            val result = cut.startSimulation(emptyInputStream)
+
+            // then
+            assertThat(result).isLeft()
+        }
+
+        it("should fail when invalid JSON") {
+            // given
+            val invalidJson = "invalid json".byteInputStream()
+
+            // when
+            val result = cut.startSimulation(invalidJson)
+
+            // then
+            assertThat(result).isLeft()
+        }
+
+        it("should fail when JSON syntax is valid but content is invalid") {
+            // given
+            val json = "[1,2,3]".byteInputStream()
+            val cause = ParsingError("epic fail", None)
+            whenever(messageParametersParser.parse(any())).thenReturn(
+                    Left(cause))
+
+            // when
+            val result = cut.startSimulation(json)
+
+            // then
+            assertThat(result).left().isEqualTo(cause)
+        }
+
+        it("should return generated messages") {
+            // given
+            val json = "[true]".byteInputStream()
+            val messageParams = listOf<MessageParameters>()
+            val generatedMessages = Flux.empty<PayloadWireFrameMessage>()
+            val sendingIo = IO {}
+            whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
+            whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages)
+            whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo)
+
+            // when
+            val result = cut.startSimulation(json)
+
+            // then
+            assertThat(result).right().isSameAs(sendingIo)
+        }
+    }
+})
index 8749dc5..69caf72 100644 (file)
@@ -26,9 +26,9 @@ import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration.DefaultValues
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration.DefaultValues
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
 import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
 import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
 import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError
diff --git a/pom.xml b/pom.xml
index 03dac1f..e1c90b3 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -65,7 +65,7 @@
         <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
         <build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
         <jacoco.version>0.8.2</jacoco.version>
-        <jacoco.minimum.coverage>60</jacoco.minimum.coverage>
+        <jacoco.minimum.coverage>66</jacoco.minimum.coverage>
 
         <!-- Protocol buffers -->
         <protobuf.version>3.5.1</protobuf.version>
                                                 <https_proxy>${docker.http_proxy}</https_proxy>
                                             </args>
                                             -->
+                                            
                                             <dockerFileDir>${project.basedir}</dockerFileDir>
                                             <tags>
                                                 <tag>${project.version}-${maven.build.timestamp}Z</tag>
                 <artifactId>kotlin-reflect</artifactId>
                 <version>${kotlin.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.jetbrains.kotlinx</groupId>
+                <artifactId>kotlinx-coroutines-core</artifactId>
+                <version>0.25.0</version>
+            </dependency>
             <dependency>
                 <groupId>io.arrow-kt</groupId>
                 <artifactId>arrow-core</artifactId>