Implemented rest server in client simulator 87/58587/2
authorJakub Dudycz <jakub.dudycz@nokia.com>
Mon, 11 Jun 2018 14:54:47 +0000 (16:54 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 06:51:34 +0000 (08:51 +0200)
Change-Id: I212b79fe2a0272f340c5ca889beff60b469f7f24
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-client-simulator/pom.xml
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/MessageParameters.kt [new file with mode: 0644]
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt [new file with mode: 0644]
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt

index 86cdeca..c1c1f2e 100644 (file)
             <artifactId>logback-classic</artifactId>
             <scope>runtime</scope>
         </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.ratpack</groupId>
+            <artifactId>ratpack-core</artifactId>
+        </dependency>
     </dependencies>
 
 
index 6f53c91..b8a4b88 100644 (file)
@@ -35,7 +35,6 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-
 internal object DefaultValues {
     const val MESSAGES_AMOUNT = -1L
     const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key"
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/MessageParameters.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/MessageParameters.kt
new file mode 100644 (file)
index 0000000..f993f45
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.config
+
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+data class MessageParameters(val commonEventHeader: CommonEventHeader, val amount: Long = -1)
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
new file mode 100644 (file)
index 0000000..bc1cff7
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import ratpack.handling.Chain
+import ratpack.handling.Context
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+import reactor.core.publisher.Mono
+import javax.json.Json
+import javax.json.JsonObject
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class HttpServer(private val vesClient: VesHvClient) {
+
+    fun start(port: Int = DEFAULT_PORT): Mono<RatpackServer> = Mono.fromCallable {
+        RatpackServer.of {
+            it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers)
+        }
+    }.doOnNext { it.start() }
+
+
+    private fun configureHandlers(chain: Chain) {
+        chain.post("simulator") { ctx ->
+            ctx.request.body
+                    .map { Json.createReader(it.inputStream).readObject() }
+                    .map { extractMessageParameters(it) }
+                    .map { MessageFactory.createMessageFlux(it) }
+                    .onError { handleException(it, ctx) }
+                    .then {
+                        vesClient.send(it)
+                        ctx.response
+                                .status(STATUS_OK)
+                                .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
+                                        .add("response", "Request accepted")
+                                        .build()
+                                        .toString())
+                    }
+        }
+    }
+
+    private fun handleException(t: Throwable, ctx: Context) {
+        logger.warn("Failed to process the request - ${t.localizedMessage}")
+        logger.debug("Exception thrown when processing the request", t)
+        ctx.response
+                .status(STATUS_BAD_REQUEST)
+                .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
+                        .add("response", "Request was not accepted")
+                        .add("exception", t.localizedMessage)
+                        .build()
+                        .toString())
+    }
+
+    private fun extractMessageParameters(request: JsonObject): MessageParameters =
+            try {
+                val commonEventHeader = MessageFactory
+                        .parseCommonHeader(request.getJsonObject("commonEventHeader"))
+                val messagesAmount = request.getJsonNumber("messagesAmount").longValue()
+                MessageParameters(commonEventHeader, messagesAmount)
+            } catch (e: Exception) {
+                throw ValidationException("Validating request body failed", e)
+            }
+
+
+    companion object {
+        private val logger = Logger(HttpServer::class)
+        const val DEFAULT_PORT = 5000
+        const val STATUS_OK = 200
+        const val STATUS_BAD_REQUEST = 400
+        const val CONTENT_TYPE_APPLICATION_JSON = "application/json"
+    }
+}
+
+internal class ValidationException(message: String?, cause: Exception) : Exception(message, cause)
index 87a238a..6011760 100644 (file)
@@ -21,50 +21,53 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
 import com.google.protobuf.ByteString
 import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.ves.VesEventV5
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import javax.json.JsonObject
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-class MessageFactory {
+object MessageFactory {
 
-    companion object {
-        const val DEFAULT_START_EPOCH: Long = 120034455
-        const val DEFAULT_LAST_EPOCH: Long = 120034455
-    }
 
-    fun createMessageFlux(amount: Long = -1): Flux<WireFrame> =
-            Mono.fromCallable(this::createMessage).let {
-                if (amount < 0)
+    fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> =
+            Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let {
+                if (messageParameters.amount < 0)
                     it.repeat()
                 else
-                    it.repeat(amount)
+                    it.repeat(messageParameters.amount)
             }
 
+    fun parseCommonHeader(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder()
+            .setVersion(json.getString("version"))
+            .setDomain(CommonEventHeader.Domain.forNumber(json.getInt("domain")))
+            .setSequence(json.getInt("sequence"))
+            .setPriority(CommonEventHeader.Priority.forNumber(json.getInt("priority")))
+            .setEventId(json.getString("eventId"))
+            .setEventName(json.getString("eventName"))
+            .setEventType(json.getString("eventType"))
+            .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue())
+            .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue())
+            .setNfNamingCode(json.getString("nfNamingCode"))
+            .setNfcNamingCode(json.getString("nfcNamingCode"))
+            .setReportingEntityId(json.getString("reportingEntityId"))
+            .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName")))
+            .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId")))
+            .setSourceName(json.getString("sourceName"))
+            .build()
 
-    private fun createMessage(): WireFrame {
-        val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder()
-                .setVersion("1.9")
-                .setEventName("Sample event name")
-                .setDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS)
-                .setEventId("Sample event Id")
-                .setSourceName("Sample Source")
-                .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String"))
-                .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM)
-                .setStartEpochMicrosec(DEFAULT_START_EPOCH)
-                .setLastEpochMicrosec(DEFAULT_LAST_EPOCH)
-                .setSequence(2)
-                .build()
 
-        val payload = vesMessageBytes(commonHeader)
-        return WireFrame(payload)
-    }
+    private fun createMessage(commonHeader: CommonEventHeader): WireFrame =
+            WireFrame(vesMessageBytes(commonHeader))
+
 
