Fix TCP stream framing issue 09/58409/1
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 7 Jun 2018 09:52:16 +0000 (11:52 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 1 Aug 2018 11:06:43 +0000 (13:06 +0200)
Because of the nature of TCP protocol we receive consecutive IO buffer
snapshots - not separate messages. That means that we need to join
incomming buffers and then split it into separate WireFrames.

Closes ONAP-312
Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

39 files changed:
docker-compose.yml
hv-collector-client-simulator/Dockerfile
hv-collector-client-simulator/pom.xml
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt [new file with mode: 0644]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt [new file with mode: 0644]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt [new file with mode: 0644]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt [new file with mode: 0644]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt [new file with mode: 0644]
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt [deleted file]
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt [new file with mode: 0644]
hv-collector-core/src/test/resources/logback-test.xml [moved from hv-collector-core/src/test/resources/logback.xml with 93% similarity]
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
hv-collector-ct/src/test/resources/logback-test.xml
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt [new file with mode: 0644]
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt [moved from hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt with 62% similarity]
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt [new file with mode: 0644]
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt [new file with mode: 0644]
hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
hv-collector-main/Dockerfile
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
hv-collector-main/src/main/resources/logback.xml
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt

index 68bb3d0..af8e0e0 100644 (file)
@@ -25,7 +25,7 @@ services:
     depends_on:
       - kafka
     volumes:
-      - /etc/ves-hv/:/etc/ves-hv/
+      - ./ssl/:/etc/ves-hv/
   xnf-simulator:
     build:
       context: hv-collector-client-simulator
@@ -33,4 +33,4 @@ services:
     depends_on:
       - hv-collector
     volumes:
-      - /etc/ves-hv/:/etc/ves-hv/
\ No newline at end of file
+      - ./ssl/:/etc/ves-hv/
\ No newline at end of file
index 159f900..19c4c87 100644 (file)
@@ -6,8 +6,8 @@ LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
 LABEL maintainer="Nokia Wroclaw ONAP Team"
 
 WORKDIR /opt/ves-hv-client-simulator
-ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
+ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"]
 CMD ["--ves-host", "hv-collector", "--ves-port", "6061"]
 COPY target/libs/external/* ./
 COPY target/libs/internal/* ./
-COPY target/hv-collector-client-simulator-*.jar ./
\ No newline at end of file
+COPY target/hv-collector-client-simulator-*.jar ./
index e7a2585..012bda5 100644 (file)
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-stdlib-jdk8</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-tcnative-boringssl-static</artifactId>
+            <scope>runtime</scope>
+            <classifier>${os.detected.classifier}</classifier>
+        </dependency>
         <dependency>
             <groupId>com.nhaarman</groupId>
             <artifactId>mockito-kotlin</artifactId>
index 49653b5..b946689 100644 (file)
@@ -28,7 +28,7 @@ import java.io.File
 import java.nio.file.Paths
 
 internal object DefaultValues {
-    const val MESSAGES_AMOUNT = 1
+    const val MESSAGES_AMOUNT = -1L
     const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key"
     const val CERT_FILE = "/etc/ves-hv/client.crt"
     const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt"
@@ -98,7 +98,7 @@ internal object ArgBasedClientConfiguration {
             val cmdLine = parser.parse(options, args)
             val host = cmdLine.stringValue(OPT_VES_HOST)
             val port = cmdLine.intValue(OPT_VES_PORT)
-            val msgsAmount = cmdLine.intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT)
+            val msgsAmount = cmdLine.longValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT)
             return ClientConfiguration(
                     host,
                     port,
@@ -121,8 +121,9 @@ internal object ArgBasedClientConfiguration {
 
     private fun stringPathToPath(path: String) = Paths.get(File(path).toURI())
 
-    private fun CommandLine.intValueOrDefault(option: Option, default: Int) =
-            getOptionValue(option.opt)?.toInt() ?: default
+
+    private fun CommandLine.longValueOrDefault(option: Option, default: Long) =
+            getOptionValue(option.opt)?.toLong() ?: default
 
     private fun CommandLine.intValue(option: Option) =
             getOptionValue(option.opt).toInt()
index 0c578b3..d5f7c7c 100644 (file)
@@ -38,8 +38,13 @@ class MessageFactory {
         const val DEFAULT_LAST_EPOCH: Long = 120034455
     }
 
-    fun createMessageFlux(amount: Int = 1): Flux<WireFrame> =
-            Mono.just(createMessage()).repeat(amount.toLong())
+    fun createMessageFlux(amount: Long = -1): Flux<WireFrame> =
+            Mono.fromCallable(this::createMessage).let {
+                if (amount < 0)
+                    it.repeat()
+                else
+                    it.repeat(amount)
+            }
 
 
     private fun createMessage(): WireFrame {
@@ -57,14 +62,7 @@ class MessageFactory {
                 .build()
 
         val payload = vesMessageBytes(commonHeader)
-        return WireFrame(
-                payload = payload,
-                mark = 0xFF,
-                majorVersion = 1,
-                minorVersion = 2,
-                payloadSize = payload.readableBytes())
-
-
+        return WireFrame(payload)
     }
 
     private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf {
index c911c53..29573e8 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
+import io.netty.buffer.Unpooled
 import io.netty.handler.ssl.ClientAuth
 import io.netty.handler.ssl.SslContext
 import io.netty.handler.ssl.SslContextBuilder
@@ -29,6 +30,7 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientSecurityConfig
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.reactivestreams.Publisher
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import reactor.ipc.netty.NettyInbound
 import reactor.ipc.netty.NettyOutbound
 import reactor.ipc.netty.tcp.TcpClient
@@ -53,7 +55,6 @@ class VesHvClient(configuration: ClientConfiguration) {
         client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) })
     }
 
-    // sending flux with multiple WireFrames not supported yet
     private fun handler(nettyInbound: NettyInbound,
                         nettyOutbound: NettyOutbound,
                         messages: Flux<WireFrame>): Publisher<Void> {
@@ -64,8 +65,8 @@ class VesHvClient(configuration: ClientConfiguration) {
                 .subscribe { str -> logger.info("Server response: $str") }
 
         val frames = messages
-                .doOnNext { logger.info { "About to send message with ${it.payloadSize} B of payload" } }
                 .map { it.encode(nettyOutbound.alloc()) }
+                .concatWith(Mono.just(Unpooled.EMPTY_BUFFER))
 
         return nettyOutbound
                 .options { it.flushOnEach() }
index ee7f49a..68f999e 100644 (file)
@@ -36,12 +36,14 @@ fun main(args: Array<String>) {
         val clientConfig = ArgBasedClientConfiguration.parse(args)
         val messageFactory = MessageFactory()
         val client = VesHvClient(clientConfig)
-        client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
+            client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
     } catch (e: ArgBasedClientConfiguration.WrongArgumentException) {
         e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
+        System.exit(1)
     } catch (e: Exception) {
         logger.error(e.localizedMessage)
         logger.debug("An error occurred when starting ves client", e)
+        System.exit(2)
     }
 }
 
index edcec65..405a15e 100644 (file)
@@ -23,8 +23,7 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory
-import kotlin.test.assertEquals
+import reactor.test.test
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -35,16 +34,18 @@ object MessageFactoryTest : Spek({
         val factory = MessageFactory()
 
         given("no parameters") {
-            it("should return flux with one message") {
-                val result = factory.createMessageFlux()
-
-                assertEquals(1, result.count().block())
+            it("should return infinite flux") {
+                val limit = 1000L
+                factory.createMessageFlux().take(limit).test()
+                        .expectNextCount(limit)
+                        .verifyComplete()
             }
         }
         given("messages amount") {
             it("should return message flux of specified size") {
-                val result = factory.createMessageFlux(5)
-                assertEquals(5, result.count().block())
+                factory.createMessageFlux(5).test()
+                        .expectNextCount(5)
+                        .verifyComplete()
             }
         }
     }
index dfbbdb5..ed686fe 100644 (file)
 package org.onap.dcae.collectors.veshv.boundary
 
 import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
 interface Collector {
-    fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
+    fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
 }
 
 typealias CollectorProvider = () -> Collector
index 850d3a8..913d8f5 100644 (file)
@@ -28,7 +28,7 @@ import org.onap.dcae.collectors.veshv.impl.MessageValidator
 import org.onap.dcae.collectors.veshv.impl.Router
 import org.onap.dcae.collectors.veshv.impl.VesDecoder
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
-import org.onap.dcae.collectors.veshv.impl.WireDecoder
+import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder
 import reactor.core.publisher.Flux
 import java.util.concurrent.atomic.AtomicReference
 
@@ -48,7 +48,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvide
 
     private fun createVesHvCollector(config: CollectorConfiguration): Collector {
         return VesHvCollector(
-                WireDecoder(),
+                { WireDecoder(it) },
                 VesDecoder(),
                 MessageValidator(),
                 Router(config.routing),
index b0a9da8..12e1c1e 100644 (file)
@@ -39,7 +39,8 @@ internal class MessageValidator {
 
     fun isValid(message: VesMessage): Boolean {
         val header = message.header
-        return allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS
+        val ret = allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS
+        return ret
     }
 
     private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
index cdc70f8..60e7d70 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
-import com.google.protobuf.InvalidProtocolBufferException
 import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -31,18 +30,8 @@ import org.onap.ves.VesEventV5.VesEvent
  */
 internal class VesDecoder {
 
-    fun decode(bb: ByteBuf): VesMessage? =
-            try {
-                val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader
-                VesMessage(decodedHeader, bb)
-            } catch (ex: InvalidProtocolBufferException) {
-                logger.warn { "Dropping incoming message. Invalid protocol buffer: ${ex.message}" }
-                logger.debug("Cause", ex)
-                null
-            }
-
-
-    companion object {
-        private val logger = Logger(VesDecoder::class)
+    fun decode(bb: ByteBuf): VesMessage {
+        val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader
+        return VesMessage(decodedHeader, bb)
     }
 }
index 535fbe1..ac11b3e 100644 (file)
 package org.onap.dcae.collectors.veshv.impl
 
 import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import java.util.concurrent.atomic.AtomicInteger
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
 internal class VesHvCollector(
-        private val wireDecoder: WireDecoder,
+        private val wireDecoderSupplier: (ByteBufAllocator) -> WireDecoder,
         private val protobufDecoder: VesDecoder,
         private val validator: MessageValidator,
         private val router: Router,
         private val sink: Sink) : Collector {
-    override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
-            dataStream
-                    .doOnNext(this::logIncomingMessage)
-                    .flatMap(this::decodeWire)
-                    .doOnNext(this::logDecodedWireMessage)
-                    .flatMap(this::decodeProtobuf)
-                    .filter(this::validate)
-                    .flatMap(this::findRoute)
-                    .compose(sink::send)
-                    .doOnNext(this::releaseMemory)
-                    .then()
 
-    private fun logIncomingMessage(wire: ByteBuf) {
-        logger.debug { "Got message with total ${wire.readableBytes()} B"}
-    }
-
-    private fun logDecodedWireMessage(payload: ByteBuf) {
-        logger.debug { "Wire payload size: ${payload.readableBytes()} B"}
-    }
-
-    private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode)
-
-    private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode)
+    override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
+            wireDecoderSupplier(alloc).let { wireDecoder ->
+                dataStream
+                        .concatMap(wireDecoder::decode)
+                        .filter(WireFrame::isValid)
+                        .map(WireFrame::payload)
+                        .map(protobufDecoder::decode)
+                        .filter(this::validate)
+                        .flatMap(this::findRoute)
+                        .compose(sink::send)
+                        .doOnNext(this::releaseMemory)
+                        .doOnTerminate { releaseBuffersMemory(wireDecoder) }
+                        .then()
+            }
 
     private fun validate(msg: VesMessage): Boolean {
         val valid = validator.isValid(msg)
@@ -73,21 +69,16 @@ internal class VesHvCollector(
     private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
 
     private fun releaseMemory(msg: VesMessage) {
+        logger.trace { "Releasing memory from ${msg.rawMessage}" }
         msg.rawMessage.release()
     }
 
-    private fun <T, V>omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
-
-    private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> {
-        val result = mapper(input)
-        return if (result == null) {
-            input.release()
-            Mono.empty()
-        } else {
-            Mono.just(result)
-        }
+    private fun releaseBuffersMemory(wireDecoder: WireDecoder) {
+        wireDecoder.release()
     }
 
+    private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
+
     companion object {
         val logger = Logger(VesHvCollector::class)
     }
index 0aacb26..8e6db2a 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.adapters
 
-import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.common.serialization.ByteBufferSerializer
-import org.apache.kafka.common.serialization.StringSerializer
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 import reactor.core.publisher.Flux
 import reactor.ipc.netty.http.client.HttpClient
-import reactor.kafka.sender.KafkaSender
-import reactor.kafka.sender.SenderOptions
-import java.nio.ByteBuffer
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -51,33 +38,6 @@ object AdapterFactory {
                 override fun invoke() = Flux.just(config)
             }
 
-    private class KafkaSinkProvider : SinkProvider {
-        override fun invoke(config: CollectorConfiguration): Sink {
-            val sender = KafkaSender.create(
-                    SenderOptions.create<CommonEventHeader, ByteBuffer>()
-                            .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
-                            .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
-                            .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java))
-            return KafkaSink(sender)
-        }
-    }
-
-
-    private class LoggingSinkProvider : SinkProvider {
-        override fun invoke(config: CollectorConfiguration): Sink {
-            return object : Sink {
-                private val logger = Logger(LoggingSinkProvider::class)
-                override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> =
-                        messages
-                                .doOnNext { msg ->
-                                    logger.info { "Message routed to ${msg.topic}" }
-                                }
-                                .map { it.message }
-
-            }
-        }
-    }
-
     fun consulConfigurationProvider(url: String): ConfigurationProvider =
             ConsulConfigurationProvider(url, httpAdapter())
 
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt
new file mode 100644 (file)
index 0000000..82452e1
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteBufferSerializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.ves.VesEventV5
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderOptions
+import java.nio.ByteBuffer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+internal class KafkaSinkProvider : SinkProvider {
+    override fun invoke(config: CollectorConfiguration): Sink {
+        return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
+    }
+
+    private fun constructSenderOptions(config: CollectorConfiguration) =
+            SenderOptions.create<VesEventV5.VesEvent.CommonEventHeader, ByteBuffer>()
+                    .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
+                    .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
+                    .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java)
+
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
new file mode 100644 (file)
index 0000000..62b6d1a
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+internal class LoggingSinkProvider : SinkProvider {
+
+    override fun invoke(config: CollectorConfiguration): Sink {
+        return object : Sink {
+            private val logger = Logger(LoggingSinkProvider::class)
+            private val totalMessages = AtomicLong()
+            private val totalBytes = AtomicLong()
+
+            override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> =
+                    messages
+                            .doOnNext(this::logMessage)
+                            .map { it.message }
+
+            private fun logMessage(msg: RoutedMessage) {
+                val msgs = totalMessages.addAndGet(1)
+                val bytes = totalBytes.addAndGet(msg.message.rawMessage.readableBytes().toLong())
+                val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
+                if (msgs % INFO_LOGGING_FREQ == 0L)
+                    logger.info(logMessageSupplier)
+                else
+                    logger.trace(logMessageSupplier)
+            }
+
+        }
+    }
+
+    companion object {
+        const val INFO_LOGGING_FREQ = 100_000
+    }
+}
index 208b1ba..564aa8d 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.impl.socket
 
+import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -59,17 +60,18 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
 
     private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
         logger.debug("Got connection")
+        nettyOutbound.alloc()
 
         val sendHello = nettyOutbound
                 .options { it.flushOnEach() }
                 .sendString(Mono.just("ONAP_VES_HV/0.1\n"))
                 .then()
 
-        val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive())
+        val handleIncomingMessages = collectorProvider()
+                .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain())
 
         return sendHello.then(handleIncomingMessages)
     }
