Implemented simple tcp client 59/58359/1
authorJakub Dudycz <jakub.dudycz@nokia.com>
Tue, 29 May 2018 12:46:27 +0000 (14:46 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 1 Aug 2018 07:18:45 +0000 (09:18 +0200)
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Change-Id: Iaf913186b93eb7eebfb6f44c19d489a64ed60c2b
Issue-ID: DCAEGEN2-601

hv-collector-client-simulator/pom.xml
hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt [new file with mode: 0644]
hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt [new file with mode: 0644]
hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt [new file with mode: 0644]
hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt [new file with mode: 0644]
hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt
hv-collector-client-simulator/src/main/resources/logback.xml [new file with mode: 0644]
hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt [deleted file]
hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt [new file with mode: 0644]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt

index 11d287c..caa1099 100644 (file)
     </build>
 
     <dependencies>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>protobuf</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>hv-collector-utils</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>hv-collector-core</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
-            <scope>test</scope>
+            <scope>runtime</scope>
         </dependency>
     </dependencies>
 
diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt
new file mode 100644 (file)
index 0000000..7c28eda
--- /dev/null
@@ -0,0 +1,83 @@
+package org.onap.dcae.collectors.veshv.main.config
+
+import org.apache.commons.cli.Option
+import org.apache.commons.cli.Options
+import org.apache.commons.cli.DefaultParser
+import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.HelpFormatter
+
+
+internal object DefaultValues {
+    const val MESSAGES_AMOUNT = 1
+}
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+internal object ArgBasedClientConfiguration {
+
+    private val OPT_VES_PORT = Option.builder("p")
+            .longOpt("port")
+            .required()
+            .hasArg()
+            .desc("VesHvCollector port")
+            .build()
+
+    private val OPT_VES_HOST = Option.builder("h")
+            .longOpt("host")
+            .required()
+            .hasArg()
+            .desc("VesHvCollector host")
+            .build()
+
+    private val OPT_MESSAGES_AMOUNT = Option.builder("m")
+            .longOpt("messages")
+            .hasArg()
+            .desc("Amount of messages to send")
+            .build()
+
+    private val options by lazy {
+        val options = Options()
+        options.addOption(OPT_VES_PORT)
+        options.addOption(OPT_VES_HOST)
+        options.addOption(OPT_MESSAGES_AMOUNT)
+        options
+    }
+
+    fun parse(args: Array<out String>): ClientConfiguration {
+        val parser = DefaultParser()
+
+        try {
+            parser.parse(options, args).run {
+                return ClientConfiguration(
+                        stringValue(OPT_VES_HOST),
+                        intValue(OPT_VES_PORT),
+                        intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT))
+            }
+        } catch (ex: Exception) {
+            throw WrongArgumentException(ex)
+        }
+    }
+
+    private fun CommandLine.intValueOrDefault(option: Option, default: Int) =
+            getOptionValue(option.opt)?.toInt() ?: default
+
+    private fun CommandLine.intValue(option: Option) =
+            getOptionValue(option.opt).toInt()
+
+    private fun CommandLine.stringValue(option: Option) =
+            getOptionValue(option.opt)
+
+
+    class WrongArgumentException(parent: Exception) : Exception(parent.message, parent) {
+        fun printMessage() {
+            println(message)
+        }
+
+        fun printHelp(programName: String) {
+            val formatter = HelpFormatter()
+            formatter.printHelp(programName, options)
+        }
+    }
+}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt
new file mode 100644 (file)
index 0000000..742c286
--- /dev/null
@@ -0,0 +1,7 @@
+package org.onap.dcae.collectors.veshv.main.config
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+data class ClientConfiguration( val vesHost: String, val vesPort: Int ,val messagesAmount: Int)
diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt
new file mode 100644 (file)
index 0000000..e0c53ae
--- /dev/null
@@ -0,0 +1,59 @@
+package org.onap.dcae.collectors.veshv.main.impl
+
+import com.google.protobuf.ByteString
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.Unpooled
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.ves.VesEventV5
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class MessageFactory {
+
+    companion object {
+        const val DEFAULT_START_EPOCH: Long = 120034455
+        const val DEFAULT_LAST_EPOCH: Long = 120034455
+    }
+
+    fun createMessageFlux(amount: Int = 1): Flux<WireFrame> =
+            Mono.just(createMessage()).repeat(amount.toLong())
+
+
+    private fun createMessage(): WireFrame {
+        val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder()
+                .setVersion("1.9")
+                .setEventName("Sample event name")
+                .setDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS)
+                .setEventId("Sample event Id")
+                .setSourceName("Sample Source")
+                .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String"))
+                .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM)
+                .setStartEpochMicrosec(DEFAULT_START_EPOCH)
+                .setLastEpochMicrosec(DEFAULT_LAST_EPOCH)
+                .setSequence(2)
+                .build()
+
+        val payload = vesMessageBytes(commonHeader)
+        return WireFrame(
+                payload = payload,
+                mark = 0xFF,
+                majorVersion = 1,
+                minorVersion = 2,
+                payloadSize = payload.readableBytes())
+
+
+    }
+
+    private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf {
+        val msg = VesEventV5.VesEvent.newBuilder()
+                .setCommonEventHeader(commonHeader)
+                .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
+                .build()
+
+        return Unpooled.wrappedBuffer(msg.toByteArray())
+    }
+}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt
new file mode 100644 (file)
index 0000000..d23a6f7
--- /dev/null
@@ -0,0 +1,42 @@
+package org.onap.dcae.collectors.veshv.main.impl
+
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.main.config.ClientConfiguration
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Flux
+import reactor.ipc.netty.NettyInbound
+import reactor.ipc.netty.NettyOutbound
+import reactor.ipc.netty.tcp.TcpClient
+import java.util.function.BiFunction
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class VesHvClient(configuration: ClientConfiguration) {
+
+    private val logger = Logger(VesHvClient::class)
+    private val client: TcpClient = TcpClient.create(configuration.vesHost, configuration.vesPort)
+
+    fun send(messages: Flux<WireFrame>) {
+        client.start(BiFunction { i, o -> handler(i, o, messages) })
+    }
+
+    // sending flux with multiple WireFrames not supported yet
+    private fun handler(nettyInbound: NettyInbound,
+                        nettyOutbound: NettyOutbound,
+                        messages: Flux<WireFrame>): Publisher<Void> {
+
+        nettyInbound
+                .receive()
+                .asString(Charsets.UTF_8)
+                .subscribe { str -> logger.info("Server response: $str") }
+
+        return nettyOutbound
+                .options { it.flushOnEach() }
+                .send(messages.map { it.encode(ByteBufAllocator.DEFAULT) })
+    }
+}
index 35710c0..137ffdf 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.main
 