-    private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteArray {
-        val msg = VesEventV5.VesEvent.newBuilder()
+    private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray {
+        val msg = VesEvent.newBuilder()
                 .setCommonEventHeader(commonHeader)
                 .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
                 .build()
index 13256c5..78a72d9 100644 (file)
@@ -32,17 +32,15 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.reactivestreams.Publisher
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
-import reactor.ipc.netty.NettyInbound
 import reactor.ipc.netty.NettyOutbound
 import reactor.ipc.netty.tcp.TcpClient
-import java.util.function.BiFunction
 
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-class VesHvClient(configuration: ClientConfiguration) {
+class VesHvClient(private val configuration: ClientConfiguration) {
 
     private val client: TcpClient = TcpClient.builder()
             .options { opts ->
@@ -52,18 +50,17 @@ class VesHvClient(configuration: ClientConfiguration) {
             }
             .build()
 
-    fun send(messages: Flux<WireFrame>) {
-        client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) })
-    }
+    fun send(messages: Flux<WireFrame>) =
+            client
+                    .newHandler { _, out -> handler(out, messages) }
+                    .doOnError{logger.info("Failed to connect to VesHvCollector on " +
+                            "${configuration.vesHost}:${configuration.vesPort}")}
+                    .subscribe { logger.info("Connected to VesHvCollector on " +
+                            "${configuration.vesHost}:${configuration.vesPort}") }
+
 
-    private fun handler(nettyInbound: NettyInbound,
-                        nettyOutbound: NettyOutbound,
-                        messages: Flux<WireFrame>): Publisher<Void> {
+    private fun handler(nettyOutbound: NettyOutbound, messages: Flux<WireFrame>): Publisher<Void> {
 
-        nettyInbound
-                .receive()
-                .asString(Charsets.UTF_8)
-                .subscribe { str -> logger.info("Server response: $str") }
 
         val encoder = WireFrameEncoder(nettyOutbound.alloc())
 
@@ -74,6 +71,7 @@ class VesHvClient(configuration: ClientConfiguration) {
         return nettyOutbound
                 .options { it.flushOnEach() }
                 .send(frames)
+                .then { logger.info("Messages have been sent") }
     }
 
     private fun createSslContext(config: SecurityConfiguration): SslContext =
index 3fa023b..08bc6a7 100644 (file)
@@ -20,7 +20,7 @@
 package org.onap.dcae.collectors.veshv.simulators.xnf
 
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient
 import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
 import org.slf4j.LoggerFactory.getLogger
@@ -35,9 +35,12 @@ private val logger = getLogger("Simulator :: main")
 fun main(args: Array<String>) {
     try {
         val clientConfig = ArgBasedClientConfiguration().parse(args)
-        val messageFactory = MessageFactory()
-        val client = VesHvClient(clientConfig)
-            client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
+        val vesClient = VesHvClient(clientConfig)
+
+        HttpServer(vesClient)
+                .start()
+                .block()
+
     } catch (e: WrongArgumentException) {
         e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
         System.exit(1)
index 405a15e..ee1d1cf 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
+import com.google.protobuf.ByteString
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
+import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM
 import reactor.test.test
 
+const val SAMPLE_START_EPOCH: Long = 120034455
+const val SAMPLE_LAST_EPOCH: Long = 120034455
+
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
 object MessageFactoryTest : Spek({
     describe("message factory") {
-        val factory = MessageFactory()
 
-        given("no parameters") {
+        val factory = MessageFactory
+
+        given("only common header") {
             it("should return infinite flux") {
                 val limit = 1000L
-                factory.createMessageFlux().take(limit).test()
+                factory.createMessageFlux(getSampleMessageParameters()).take(limit).test()
                         .expectNextCount(limit)
                         .verifyComplete()
             }
         }
-        given("messages amount") {
+        given("common header and messages amount") {
             it("should return message flux of specified size") {
-                factory.createMessageFlux(5).test()
+                factory.createMessageFlux((getSampleMessageParameters(5))).test()
                         .expectNextCount(5)
                         .verifyComplete()
             }
         }
     }
 })
+
+fun getSampleMessageParameters(amount: Long = -1): MessageParameters{
+    val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder()
+            .setVersion("sample-version")
+            .setDomain(HVRANMEAS)
+            .setSequence(1)
+            .setPriority(MEDIUM)
+            .setEventId("sample-event-id")
+            .setEventName("sample-event-name")
+            .setEventType("sample-event-type")
+            .setStartEpochMicrosec(SAMPLE_START_EPOCH)
+            .setLastEpochMicrosec(SAMPLE_LAST_EPOCH)
+            .setNfNamingCode("sample-nf-naming-code")
+            .setNfcNamingCode("sample-nfc-naming-code")
+            .setReportingEntityId("sample-reporting-entity-id")
+            .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
+            .setSourceId(ByteString.copyFromUtf8("sample-source-id"))
+            .setSourceName("sample-source-name")
+            .build()
+
+    return MessageParameters(commonHeader, amount)
+}
index 19bfa12..f614d42 100644 (file)
@@ -44,8 +44,8 @@ class Logger(val logger: org.slf4j.Logger) {
         logger.debug(message)
     }
 
-    fun debug(message: String, ex: Exception) {
-        logger.debug(message, ex)
+    fun debug(message: String, t: Throwable) {
+        logger.debug(message, t)
     }
 
     fun debug(messageProvider: () -> String) {
@@ -54,12 +54,13 @@ class Logger(val logger: org.slf4j.Logger) {
         }
     }
 
-    fun debug(ex: Exception, messageProvider: () -> String) {
+    fun debug(t: Throwable, messageProvider: () -> String) {
         if (logger.isDebugEnabled) {
-            logger.debug(messageProvider(), ex)
+            logger.debug(messageProvider(), t)
         }
     }
 
+
     //
     // INFO
     //
@@ -67,9 +68,6 @@ class Logger(val logger: org.slf4j.Logger) {
         logger.info(message)
     }
 
-    fun info(message: String, ex: Exception) {
-        logger.info(message, ex)
-    }
 
     fun info(messageProvider: () -> String) {
         if (logger.isInfoEnabled) {
@@ -77,9 +75,14 @@ class Logger(val logger: org.slf4j.Logger) {
         }
     }
 
-    fun info(ex: Exception, messageProvider: () -> String) {
+    fun info(message: String, t: Throwable) {
+        logger.info(message, t)
+    }
+
+
+    fun info(t: Throwable, messageProvider: () -> String) {
         if (logger.isInfoEnabled) {
-            logger.info(messageProvider(), ex)
+            logger.info(messageProvider(), t)
         }
     }
 
@@ -92,8 +95,8 @@ class Logger(val logger: org.slf4j.Logger) {
         logger.warn(message)
     }
 
-    fun warn(message: String, ex: Exception) {
-        logger.warn(message, ex)
+    fun warn(message: String, t: Throwable) {
+        logger.warn(message, t)
     }
 
     fun warn(messageProvider: () -> String) {
@@ -102,9 +105,9 @@ class Logger(val logger: org.slf4j.Logger) {
         }
     }
 
-    fun warn(ex: Exception, messageProvider: () -> String) {
+    fun warn(t: Throwable, messageProvider: () -> String) {
         if (logger.isWarnEnabled) {
-            logger.warn(messageProvider(), ex)
+            logger.warn(messageProvider(), t)
         }
     }
 
@@ -117,8 +120,9 @@ class Logger(val logger: org.slf4j.Logger) {
         logger.error(message)
     }
 
-    fun error(message: String, ex: Exception) {
-        logger.error(message, ex)
+
+    fun error(message: String, t: Throwable) {
+        logger.error(message, t)
     }
 
     fun error(messageProvider: () -> String) {
@@ -127,9 +131,9 @@ class Logger(val logger: org.slf4j.Logger) {
         }
     }
 
-    fun error(ex: Exception, messageProvider: () -> String) {
+    fun error(t: Throwable, messageProvider: () -> String) {
         if (logger.isErrorEnabled) {
-            logger.error(messageProvider(), ex)
+            logger.error(messageProvider(), t)
         }
     }
 }