-
     companion object {
         private val logger = Logger(NettyTcpServer::class)
     }
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
new file mode 100644 (file)
index 0000000..e4dd7cf
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.CompositeByteBuf
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+import reactor.core.publisher.FluxSink
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.function.Consumer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class StreamBufferEmitter(
+        private val streamBuffer: CompositeByteBuf,
+        private val newFrame: ByteBuf)
+    : Consumer<FluxSink<WireFrame>> {
+
+    private val subscribed = AtomicBoolean(false)
+
+    override fun accept(sink: FluxSink<WireFrame>) {
+        when {
+
+            subscribed.getAndSet(true) ->
+                sink.error(IllegalStateException("Wire frame emitter supports only one subscriber"))
+
+            newFrame.readableBytes() == 0 -> {
+                logger.trace { "Discarding empty buffer" }
+                newFrame.release()
+                sink.complete()
+            }
+
+            else -> {
+                streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame)
+                sink.onDispose {
+                    logger.debug("Disposing read components")
+                    streamBuffer.discardReadComponents()
+                }
+                sink.onRequest { requestedFrameCount ->
+                    WireFrameSink(streamBuffer, sink, requestedFrameCount).handleSubscriber()
+                }
+            }
+        }
+    }
+
+    companion object {
+        fun createFlux(streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> =
+                Flux.create(StreamBufferEmitter(streamBuffer, newFrame))
+
+        private const val INCREASE_WRITER_INDEX = true
+        private val logger = Logger(StreamBufferEmitter::class)
+    }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt
new file mode 100644 (file)
index 0000000..b701aaf
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+    private val streamBuffer = alloc.compositeBuffer()
+
+    fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(streamBuffer, byteBuf)
+            .doOnSubscribe { logIncomingMessage(byteBuf) }
+            .doOnNext(this::logDecodedWireMessage)
+
+    fun release() {
+        streamBuffer.release()
+    }
+
+
+    private fun logIncomingMessage(wire: ByteBuf) {
+        logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
+    }
+
+    private fun logDecodedWireMessage(wire: WireFrame) {
+        logger.trace { "Wire payload size: ${wire.payloadSize} B." }
+    }
+
+    companion object {
+        val logger = Logger(VesHvCollector::class)
+    }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
new file mode 100644 (file)
index 0000000..bc9c838
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.FluxSink
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class WireFrameSink(
+        private val streamBuffer: ByteBuf,
+        private val sink: FluxSink<WireFrame>,
+        private val requestedFrameCount: Long) {
+
+    fun handleSubscriber() {
+        logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
+
+        try {
+            if (requestedFrameCount == Long.MAX_VALUE) {
+                logger.trace { "Push based strategy" }
+                pushAvailableFrames()
+            } else {
+                logger.trace { "Pull based strategy - req $requestedFrameCount" }
+                pushUpToNumberOfFrames()
+            }
+        } catch (ex: Exception) {
+            sink.error(ex)
+        }
+
+        logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+
+    }
+
+    private fun pushAvailableFrames() {
+        var nextFrame = decodeFirstFrameFromBuffer()
+        while (nextFrame != null && !sink.isCancelled) {
+            sink.next(nextFrame)
+            nextFrame = decodeFirstFrameFromBuffer()
+        }
+        sink.complete()
+    }
+
+    private fun pushUpToNumberOfFrames() {
+        var nextFrame = decodeFirstFrameFromBuffer()
+        var remaining = requestedFrameCount
+        loop@ while (nextFrame != null && !sink.isCancelled) {
+            sink.next(nextFrame)
+            if (--remaining > 0) {
+                nextFrame = decodeFirstFrameFromBuffer()
+            } else {
+                break@loop
+            }
+        }
+        if (remaining > 0 && nextFrame == null) {
+            sink.complete()
+        }
+    }
+
+    private fun decodeFirstFrameFromBuffer(): WireFrame? =
+            try {
+                WireFrame.decodeFirst(streamBuffer)
+            } catch (ex: MissingWireFrameBytesException) {
+                logger.debug { "${ex.message} - waiting for more data" }
+                null
+            }
+
+    companion object {
+        private val logger = Logger(WireFrameSink::class)
+    }
+}
index 8d9e496..263ad44 100644 (file)
 package org.onap.dcae.collectors.veshv.impl
 
 import com.google.protobuf.ByteString
+import com.google.protobuf.InvalidProtocolBufferException
 import io.netty.buffer.Unpooled.wrappedBuffer
 import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
@@ -60,8 +62,9 @@ internal object VesDecoderTest : Spek({
         on("invalid ves hv message bytes") {
             val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset()))
 
-            it("should return empty result") {
-                assertThat(cut.decode(rawMessageBytes)).isNull()
+            it("should throw error") {
+                assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
+                        .isThrownBy { cut.decode(rawMessageBytes) }
             }
         }
     }
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt
deleted file mode 100644 (file)
index 81706ce..0000000
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.onap.dcae.collectors.veshv.impl
-
-import io.netty.buffer.Unpooled
-import io.netty.buffer.UnpooledByteBufAllocator
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
- * @since May 2018
- */
-internal object WireDecoderTest : Spek({
-    describe("decoding wire protocol") {
-        val cut = WireDecoder()
-
-        fun decode(frame: WireFrame) =
-                cut.decode(
-                        frame.encode(UnpooledByteBufAllocator.DEFAULT))
-
-        given("empty input") {
-            val input = Unpooled.EMPTY_BUFFER
-
-            it("should yield empty result") {
-                assertThat(cut.decode(input)).isNull()
-            }
-        }
-
-        given("input without 0xFF first byte") {
-            val input = WireFrame(
-                    payload = Unpooled.EMPTY_BUFFER,
-                    mark = 0x10,
-                    majorVersion = 1,
-                    minorVersion = 2,
-                    payloadSize = 0)
-
-            it("should yield empty result") {
-                assertThat(decode(input)).isNull()
-            }
-        }
-
-        given("input with unsupported major version") {
-            val input = WireFrame(
-                    payload = Unpooled.EMPTY_BUFFER,
-                    mark = 0xFF,
-                    majorVersion = 100,
-                    minorVersion = 2,
-                    payloadSize = 0)
-
-            it("should yield empty result") {
-                assertThat(decode(input)).isNull()
-            }
-        }
-
-        given("input with too small payload size") {
-            val input = WireFrame(
-                    payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
-                    mark = 0xFF,
-                    majorVersion = 1,
-                    minorVersion = 0,
-                    payloadSize = 1)
-
-            it("should yield empty result") {
-                assertThat(decode(input)).isNull()
-            }
-        }
-
-        given("input with too big payload size") {
-            val input = WireFrame(
-                    payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
-                    mark = 0xFF,
-                    majorVersion = 1,
-                    minorVersion = 0,
-                    payloadSize = 8)
-
-            it("should yield empty result") {
-                assertThat(decode(input)).isNull()
-            }
-        }
-
-        given("valid input") {
-            val payload = byteArrayOf(6, 9, 8, 6)
-            val input = WireFrame(
-                    payload = Unpooled.wrappedBuffer(payload),
-                    mark = 0xFF,
-                    majorVersion = 1,
-                    minorVersion = 0,
-                    payloadSize = payload.size)
-
-
-            it("should yield Google Protocol Buffers payload") {
-                val result = decode(input)!!
-
-                val actualPayload = ByteArray(result.readableBytes())
-                result.readBytes(actualPayload)
-
-                assertThat(actualPayload).containsExactly(*payload)
-            }
-        }
-    }
-})
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt
new file mode 100644 (file)
index 0000000..0a10aa1
--- /dev/null
@@ -0,0 +1,233 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import reactor.test.test
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since May 2018
+ */
+internal object WireDecoderTest : Spek({
+    val alloc = UnpooledByteBufAllocator.DEFAULT
+    val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
+    val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
+
+    fun WireDecoder.decode(frame: WireFrame) = decode(frame.encode(alloc))
+
+    fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
+        for (bb in byteBuffers) {
+            assertThat(bb.refCnt())
+                    .describedAs("should be released: $bb ref count")
+                    .isEqualTo(0)
+        }
+    }
+
+    fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) {
+        for (bb in byteBuffers) {
+            assertThat(bb.refCnt())
+                    .describedAs("should not be released: $bb ref count")
+                    .isEqualTo(1)
+        }
+    }
+
+    describe("decoding wire protocol") {
+        given("empty input") {
+            val input = Unpooled.EMPTY_BUFFER
+
+            it("should yield empty result") {
+                WireDecoder().decode(input).test().verifyComplete()
+            }
+        }
+
+        given("input with no readable bytes") {
+            val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
+
+            it("should yield empty result") {
+                WireDecoder().decode(input).test().verifyComplete()
+            }
+
+            it("should release memory") {
+                verifyMemoryReleased(input)
+            }
+        }
+
+        given("invalid input (not starting with marker)") {
+            val input = Unpooled.wrappedBuffer(samplePayload)
+
+            it("should yield error") {
+                WireDecoder().decode(input).test()
+                        .verifyError(InvalidWireFrameMarkerException::class.java)
+            }
+
+            it("should leave memory unreleased") {
+                verifyMemoryNotReleased(input)
+            }
+        }
+
+        given("valid input") {
+            val input = WireFrame(Unpooled.wrappedBuffer(samplePayload))
+
+            it("should yield decoded input frame") {
+                WireDecoder().decode(input).test()
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .verifyComplete()
+            }
+        }
+
+        given("valid input with part of next frame") {
+            val input = Unpooled.buffer()
+                    .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
+                    .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc).slice(0, 3))
+
+            it("should yield decoded input frame") {
+                WireDecoder().decode(input).test()
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .verifyComplete()
+            }
+
+            it("should leave memory unreleased") {
+                verifyMemoryNotReleased(input)
+            }
+        }
+
+        given("valid input with garbage after it") {
+            val input = Unpooled.buffer()
+                    .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
+                    .writeBytes(Unpooled.wrappedBuffer(samplePayload))
+
+            it("should yield decoded input frame and error") {
+                WireDecoder().decode(input).test()
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .verifyError(InvalidWireFrameMarkerException::class.java)
+            }
+
+            it("should leave memory unreleased") {
+                verifyMemoryNotReleased(input)
+            }
+        }
+
+        given("two inputs containing two separate messages") {
+            val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+            val input2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+
+            it("should yield decoded input frames") {
+                val cut = WireDecoder()
+                cut.decode(input1).test()
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .verifyComplete()
+                cut.decode(input2).test()
+                        .expectNextMatches { it.payloadSize == anotherPayload.size }
+                        .verifyComplete()
+            }
+
+            it("should release memory") {
+                verifyMemoryReleased(input1, input2)
+            }
+        }
+
+        given("1st input containing 1st frame and 2nd input containing garbage") {
+            val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+            val input2 = Unpooled.wrappedBuffer(anotherPayload)
+
+            it("should yield decoded input frames") {
+                val cut = WireDecoder()
+                cut.decode(input1)
+                        .doOnNext {
+                            // releasing retained payload
+                            it.payload.release()
+                        }
+                        .test()
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .verifyComplete()
+                cut.decode(input2).test()
+                        .verifyError(InvalidWireFrameMarkerException::class.java)
+            }
+
+            it("should release memory for 1st input") {
+                verifyMemoryReleased(input1)
+            }
+
+            it("should leave memory unreleased for 2nd input") {
+                verifyMemoryNotReleased(input2)
+            }
+        }
+
+
+        given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
+            val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+            val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+
+            val input1 = Unpooled.buffer()
+                    .writeBytes(frame1)
+                    .writeBytes(frame2, 3)
+            val input2 = Unpooled.buffer().writeBytes(frame2)
+
+            it("should yield decoded input frames") {
+                val cut = WireDecoder()
+                cut.decode(input1).test()
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .verifyComplete()
+                cut.decode(input2).test()
+                        .expectNextMatches { it.payloadSize == anotherPayload.size }
+                        .verifyComplete()
+            }
+
+            it("should release memory") {
+                verifyMemoryReleased(input1, input2)
+            }
+        }
+
+        given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
+            val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+            val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+
+            val input1 = Unpooled.buffer()
+                    .writeBytes(frame1, 5)
+            val input2 = Unpooled.buffer()
+                    .writeBytes(frame1)
+                    .writeBytes(frame2)
+
+            it("should yield decoded input frames") {
+                val cut = WireDecoder()
+                cut.decode(input1).test()
+                        .verifyComplete()
+                cut.decode(input2).test()
+                        .expectNextMatches { it.payloadSize == samplePayload.size }
+                        .expectNextMatches { it.payloadSize == anotherPayload.size }
+                        .verifyComplete()
+            }
+
+            it("should release memory") {
+                verifyMemoryReleased(input1, input2)
+            }
+        }
+    }
+})
@@ -26,7 +26,7 @@
       </rollingPolicy>
     </appender>
 