+import org.onap.dcae.collectors.veshv.main.config.ArgBasedClientConfiguration
+import org.onap.dcae.collectors.veshv.main.impl.MessageFactory
+import org.onap.dcae.collectors.veshv.main.impl.VesHvClient
 import org.slf4j.LoggerFactory.getLogger
 
 
 private val logger = getLogger("Simulator :: main")
 
-fun main(args : Array<String>){
-    logger.info("Hello world")
-}
\ No newline at end of file
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+fun main(args: Array<String>) = try {
+
+    val clientConfig = ArgBasedClientConfiguration.parse(args)
+    val messageFactory = MessageFactory()
+    val client = VesHvClient(clientConfig)
+    client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
+} catch (e: Exception) {
+    logger.error(e.localizedMessage)
+    logger.debug("An error occurred when starting ves client", e)
+}
+
diff --git a/hv-collector-client-simulator/src/main/resources/logback.xml b/hv-collector-client-simulator/src/main/resources/logback.xml
new file mode 100644 (file)
index 0000000..809f62d
--- /dev/null
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+  <property name="LOG_FILE"
+    value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+  <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>
+        %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+      </pattern>
+    </encoder>
+  </appender>
+
+    <appender name="ROLLING-FILE"
+      class="ch.qos.logback.core.rolling.RollingFileAppender">
+      <encoder>
+        <pattern>${FILE_LOG_PATTERN}</pattern>
+      </encoder>
+      <file>${LOG_FILE}</file>
+      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+        <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+        <maxFileSize>50MB</maxFileSize>
+        <maxHistory>30</maxHistory>
+        <totalSizeCap>10GB</totalSizeCap>
+      </rollingPolicy>
+    </appender>
+
+  <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+
+  <root level="INFO">
+      <appender-ref ref="CONSOLE"/>
+      <appender-ref ref="ROLLING-FILE"/>
+    </root>
+</configuration>
\ No newline at end of file
diff --git a/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt b/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt
deleted file mode 100644 (file)
index 770adeb..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.main
-
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import kotlin.test.assertEquals
-
-object DummyTest : Spek({
-    on("sum of 2 and 3") {
-        val sum = 2 + 3
-        it("outcome should be equals 5"){
-            assertEquals(5, sum)
-        }
-    }
-})
diff --git a/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt
new file mode 100644 (file)
index 0000000..5a89da4
--- /dev/null
@@ -0,0 +1,31 @@
+package org.onap.dcae.collectors.veshv.main
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.main.impl.MessageFactory
+import kotlin.test.assertEquals
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+object WireFrameTest : Spek({
+
+    val factory = MessageFactory()
+
+
+    given("no parameters") {
+        it("should return flux with one message") {
+            val result = factory.createMessageFlux()
+
+            assertEquals(1, result.count().block())
+        }
+    }
+    given("messages amount") {
+        it("should return message flux of specified size") {
+            val result = factory.createMessageFlux(5)
+            assertEquals(5, result.count().block())
+        }
+    }
+})
\ No newline at end of file
index ea78706..cf484d7 100644 (file)
@@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.domain
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-data class ServerConfiguration(val port: Int, val configurationUrl: String)
+data class ServerConfiguration( val configurationUrl: String, val port: Int)
index 6311b6c..4e614cd 100644 (file)
@@ -54,8 +54,8 @@ internal object ArgBasedServerConfiguration {
         try {
             parser.parse(options, args).run {
                 return ServerConfiguration(
-                        intValue(OPT_PORT, DefaultValues.PORT),
-                        stringValue(OPT_CONFIG_URL, DefaultValues.CONFIG_URL))
+                        stringValue(OPT_CONFIG_URL, DefaultValues.CONFIG_URL),
+                        intValue(OPT_PORT, DefaultValues.PORT))
             }
         } catch (ex: Exception) {
             throw WrongArgumentException(ex)