Use Netty HttpServer in XnfApiServer 95/82395/5
authorIzabela Zawadzka <izabela.zawadzka@nokia.com>
Fri, 15 Mar 2019 08:43:56 +0000 (09:43 +0100)
committerIzabela Zawadzka <izabela.zawadzka@nokia.com>
Fri, 15 Mar 2019 10:06:12 +0000 (11:06 +0100)
Change-Id: I86e06bd540c961098ee11af99735a5b35ce760fd
Issue-ID: DCAEGEN2-1325
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
pom.xml
sources/hv-collector-utils/pom.xml
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt [deleted file]
sources/hv-collector-xnf-simulator/pom.xml
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt

diff --git a/pom.xml b/pom.xml
index 9322608..8d5a2e6 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                 <artifactId>javax.json</artifactId>
                 <version>1.1.2</version>
             </dependency>
-            <dependency>
-                <!-- To override ratpack transitive dependency which creates security vulnerability
-                This issue will be resolved with ratpack 1.6.0 release -->
-                <groupId>com.fasterxml.jackson.core</groupId>
-                <artifactId>jackson-databind</artifactId>
-                <version>2.9.6</version>
-            </dependency>
-            <dependency>
-                <groupId>io.ratpack</groupId>
-                <artifactId>ratpack-core</artifactId>
-                <version>1.5.4</version>
-                <exclusions>
-                    <exclusion>
-                        <groupId>com.google.guava</groupId>
-                        <artifactId>guava</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
             <dependency>
                 <groupId>io.micrometer</groupId>
                 <artifactId>micrometer-registry-prometheus</artifactId>
index feba812..e85b8ee 100644 (file)
@@ -3,7 +3,7 @@
   ~ ============LICENSE_START=======================================================
   ~ dcaegen2-collectors-veshv
   ~ ================================================================================
-  ~ Copyright (C) 2018 NOKIA
+  ~ Copyright (C) 2018-2019 NOKIA
   ~ ================================================================================
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
             <groupId>org.jetbrains.kotlinx</groupId>
             <artifactId>kotlinx-coroutines-core</artifactId>
         </dependency>
-        <dependency>
-            <groupId>io.ratpack</groupId>
-            <artifactId>ratpack-core</artifactId>
-            <optional>true</optional>
-        </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
index 33e65e4..cf338a7 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.utils.http
 
+import arrow.core.Either
 import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Mono
@@ -52,6 +53,12 @@ fun HttpServerResponse.sendAndHandleErrors(response: IO<Response>): NettyOutboun
                 }
         )
 
+fun <A> HttpServerResponse.sendEitherErrorOrResponse(response: Either<A, Response>): NettyOutbound =
+        when (response) {
+            is Either.Left -> sendResponse(errorResponse(response.a.toString()))
+            is Either.Right -> sendAndHandleErrors(IO.just(response.b))
+        }
+
 private fun HttpServerResponse.sendResponse(response: Response): NettyOutbound {
     val respWithStatus = status(response.status.number)
     val responseContent = response.content
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
deleted file mode 100644 (file)
index 529804a..0000000
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.utils.http
-
-import arrow.core.Either
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import javax.json.Json
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since August 2018
- */
-
-private val logger = Logger("org.onap.dcae.collectors.veshv.utils.arrow.ratpack")
-
-
-fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Response>) {
-    when (response) {
-        is Either.Left -> send(errorResponse(response.a.toString()))
-        is Either.Right -> sendAndHandleErrors(IO.just(response.b))
-    }
-}
-
-fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) {
-    response.attempt().unsafeRunSync().fold(
-            { err ->
-                logger.withWarn { log("Error occurred. Sending .", err) }
-                val message = err.message
-                send(errorResponse(message))
-            },
-            ::send
-    )
-}
-
-private fun errorResponse(message: String?): Response {
-    return Response(
-            HttpStatus.INTERNAL_SERVER_ERROR,
-            Content(
-                    ContentType.JSON,
-                    Json.createObjectBuilder().add("error", message).build()))
-}
-
-fun ratpack.http.Response.send(response: Response) {
-    val respWithStatus = status(response.status.number)
-    response.content.apply {
-        respWithStatus.send(
-                type.value,
-                serializer.run { value.show() })
-    }
-}
index 91e965f..c17d29f 100644 (file)
             <groupId>org.glassfish</groupId>
             <artifactId>javax.json</artifactId>
         </dependency>
-        <dependency>
-            <groupId>io.ratpack</groupId>
-            <artifactId>ratpack-core</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
index 654f16a..7df7283 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,17 +23,20 @@ import arrow.core.Either
 import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.http.Response
 import org.onap.dcae.collectors.veshv.utils.http.Responses
 import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
 import org.onap.dcae.collectors.veshv.utils.http.sendEitherErrorOrResponse
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
-import ratpack.handling.Chain
-import ratpack.handling.Context
-import ratpack.http.TypedData
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
+import reactor.core.publisher.Mono
+import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRequest
+import reactor.netty.http.server.HttpServerResponse
+import reactor.netty.http.server.HttpServerRoutes
+import java.io.InputStream
 import java.net.InetSocketAddress
 import java.util.*
 