-  <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+  <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
 
   <root level="INFO">
       <appender-ref ref="CONSOLE"/>
index 1826bcd..c4e9874 100644 (file)
 package org.onap.dcae.collectors.veshv.tests.component
 
 import io.netty.buffer.ByteBuf
+import io.netty.buffer.UnpooledByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.Exceptions
 import reactor.core.publisher.Flux
 import java.time.Duration
 
@@ -36,6 +39,7 @@ import java.time.Duration
 internal class Sut {
     val configurationProvider = FakeConfigurationProvider()
     val sink = FakeSink()
+    val alloc = UnpooledByteBufAllocator.DEFAULT
     private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink))
     val collectorProvider = collectorFactory.createVesHvCollectorProvider()
 
@@ -43,8 +47,19 @@ internal class Sut {
         get() = collectorProvider()
 
     fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> {
-        collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10))
-
+        collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
         return sink.sentMessages
     }
+
+    fun handleConnectionReturningError(vararg packets: ByteBuf): Pair<List<RoutedMessage>, Exception?> =
+        try {
+            collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+            Pair(sink.sentMessages, null)
+        } catch (ex: Exception) {
+            Pair(sink.sentMessages, ex)
+        }
+
+    companion object {
+        val logger = Logger(Sut::class)
+    }
 }
