Use netty HttpServer in DcaeAppApiServer 21/81921/6
authorIzabela Zawadzka <izabela.zawadzka@nokia.com>
Fri, 8 Mar 2019 08:14:56 +0000 (09:14 +0100)
committerIzabela Zawadzka <izabela.zawadzka@nokia.com>
Tue, 12 Mar 2019 11:22:39 +0000 (12:22 +0100)
Change-Id: I895ad192babd9cc40266d0bec3830fcd4b0e054b
Issue-ID: DCAEGEN2-1325
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
sources/hv-collector-dcae-app-simulator/pom.xml
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt [new file with mode: 0644]
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt

index b82c001..d4ab056 100644 (file)
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-syntax</artifactId>
         </dependency>
-        <dependency>
-            <groupId>io.ratpack</groupId>
-            <artifactId>ratpack-core</artifactId>
-        </dependency>
         <dependency>
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
index 88e01c2..d2c5b27 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,15 +21,13 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
 
 import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
-import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
-import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
-import org.onap.dcae.collectors.veshv.utils.http.Responses
-import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
-import org.onap.dcae.collectors.veshv.utils.http.sendOrError
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.http.*
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import ratpack.handling.Chain
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
+import reactor.core.publisher.Mono
+import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRoutes
 import java.net.InetSocketAddress
 
 /**
@@ -53,67 +51,73 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
     }
 
 
-    fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<RatpackServer> =
+    fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> =
             simulator.listenToTopics(kafkaTopics).map {
-                RatpackServer.start { server ->
-                    server.serverConfig(
-                            ServerConfig.embedded()
-                                    .port(socketAddress.port)
-                    ).handlers(::setupHandlers)
-                }
+                HttpServer.create()
+                        .host(socketAddress.hostName)
+                        .port(socketAddress.port)
+                        .route(::setRoutes)
+                        .let { NettyServerHandle(it.bindNow()) }
             }
 
-    private fun setupHandlers(chain: Chain) {
-        chain
-                .put("configuration/topics") { ctx ->
-                    ctx.request.body.then { body ->
-                        val operation = simulator.listenToTopics(body.text)
-                        ctx.response.sendOrError(operation)
-                    }
 
+    private fun setRoutes(route: HttpServerRoutes) {
+        route
+                .put("/configuration/topics") { req, res ->
+                    req
+                            .receive().aggregate().asString()
+                            .flatMap {
+                                val option = simulator.listenToTopics(it)
+                                res.sendOrError(option).then()
+                            }
                 }
-                .delete("messages") { ctx ->
-                    ctx.response.contentType(CONTENT_TEXT)
+                .delete("/messages") { _, res ->
                     logger.info { "Resetting simulator state" }
-                    ctx.response.sendOrError(simulator.resetState())
+                    res
+                            .header("Content-type", CONTENT_TEXT)
+                            .sendOrError(simulator.resetState())
                 }
-                .get("messages/all/count") { ctx ->
+                .get("/messages/all/count") { _, res ->
                     logger.info { "Processing request for count of received messages" }
                     simulator.state().fold(
                             {
-                                ctx.response.status(HttpConstants.STATUS_NOT_FOUND)
                                 logger.warn { "Error - number of messages could not be specified" }
+                                res.status(HttpConstants.STATUS_NOT_FOUND)
                             },
                             {
                                 logger.info { "Returned number of received messages: ${it.messagesCount}" }
-                                ctx.response
-                                        .contentType(CONTENT_TEXT)
-                                        .send(it.messagesCount.toString())
-                            })
+                                res.sendString(Mono.just(it.messagesCount.toString()))
+                            }
+                    )
                 }
-                .post("messages/all/validate") { ctx ->
-                    ctx.request.body.then { body ->
-                        logger.info { "Processing request for message validation" }
-                        val response = simulator.validate(body.inputStream)
-                                .map { isValid ->
-                                    if (isValid) {
-                                        logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" }
-                                        responseValid
-                                    } else {
-                                        logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" }
-                                        responseInvalid
-                                    }
-                                }
-                        ctx.response.sendAndHandleErrors(response)
-                    }
+                .post("/messages/all/validate") { req, res ->
+                    req
+                            .receive().aggregate().asInputStream()
+                            .flatMap { body ->
+                                logger.info { "Processing request for message validation" }
+                                val response =
+                                        simulator.validate(body)
+                                                .map(::resolveValidationResponse)
+                                res.sendAndHandleErrors(response).then()
+                            }
                 }
-                .get("healthcheck") { ctx ->
+                .get("/healthcheck") { _, res ->
                     val status = HttpConstants.STATUS_OK
                     logger.info { "Healthcheck OK, returning status: $status" }
-                    ctx.response.status(status).send()
+                    res.status(status).send()
                 }
     }
 
+    private fun resolveValidationResponse(isValid: Boolean): Response =
+            if (isValid) {
+                logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" }
+                responseValid
+            } else {
+                logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" }
+                responseInvalid
+            }
+
+
     companion object {
         private const val CONTENT_TEXT = "text/plain"
         private const val VALID_RESPONSE_MESSAGE = "validation passed"
index 08a94e9..4ad9271 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,7 +28,6 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppAp
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unit
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
@@ -58,5 +57,5 @@ private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
     val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator())
     return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
             .start(config.apiAddress, config.kafkaTopics)
-            .unit()
+            .flatMap { it.await() }
 }
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt
new file mode 100644 (file)
index 0000000..33e65e4
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.netty.NettyOutbound
+import reactor.netty.http.server.HttpServerResponse
+import javax.json.Json
+
+private val logger = Logger("org.onap.dcae.collectors.veshv.utils.http.netty")
+
+fun HttpServerResponse.sendOrError(action: IO<Unit>): NettyOutbound =
+        sendAndHandleErrors(action.map {
+            Response(
+                    HttpStatus.OK,
+                    Content(
+                            ContentType.JSON,
+                            Json.createObjectBuilder().add("response", "Request accepted").build()
+                    )
+            )
+        })
+
+
+fun HttpServerResponse.sendAndHandleErrors(response: IO<Response>): NettyOutbound =
+        response.attempt().unsafeRunSync().fold(
+                { err ->
+                    logger.withWarn { log("Error occurred. Sending .", err) }
+                    val message = err.message
+                    sendResponse(errorResponse(message))
+                },
+                {
+                    sendResponse(it)
+                }
+        )
+
+private fun HttpServerResponse.sendResponse(response: Response): NettyOutbound {
+    val respWithStatus = status(response.status.number)
+    val responseContent = response.content
+
+    return respWithStatus.sendString(Mono.just(responseContent.serializer.run { responseContent.value.show() }))
+}
+
+private fun errorResponse(message: String?): Response =
+        Response(
+                HttpStatus.INTERNAL_SERVER_ERROR,
+                Content(
+                        ContentType.JSON,
+                        Json.createObjectBuilder().add("error", message).build()
+                )
+        )
index a25b291..529804a 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,15 +31,6 @@ import javax.json.Json
 
 private val logger = Logger("org.onap.dcae.collectors.veshv.utils.arrow.ratpack")
 
-fun ratpack.http.Response.sendOrError(action: IO<Unit>) {
-    sendAndHandleErrors(action.map {
-        Response(
-                HttpStatus.OK,
-                Content(
-                        ContentType.JSON,
-                        Json.createObjectBuilder().add("response", "Request accepted").build()))
-    })
-}
 
 fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Response>) {
     when (response) {