@@ -45,48 +48,49 @@ internal class XnfApiServer(
         private val xnfSimulator: XnfSimulator,
         private val ongoingSimulations: OngoingSimulations) {
 
-    fun start(socketAddress: InetSocketAddress): IO<RatpackServer> = IO {
-        RatpackServer.start { server ->
-            server.serverConfig(ServerConfig.embedded()
-                    .port(socketAddress.port))
-                    .handlers(this::configureHandlers)
-        }
+    fun start(socketAddress: InetSocketAddress): IO<ServerHandle> = IO {
+        HttpServer.create()
+                .host(socketAddress.hostName)
+                .port(socketAddress.port)
+                .route(::setRoutes)
+                .let { NettyServerHandle(it.bindNow()) }
     }
 
-    private fun configureHandlers(chain: Chain) {
-        chain
-                .post("simulator", ::startSimulationHandler)
-                .post("simulator/async", ::startSimulationHandler)
-                .get("simulator/:id", ::simulatorStatusHandler)
+    private fun setRoutes(route: HttpServerRoutes) {
+        route
+                .post("/simulator", ::startSimulationHandler)
+                .post("/simulator/async", ::startSimulationHandler)
+                .get("/simulator/:id", ::simulatorStatusHandler)
     }
 
-    private fun startSimulationHandler(ctx: Context) {
+    private fun startSimulationHandler(req: HttpServerRequest, res: HttpServerResponse): Mono<Void> {
         logger.info { "Attempting to start asynchronous scenario" }
-        ctx.request.body.then { body ->
-            val id = startSimulation(body)
-            when (id) {
-                is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}" }
-                is Either.Right -> logger.info { "Scenario started, details: ${id.b}" }
-            }
-            ctx.response.sendEitherErrorOrResponse(id)
-        }
+        return req.receive().aggregate().asInputStream()
+                .flatMap { body ->
+                    val id = startSimulation(body)
+                    when (id) {
+                        is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}" }
+                        is Either.Right -> logger.info { "Scenario started, details: ${id.b}" }
+                    }
+                    res.sendEitherErrorOrResponse(id).then()
+                }
     }
 
-    private fun startSimulation(body: TypedData): Either<ParsingError, Response> {
-        return xnfSimulator.startSimulation(body.inputStream)
-                .map(ongoingSimulations::startAsynchronousSimulation)
-                .map(Responses::acceptedResponse)
-    }
+
+    private fun startSimulation(body: InputStream): Either<ParsingError, Response> =
+            xnfSimulator.startSimulation(body)
+                    .map(ongoingSimulations::startAsynchronousSimulation)
+                    .map(Responses::acceptedResponse)
 
 
-    private fun simulatorStatusHandler(ctx: Context) {
+    private fun simulatorStatusHandler(req: HttpServerRequest, res: HttpServerResponse): Mono<Void> {
         logger.debug { "Checking task status" }
-        val id = UUID.fromString(ctx.pathTokens["id"])
+        val id = UUID.fromString(req.param("id"))
         logger.debug { "Checking status for id: $id" }
         val status = ongoingSimulations.status(id)
         val response = Responses.statusResponse(status.toString(), status.message)
         logger.info { "Task $id status: $response" }
-        ctx.response.sendAndHandleErrors(IO.just(response))
+        return res.sendAndHandleErrors(IO.just(response)).then()
     }
 
     companion object {
index 60214de..a73b39b 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -39,7 +39,6 @@ import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
-import ratpack.server.RatpackServer
 
 private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf"
 private val logger = Logger(PACKAGE_NAME)
@@ -58,21 +57,27 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
                     ExitFailure(1)
                 },
                 {
-                    logger.info { "Started xNF Simulator API server" }
-                    HealthState.INSTANCE.changeState(HealthDescription.IDLE)
+                    logger.info { "Stopping xNF Simulator API server" }
                 }
         )
 
-private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> =
+private fun startServers(config: SimulatorConfiguration): IO<Unit> =
         IO.monad().binding {
             logger.info { "Using configuration: $config" }
+
             XnfHealthCheckServer().startServer(config).bind()
+
             val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider)
             val xnfSimulator = XnfSimulator(
                     ClientFactory(clientConfig),
                     MessageGeneratorFactory(config.maxPayloadSizeBytes)
             )
-            XnfApiServer(xnfSimulator, OngoingSimulations())
-                    .start(config.listenAddress)
-                    .bind()
+            val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations())
+                    .start(config.listenAddress).bind()
+
+            logger.info { "Started xNF Simulator API server" }
+            HealthState.INSTANCE.changeState(HealthDescription.IDLE)
+
+            xnfApiServerHandler.await().bind()
         }.fix()
+