index 26032ff..fc4fb65 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.tests.component
 
+import com.google.protobuf.InvalidProtocolBufferException
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.WireFrameDecodingException
 import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -40,29 +43,76 @@ object VesHvSpecification : Spek({
                     .describedAs("should send all events")
                     .hasSize(2)
         }
+    }
+
+    describe("Memory management") {
 
-        system("should release memory for each incoming message") { sut ->
+        system("should release memory for each handled and dropped message") { sut ->
             sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val validMessage = vesMessage(Domain.HVRANMEAS)
             val msgWithInvalidDomain = vesMessage(Domain.OTHER)
-            val msgWithInvalidPayload = invalidVesMessage()
             val msgWithInvalidFrame = invalidWireFrame()
-            val validMessage = vesMessage(Domain.HVRANMEAS)
-            val refCntBeforeSending = msgWithInvalidDomain.refCnt()
+            val expectedRefCnt = 0
+
+            val (handledEvents, exception) = sut.handleConnectionReturningError(
+                    validMessage, msgWithInvalidDomain, msgWithInvalidFrame)
 
-            sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage)
+            assertThat(handledEvents).hasSize(1)
+            assertThat(exception).isNull()
 
+            assertThat(validMessage.refCnt())
+                    .describedAs("handled message should be released")
+                    .isEqualTo(expectedRefCnt)
             assertThat(msgWithInvalidDomain.refCnt())
                     .describedAs("message with invalid domain should be released")
-                    .isEqualTo(refCntBeforeSending)
-            assertThat(msgWithInvalidPayload.refCnt())
-                    .describedAs("message with invalid payload should be released")
-                    .isEqualTo(refCntBeforeSending)
+                    .isEqualTo(expectedRefCnt)
             assertThat(msgWithInvalidFrame.refCnt())
                     .describedAs("message with invalid frame should be released")
-                    .isEqualTo(refCntBeforeSending)
+                    .isEqualTo(expectedRefCnt)
+
+        }
+
+        system("should release memory for each message with invalid payload") { sut ->
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val validMessage = vesMessage(Domain.HVRANMEAS)
+            val msgWithInvalidPayload = invalidVesMessage()
+            val expectedRefCnt = 0
+
+            val (handledEvents, exception) = sut.handleConnectionReturningError(
+                    validMessage, msgWithInvalidPayload)
+
+            assertThat(handledEvents).hasSize(1)
+            assertThat(exception?.cause).isInstanceOf(InvalidProtocolBufferException::class.java)
+
+            assertThat(validMessage.refCnt())
+                    .describedAs("handled message should be released")
+                    .isEqualTo(expectedRefCnt)
+            assertThat(msgWithInvalidPayload.refCnt())
+                    .describedAs("message with invalid payload should be released")
+                    .isEqualTo(expectedRefCnt)
+
+        }
+
+        system("should release memory for each message with garbage frame") { sut ->
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val validMessage = vesMessage(Domain.HVRANMEAS)
+            val msgWithGarbageFrame = garbageFrame()
+            val expectedRefCnt = 0
+
+            val (handledEvents, exception) = sut.handleConnectionReturningError(
+                    validMessage, msgWithGarbageFrame)
+
+            assertThat(handledEvents).hasSize(1)
+            assertThat(exception?.cause)
+                    .isInstanceOf(InvalidWireFrameMarkerException::class.java)
+
             assertThat(validMessage.refCnt())
                     .describedAs("handled message should be released")
-                    .isEqualTo(refCntBeforeSending)
+                    .isEqualTo(expectedRefCnt)
+            assertThat(msgWithGarbageFrame.refCnt())
+                    .describedAs("message with garbage frame should be released")
+                    .isEqualTo(expectedRefCnt)
+
         }
     }
 
index b6342b1..998f314 100644 (file)
@@ -54,6 +54,10 @@ fun invalidVesMessage() = alocator.buffer().run {
 
 }
 
+fun garbageFrame() = alocator.buffer().run {
+    writeBytes("the meaning of life is &@)(*_!".toByteArray())
+}
+
 fun invalidWireFrame() = alocator.buffer().run {
     writeByte(0xFF)
     writeByte(1)
@@ -65,6 +69,7 @@ fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toStr
                 .setCommonEventHeader(
                         CommonEventHeader.getDefaultInstance().toBuilder()
                                 .setVersion("1.0")
+                                .setEventName("xyz")
                                 .setEventId(id)
                                 .setDomain(domain)
                                 .setEventName("Sample event name")
@@ -76,6 +81,3 @@ fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toStr
                                 .setSequence(1))
                 .setHvRanMeasFields(ByteString.EMPTY)
                 .build()
-
-
-
index 809f62d..84abc9d 100644 (file)
@@ -26,7 +26,7 @@
       </rollingPolicy>
     </appender>
 
-  <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+  <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
 
   <root level="INFO">
       <appender-ref ref="CONSOLE"/>
index 5bd63d8..8c8b471 100644 (file)
@@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.domain
 
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
 
 /**
  * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
@@ -53,20 +56,20 @@ import io.netty.buffer.ByteBufAllocator
  * @since May 2018
  */
 data class WireFrame(val payload: ByteBuf,
-                     val mark: Short,
                      val majorVersion: Short,
                      val minorVersion: Short,
                      val payloadSize: Int) {
 
+    constructor(payload: ByteBuf) : this(payload, 1, 0, payload.readableBytes())
+
     fun isValid(): Boolean =
-            mark == FF_BYTE
-                    && majorVersion == SUPPORTED_MAJOR_VERSION
+            majorVersion == SUPPORTED_MAJOR_VERSION
                     && payload.readableBytes() == payloadSize
 
     fun encode(allocator: ByteBufAllocator): ByteBuf {
         val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes())
 
-        bb.writeByte(mark.toInt())
+        bb.writeByte(MARKER_BYTE.toInt())
         bb.writeByte(majorVersion.toInt())
         bb.writeByte(minorVersion.toInt())
         bb.writeInt(payloadSize)
@@ -76,20 +79,58 @@ data class WireFrame(val payload: ByteBuf,
     }
 
     companion object {
-        fun decode(byteBuf: ByteBuf): WireFrame {
-            val mark = byteBuf.readUnsignedByte()
+        fun decodeFirst(byteBuf: ByteBuf): WireFrame {
+            verifyNotEmpty(byteBuf)
+            byteBuf.markReaderIndex()
+
+            verifyMarker(byteBuf)
+            verifyMinimumSize(byteBuf)
+
             val majorVersion = byteBuf.readUnsignedByte()
             val minorVersion = byteBuf.readUnsignedByte()
-            val payloadSize = byteBuf.readInt()
-            val payload = byteBuf.retainedSlice()
+            val payloadSize = verifyPayloadSize(byteBuf)
+
+            val payload = byteBuf.retainedSlice(byteBuf.readerIndex(), payloadSize)
+            byteBuf.readerIndex(byteBuf.readerIndex() + payloadSize)
+
+            return WireFrame(payload, majorVersion, minorVersion, payloadSize)
+        }
+
+        private fun verifyPayloadSize(byteBuf: ByteBuf): Int =
+                byteBuf.readInt().let { payloadSize ->
+                    if (byteBuf.readableBytes() < payloadSize) {
+                        byteBuf.resetReaderIndex()
+                        throw MissingWireFrameBytesException("readable bytes < payload size")
+                    } else {
+                        payloadSize
+                    }
+                }
+
+        private fun verifyMinimumSize(byteBuf: ByteBuf) {
+            if (byteBuf.readableBytes() < HEADER_SIZE) {
+                byteBuf.resetReaderIndex()
+                throw MissingWireFrameBytesException("readable bytes < header size")
+            }
+        }
+
+        private fun verifyMarker(byteBuf: ByteBuf) {
+            val mark = byteBuf.readUnsignedByte()
+            if (mark != MARKER_BYTE) {
+                byteBuf.resetReaderIndex()
+                throw InvalidWireFrameMarkerException(mark)
+            }
+        }
 
-            return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize)
+        private fun verifyNotEmpty(byteBuf: ByteBuf) {
+            if (byteBuf.readableBytes() < 1) {
+                throw EmptyWireFrameException()
+            }
         }
 
-        private const val HEADER_SIZE =
+        const val HEADER_SIZE =
                 3 * java.lang.Byte.BYTES +
-                1 * java.lang.Integer.BYTES
-        private const val FF_BYTE: Short = 0xFF
-        private const val SUPPORTED_MAJOR_VERSION: Short = 1
+                        1 * java.lang.Integer.BYTES
+        const val MARKER_BYTE: Short = 0xFF
+        const val SUPPORTED_MAJOR_VERSION: Short = 1
     }
 }
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt
new file mode 100644 (file)
index 0000000..6e1ce93
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain.exceptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class EmptyWireFrameException : MissingWireFrameBytesException("wire frame was empty (readable bytes == 0)")
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.impl
+package org.onap.dcae.collectors.veshv.domain.exceptions
 
-import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
+ * @since June 2018
  */
-internal class WireDecoder {
-    fun decode(byteBuf: ByteBuf): ByteBuf? =
-            try {
-                WireFrame.decode(byteBuf)
-                        .takeIf { it.isValid() }
-                        .let { it?.payload }
-            } catch (ex: IndexOutOfBoundsException) {
-                logger.debug { "Wire protocol frame could not be decoded - input is too small" }
-                null
-            }
-
-    companion object {
-        private val logger = Logger(WireDecoder::class)
-    }
-}
+class InvalidWireFrameMarkerException(actualMarker: Short) : WireFrameDecodingException(
+        "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt
new file mode 100644 (file)
index 0000000..7e4b3ce
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain.exceptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+open class MissingWireFrameBytesException(msg: String) : WireFrameDecodingException(msg)
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt
new file mode 100644 (file)
index 0000000..1101383
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain.exceptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+open class WireFrameDecodingException(msg: String) : Exception(msg)
index 5a923c4..0011326 100644 (file)
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
 package org.onap.dcae.collectors.veshv.domain
 
-import io.netty.buffer.ByteBufAllocator
 import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
 import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
+import java.nio.charset.Charset
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
 object WireFrameTest : Spek({
-    describe("Wire Frame codec") {
-        describe("encode-decode methods' compatibility") {
-            val payloadContent = "test"
-            val payload = Unpooled.wrappedBuffer(payloadContent.toByteArray(Charsets.US_ASCII))
-            val frame = WireFrame(payload = payload,
-                    majorVersion = 1,
+    val payloadAsString = "coffeebabe"
+
+    fun createSampleFrame() =
+            WireFrame(Unpooled.wrappedBuffer(payloadAsString.toByteArray(Charset.defaultCharset())))
+
+    fun encodeSampleFrame() =
+            createSampleFrame().let {
+                Unpooled.buffer()
+                        .writeBytes(it.encode(UnpooledByteBufAllocator.DEFAULT))
+
+            }
+
+    describe("Wire Frame invariants") {
+
+        given("input with unsupported major version") {
+            val input = WireFrame(
+                    payload = Unpooled.EMPTY_BUFFER,
+                    majorVersion = 100,
                     minorVersion = 2,
-                    mark = 0xFF,
-                    payloadSize = payload.readableBytes())
+                    payloadSize = 0)
+
+            it("should fail validation") {
+                assertThat(input.isValid()).isFalse()
+            }
+        }
+
+        given("input with too small payload size") {
+            val input = WireFrame(
+                    payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+                    majorVersion = 1,
+                    minorVersion = 0,
+                    payloadSize = 1)
+
+            it("should fail validation") {
+                assertThat(input.isValid()).isFalse()
+            }
+        }
+
+        given("input with too big payload size") {
+            val input = WireFrame(
+                    payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+                    majorVersion = 1,
+                    minorVersion = 0,
+                    payloadSize = 8)
+
+            it("should fail validation") {
+                assertThat(input.isValid()).isFalse()
+            }
+        }
+
+        given("valid input") {
+            val payload = byteArrayOf(6, 9, 8, 6)
+            val input = WireFrame(
+                    payload = Unpooled.wrappedBuffer(payload),
+                    majorVersion = 1,
+                    minorVersion = 0,
+                    payloadSize = payload.size)
+
+            it("should pass validation") {
+                assertThat(input.isValid()).isTrue()
+            }
+        }
 
-            val encoded = frame.encode(ByteBufAllocator.DEFAULT)
-            val decoded = WireFrame.decode(encoded)
+
+    }
+
+    describe("Wire Frame codec") {
+
+        describe("encode-decode methods' compatibility") {
+            val frame = createSampleFrame()
+            val encoded = encodeSampleFrame()
+            val decoded = WireFrame.decodeFirst(encoded)
 
             it("should decode major version") {
                 assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion)
@@ -33,17 +117,13 @@ object WireFrameTest : Spek({
                 assertThat(decoded.minorVersion).isEqualTo(frame.minorVersion)
             }
 
-            it("should decode mark") {
-                assertThat(decoded.mark).isEqualTo(frame.mark)
-            }
-
             it("should decode payload size") {
                 assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize)
             }
 
             it("should decode payload") {
-                assertThat(decoded.payload.toString(Charsets.US_ASCII))
-                        .isEqualTo(payloadContent)
+                assertThat(decoded.payload.toString(Charset.defaultCharset()))
+                        .isEqualTo(payloadAsString)
             }
 
             it("should retain decoded payload") {
@@ -51,5 +131,55 @@ object WireFrameTest : Spek({
                 assertThat(decoded.payload.refCnt()).isEqualTo(1)
             }
         }
+
+        describe("TCP framing") {
+            // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
+
+            it("should decode message leaving rest unread") {
+                val buff = Unpooled.buffer()
+                        .writeBytes(encodeSampleFrame())
+                        .writeByte(0xAA)
+                val decoded = WireFrame.decodeFirst(buff)
+
+                assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+                assertThat(buff.readableBytes()).isEqualTo(1)
+            }
+
+            it("should throw exception when not even header fits") {
+                val buff = Unpooled.buffer()
+                        .writeByte(0xFF)
+
+                assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
+                        .isThrownBy { WireFrame.decodeFirst(buff) }
+            }
+
+            it("should throw exception when first byte is not 0xFF but length looks ok") {
+                val buff = Unpooled.buffer()
+                        .writeByte(0xAA)
+                        .writeBytes("some garbage".toByteArray())
+
+                assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
+                        .isThrownBy { WireFrame.decodeFirst(buff) }
+            }
+
+            it("should throw exception when first byte is not 0xFF and length is to short") {
+                val buff = Unpooled.buffer()
+                        .writeByte(0xAA)
+
+                assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
+                        .isThrownBy { WireFrame.decodeFirst(buff) }
+            }
+
+            it("should throw exception when payload doesn't fit") {
+                val buff = Unpooled.buffer()
+                        .writeBytes(encodeSampleFrame())
+                buff.writerIndex(buff.writerIndex() - 2)
+
+                assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
+                        .isThrownBy { WireFrame.decodeFirst(buff) }
+            }
+
+        }
     }
+
 })
\ No newline at end of file
index ceb45ea..1367ff1 100644 (file)
@@ -12,4 +12,4 @@ ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
 CMD ["--listen-port", "6061"]
 COPY target/libs/external/* ./
 COPY target/libs/internal/* ./
-COPY target/hv-collector-main-*.jar ./
\ No newline at end of file
+COPY target/hv-collector-main-*.jar ./
index 89b31b5..4438cf3 100644 (file)
@@ -39,7 +39,7 @@ fun main(args: Array<String>) {
 
         val collectorProvider = CollectorFactory(
                 resolveConfigurationProvider(serverConfiguration),
-                AdapterFactory.kafkaSink()
+                AdapterFactory.loggingSink()
         ).createVesHvCollectorProvider()
         ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block()
     } catch (ex: WrongArgumentException) {
index 809f62d..48da3b1 100644 (file)
@@ -26,7 +26,8 @@
       </rollingPolicy>
     </appender>
 
-  <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+  <logger name="org.onap.dcae.collectors.veshv" level="INFO"/>
+  <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
 
   <root level="INFO">
       <appender-ref ref="CONSOLE"/>
index b96a8b3..eb52a86 100644 (file)
@@ -25,6 +25,17 @@ import kotlin.reflect.KClass
 class Logger(val logger: org.slf4j.Logger) {
     constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java))
 
+    //
+    // TRACE
+    //
+
+    fun trace(messageProvider: () -> String) {
+        if (logger.isTraceEnabled) {
+            logger.trace(messageProvider())
+        }
+    }
+
+
     //
     // DEBUG
     //