Implement Kafka Sink 63/58563/1
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 8 Jun 2018 14:29:31 +0000 (16:29 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 05:06:19 +0000 (07:06 +0200)
Closes ONAP-146

Change-Id: I119a8abe70a9042f65a43909e5aa2fbed439e26f
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

28 files changed:
docker-compose.yml
hv-collector-client-simulator/Dockerfile
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt [moved from hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt with 92% similarity]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt [moved from hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt with 81% similarity]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt [new file with mode: 0644]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt [new file with mode: 0644]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt [moved from hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt with 89% similarity]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt [moved from hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt with 80% similarity]
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt [new file with mode: 0644]
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt [new file with mode: 0644]
hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt [moved from hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt with 83% similarity]
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt

index af8e0e0..0f0cca2 100644 (file)
@@ -9,14 +9,15 @@ services:
     ports:
       - "9092:9092"
     environment:
-      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
+      KAFKA_ADVERTISED_HOST_NAME: "kafka"
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
+      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
     volumes:
       - /var/run/docker.sock:/var/run/docker.sock
     depends_on:
       - zookeeper
-  hv-collector:
+  veshvcollector:
     build:
       context: hv-collector-main
       dockerfile: Dockerfile
@@ -26,11 +27,11 @@ services:
       - kafka
     volumes:
       - ./ssl/:/etc/ves-hv/
-  xnf-simulator:
+  xnfsimulator:
     build:
       context: hv-collector-client-simulator
       dockerfile: Dockerfile
     depends_on:
-      - hv-collector
+      - veshvcollector
     volumes:
       - ./ssl/:/etc/ves-hv/
\ No newline at end of file
index 19c4c87..58cfa44 100644 (file)
@@ -7,7 +7,7 @@ LABEL maintainer="Nokia Wroclaw ONAP Team"
 
 WORKDIR /opt/ves-hv-client-simulator
 ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"]
-CMD ["--ves-host", "hv-collector", "--ves-port", "6061"]
+CMD ["--ves-host", "veshvcollector", "--ves-port", "6061"]
 COPY target/libs/external/* ./
 COPY target/libs/internal/* ./
 COPY target/hv-collector-client-simulator-*.jar ./
index d5f7c7c..87a238a 100644 (file)
@@ -20,8 +20,6 @@
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
 import com.google.protobuf.ByteString
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.Unpooled
 import org.onap.dcae.collectors.veshv.domain.WireFrame
 import org.onap.ves.VesEventV5
 import reactor.core.publisher.Flux
@@ -65,12 +63,12 @@ class MessageFactory {
         return WireFrame(payload)
     }
 
-    private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf {
+    private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteArray {
         val msg = VesEventV5.VesEvent.newBuilder()
                 .setCommonEventHeader(commonHeader)
                 .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
                 .build()
 
-        return Unpooled.wrappedBuffer(msg.toByteArray())
+        return msg.toByteArray()
     }
 }
index 29573e8..cb56db9 100644 (file)
@@ -25,6 +25,7 @@ import io.netty.handler.ssl.SslContext
 import io.netty.handler.ssl.SslContextBuilder
 import io.netty.handler.ssl.SslProvider
 import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientSecurityConfiguration
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -64,8 +65,10 @@ class VesHvClient(configuration: ClientConfiguration) {
                 .asString(Charsets.UTF_8)
                 .subscribe { str -> logger.info("Server response: $str") }
 
+        val encoder = WireFrameEncoder(nettyOutbound.alloc())
+
         val frames = messages
-                .map { it.encode(nettyOutbound.alloc()) }
+                .map(encoder::encode)
                 .concatWith(Mono.just(Unpooled.EMPTY_BUFFER))
 
         return nettyOutbound
index 913d8f5..73f4d09 100644 (file)
@@ -23,12 +23,13 @@ import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.impl.MessageValidator
 import org.onap.dcae.collectors.veshv.impl.Router
 import org.onap.dcae.collectors.veshv.impl.VesDecoder
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
-import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder
+import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import reactor.core.publisher.Flux
 import java.util.concurrent.atomic.AtomicReference
 
@@ -36,7 +37,8 @@ import java.util.concurrent.atomic.AtomicReference
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvider: SinkProvider) {
+class CollectorFactory(val configuration: ConfigurationProvider,
+                       private val sinkProvider: SinkProvider) {
     fun createVesHvCollectorProvider(): CollectorProvider {
         val collector: AtomicReference<Collector> = AtomicReference()
         createVesHvCollector().subscribe(collector::set)
@@ -48,7 +50,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvide
 
     private fun createVesHvCollector(config: CollectorConfiguration): Collector {
         return VesHvCollector(
-                { WireDecoder(it) },
+                { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
                 VesDecoder(),
                 MessageValidator(),
                 Router(config.routing),
index 60e7d70..591a48b 100644 (file)
@@ -19,9 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
-import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventV5.VesEvent
 
 /**
@@ -30,8 +29,8 @@ import org.onap.ves.VesEventV5.VesEvent
  */
 internal class VesDecoder {
 
-    fun decode(bb: ByteBuf): VesMessage {
-        val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader
-        return VesMessage(decodedHeader, bb)
+    fun decode(bytes: ByteData): VesMessage {
+        val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+        return VesMessage(decodedHeader, bytes)
     }
 }
index ac11b3e..965943f 100644 (file)
@@ -24,57 +24,42 @@ import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder
+import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
-import java.util.concurrent.atomic.AtomicInteger
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
 internal class VesHvCollector(
-        private val wireDecoderSupplier: (ByteBufAllocator) -> WireDecoder,
+        private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
         private val protobufDecoder: VesDecoder,
         private val validator: MessageValidator,
         private val router: Router,
         private val sink: Sink) : Collector {
 
     override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
-            wireDecoderSupplier(alloc).let { wireDecoder ->
+            wireChunkDecoderSupplier(alloc).let { wireDecoder ->
                 dataStream
                         .concatMap(wireDecoder::decode)
                         .filter(WireFrame::isValid)
                         .map(WireFrame::payload)
                         .map(protobufDecoder::decode)
-                        .filter(this::validate)
+                        .filter(validator::isValid)
                         .flatMap(this::findRoute)
                         .compose(sink::send)
-                        .doOnNext(this::releaseMemory)
                         .doOnTerminate { releaseBuffersMemory(wireDecoder) }
                         .then()
             }
 
-    private fun validate(msg: VesMessage): Boolean {
-        val valid = validator.isValid(msg)
-        if (!valid) {
-            msg.rawMessage.release()
-        }
-        return valid
-    }
-
     private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
 
-    private fun releaseMemory(msg: VesMessage) {
-        logger.trace { "Releasing memory from ${msg.rawMessage}" }
-        msg.rawMessage.release()
-    }
-
-    private fun releaseBuffersMemory(wireDecoder: WireDecoder) {
-        wireDecoder.release()
+    private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
+        wireChunkDecoder.release()
     }
 
     private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
index 8e6db2a..358be10 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import reactor.core.publisher.Flux
 import reactor.ipc.netty.http.client.HttpClient
index 62b6d1a..b943e4e 100644 (file)
@@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider {
 
             private fun logMessage(msg: RoutedMessage) {
                 val msgs = totalMessages.addAndGet(1)
-                val bytes = totalBytes.addAndGet(msg.message.rawMessage.readableBytes().toLong())
+                val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
                 val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
                 if (msgs % INFO_LOGGING_FREQ == 0L)
                     logger.info(logMessageSupplier)
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -28,13 +28,12 @@ import reactor.core.publisher.Flux
 import reactor.kafka.sender.KafkaSender
 import reactor.kafka.sender.SenderRecord
 import reactor.kafka.sender.SenderResult
-import java.nio.ByteBuffer
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, ByteBuffer>) : Sink {
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
 
     override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> {
         val records = messages.map(this::vesToKafkaRecord)
@@ -44,13 +43,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, Byte
                 .map { it.correlationMetadata() }
     }
 
-    private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, ByteBuffer, VesMessage> {
+    private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, VesMessage> {
         return SenderRecord.create(
                 msg.topic,
                 msg.partition,
                 System.currentTimeMillis(),
                 msg.message.header,
-                msg.message.rawMessage.nioBuffer(),
+                msg.message,
                 msg.message)
     }
 
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization.ByteBufferSerializer
-import org.apache.kafka.common.serialization.StringSerializer
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.ves.VesEventV5
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 import reactor.kafka.sender.KafkaSender
 import reactor.kafka.sender.SenderOptions
-import java.nio.ByteBuffer
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -40,9 +38,8 @@ internal class KafkaSinkProvider : SinkProvider {
     }
 
     private fun constructSenderOptions(config: CollectorConfiguration) =
-            SenderOptions.create<VesEventV5.VesEvent.CommonEventHeader, ByteBuffer>()
+            SenderOptions.create<CommonEventHeader, VesMessage>()
                     .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
-                    .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
-                    .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java)
-
+                    .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+                    .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
 }
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
new file mode 100644 (file)
index 0000000..9753d9e
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import com.google.protobuf.MessageLite
+import org.apache.kafka.common.serialization.Serializer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class ProtobufSerializer :Serializer<MessageLite> {
+    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+        // no configuration
+    }
+
+    override fun serialize(topic: String?, data: MessageLite?): ByteArray? =
+            data?.toByteArray()
+
+    override fun close() {
+        // cleanup not needed
+    }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
new file mode 100644 (file)
index 0000000..7a6ac7c
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.apache.kafka.common.serialization.Serializer
+import org.onap.dcae.collectors.veshv.model.VesMessage
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class VesMessageSerializer : Serializer<VesMessage> {
+    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+    }
+
+    override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray()
+
+    override fun close() {
+    }
+}
index e4dd7cf..34a8b92 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.CompositeByteBuf
 import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.FluxSink
@@ -33,6 +34,7 @@ import java.util.function.Consumer
  * @since May 2018
  */
 internal class StreamBufferEmitter(
+        private val decoder: WireFrameDecoder,
         private val streamBuffer: CompositeByteBuf,
         private val newFrame: ByteBuf)
     : Consumer<FluxSink<WireFrame>> {
@@ -58,15 +60,15 @@ internal class StreamBufferEmitter(
                     streamBuffer.discardReadComponents()
                 }
                 sink.onRequest { requestedFrameCount ->
-                    WireFrameSink(streamBuffer, sink, requestedFrameCount).handleSubscriber()
+                    WireFrameSink(decoder, streamBuffer, sink, requestedFrameCount).handleSubscriber()
                 }
             }
         }
     }
 
     companion object {
-        fun createFlux(streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> =
-                Flux.create(StreamBufferEmitter(streamBuffer, newFrame))
+        fun createFlux(decoder: WireFrameDecoder, streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> =
+                Flux.create(StreamBufferEmitter(decoder, streamBuffer, newFrame))
 
         private const val INCREASE_WRITER_INDEX = true
         private val logger = Logger(StreamBufferEmitter::class)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
@@ -30,10 +31,10 @@ import reactor.core.publisher.Flux
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+internal class WireChunkDecoder(private val decoder: WireFrameDecoder, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
     private val streamBuffer = alloc.compositeBuffer()
 
-    fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(streamBuffer, byteBuf)
+    fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(decoder, streamBuffer, byteBuf)
             .doOnSubscribe { logIncomingMessage(byteBuf) }
             .doOnNext(this::logDecodedWireMessage)
 
@@ -41,7 +42,6 @@ internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
         streamBuffer.release()
     }
 
-
     private fun logIncomingMessage(wire: ByteBuf) {
         logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
     }
index bc9c838..a576dc6 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.wire
 
 import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.FluxSink
@@ -30,6 +31,7 @@ import reactor.core.publisher.FluxSink
  * @since May 2018
  */
 internal class WireFrameSink(
+        private val decoder: WireFrameDecoder,
         private val streamBuffer: ByteBuf,
         private val sink: FluxSink<WireFrame>,
         private val requestedFrameCount: Long) {
@@ -80,7 +82,7 @@ internal class WireFrameSink(
 
     private fun decodeFirstFrameFromBuffer(): WireFrame? =
             try {
-                WireFrame.decodeFirst(streamBuffer)
+                decoder.decodeFirst(streamBuffer)
             } catch (ex: MissingWireFrameBytesException) {
                 logger.debug { "${ex.message} - waiting for more data" }
                 null
index 3825689..03c53e1 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.model
 
-import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteBuf)
+data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData)
index 10e7915..bc03058 100644 (file)
@@ -73,7 +73,7 @@ class RouteBuilder {
         this.targetTopic = targetTopic
     }
 
-    fun withFixedPartitioning(num: Int = 1) {
+    fun withFixedPartitioning(num: Int = 0) {
         partitioning = { _ -> num }
     }
 
index df2840b..017187a 100644 (file)
 package org.onap.dcae.collectors.veshv.impl
 
 import com.google.protobuf.ByteString
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.Unpooled
-import io.netty.buffer.Unpooled.wrappedBuffer
+import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.toByteData
 import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 import org.onap.ves.VesEventV5.VesEvent
-import org.assertj.core.api.Assertions.assertThat
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.*
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder
 
 internal object MessageValidatorTest : Spek({
 
-    fun vesMessageBytes(commonHeader: CommonEventHeader): ByteBuf {
+    fun vesMessageBytes(commonHeader: CommonEventHeader): ByteData {
         val msg = VesEvent.newBuilder()
                 .setCommonEventHeader(commonHeader)
                 .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
                 .build()
-        return wrappedBuffer(msg.toByteArray())
+        return msg.toByteData()
     }
 
     given("Message validator") {
@@ -79,7 +81,7 @@ internal object MessageValidatorTest : Spek({
         }
 
         on("ves hv message bytes") {
-            val vesMessage = VesMessage(getDefaultInstance(), Unpooled.EMPTY_BUFFER)
+            val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY)
             it("should not accept message with default header") {
                 assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
             }
@@ -97,7 +99,7 @@ internal object MessageValidatorTest : Spek({
                     .setCommonEventHeader(commonHeader)
                     .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data !!!"))
                     .build()
-            val rawMessageBytes = wrappedBuffer(msg.toByteArray())
+            val rawMessageBytes = msg.toByteData()
 
             it("should not accept not fully initialized message header ") {
                 val vesMessage = VesMessage(commonHeader, rawMessageBytes)
index 3812db5..c852f5f 100644 (file)
@@ -1,11 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
 package org.onap.dcae.collectors.veshv.impl
 
-import io.netty.buffer.Unpooled
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
@@ -34,7 +53,7 @@ object RouterTest : Spek({
         val cut = Router(config)
 
         on("message with existing route (rtpm)") {
-            val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), Unpooled.EMPTY_BUFFER)
+            val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), ByteData.EMPTY)
             val result = cut.findDestination(message)
 
             it("should have route available") {
@@ -55,7 +74,7 @@ object RouterTest : Spek({
         }
 
         on("message with existing route (trace)") {
-            val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), Unpooled.EMPTY_BUFFER)
+            val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), ByteData.EMPTY)
             val result = cut.findDestination(message)
 
             it("should have route available") {
@@ -63,7 +82,7 @@ object RouterTest : Spek({
             }
 
             it("should be routed to proper partition") {
-                assertThat(result?.partition).isEqualTo(1)
+                assertThat(result?.partition).isEqualTo(0)
             }
 
             it("should be routed to proper topic") {
@@ -76,7 +95,7 @@ object RouterTest : Spek({
         }
 
         on("message with unknown route") {
-            val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), Unpooled.EMPTY_BUFFER)
+            val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), ByteData.EMPTY)
             val result = cut.findDestination(message)
 
             it("should not have route available") {
index 263ad44..90b34b1 100644 (file)
@@ -21,13 +21,14 @@ package org.onap.dcae.collectors.veshv.impl
 
 import com.google.protobuf.ByteString
 import com.google.protobuf.InvalidProtocolBufferException
-import io.netty.buffer.Unpooled.wrappedBuffer
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assertions.assertThatExceptionOfType
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.toByteData
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.ves.VesEventV5.VesEvent
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
@@ -45,7 +46,7 @@ internal object VesDecoderTest : Spek({
                     .setCommonEventHeader(commonHeader)
                     .setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements"))
                     .build()
-            val rawMessageBytes = wrappedBuffer(msg.toByteArray())
+            val rawMessageBytes = msg.toByteData()
 
 
             it("should decode only header and pass it on along with raw message") {
@@ -60,7 +61,7 @@ internal object VesDecoderTest : Spek({
         }
 
         on("invalid ves hv message bytes") {
-            val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset()))
+            val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset()))
 
             it("should throw error") {
                 assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
@@ -28,6 +28,8 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
 import reactor.test.test
 
@@ -35,13 +37,17 @@ import reactor.test.test
  * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
  * @since May 2018
  */
-internal object WireDecoderTest : Spek({
+internal object WireChunkDecoderTest : Spek({
     val alloc = UnpooledByteBufAllocator.DEFAULT
     val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
     val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
 
-    fun WireDecoder.decode(frame: WireFrame) = decode(frame.encode(alloc))
+    val encoder = WireFrameEncoder(alloc)
+    
+    fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame))
 
+    fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
+    
     fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
         for (bb in byteBuffers) {
             assertThat(bb.refCnt())
@@ -63,7 +69,7 @@ internal object WireDecoderTest : Spek({
             val input = Unpooled.EMPTY_BUFFER
 
             it("should yield empty result") {
-                WireDecoder().decode(input).test().verifyComplete()
+                createInstance().decode(input).test().verifyComplete()
             }
         }
 
@@ -71,7 +77,7 @@ internal object WireDecoderTest : Spek({
             val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
 
             it("should yield empty result") {
-                WireDecoder().decode(input).test().verifyComplete()
+                createInstance().decode(input).test().verifyComplete()
             }
 
             it("should release memory") {
@@ -83,7 +89,7 @@ internal object WireDecoderTest : Spek({
             val input = Unpooled.wrappedBuffer(samplePayload)
 
             it("should yield error") {
-                WireDecoder().decode(input).test()
+                createInstance().decode(input).test()
                         .verifyError(InvalidWireFrameMarkerException::class.java)
             }
 
@@ -93,10 +99,10 @@ internal object WireDecoderTest : Spek({
         }
 
         given("valid input") {
-            val input = WireFrame(Unpooled.wrappedBuffer(samplePayload))
+            val input = WireFrame(samplePayload)
 
             it("should yield decoded input frame") {
-                WireDecoder().decode(input).test()
+                createInstance().decode(input).test()
                         .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyComplete()
             }
@@ -104,11 +110,11 @@ internal object WireDecoderTest : Spek({
 
         given("valid input with part of next frame") {
             val input = Unpooled.buffer()
-                    .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
-                    .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc).slice(0, 3))
+                    .writeBytes(encoder.encode(WireFrame(samplePayload)))
+                    .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3))
 
             it("should yield decoded input frame") {
-                WireDecoder().decode(input).test()
+                createInstance().decode(input).test()
                         .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyComplete()
             }
@@ -120,11 +126,11 @@ internal object WireDecoderTest : Spek({
 
         given("valid input with garbage after it") {
             val input = Unpooled.buffer()
-                    .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
+                    .writeBytes(encoder.encode(WireFrame(samplePayload)))
                     .writeBytes(Unpooled.wrappedBuffer(samplePayload))
 
             it("should yield decoded input frame and error") {
-                WireDecoder().decode(input).test()
+                createInstance().decode(input).test()
                         .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyError(InvalidWireFrameMarkerException::class.java)
             }
@@ -135,11 +141,11 @@ internal object WireDecoderTest : Spek({
         }
 
         given("two inputs containing two separate messages") {
-            val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
-            val input2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+            val input1 = encoder.encode(WireFrame(samplePayload))
+            val input2 = encoder.encode(WireFrame(anotherPayload))
 
             it("should yield decoded input frames") {
-                val cut = WireDecoder()
+                val cut = createInstance()
                 cut.decode(input1).test()
                         .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyComplete()
@@ -154,16 +160,12 @@ internal object WireDecoderTest : Spek({
         }
 
         given("1st input containing 1st frame and 2nd input containing garbage") {
-            val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+            val input1 = encoder.encode(WireFrame(samplePayload))
             val input2 = Unpooled.wrappedBuffer(anotherPayload)
 
             it("should yield decoded input frames") {
-                val cut = WireDecoder()
+                val cut = createInstance()
                 cut.decode(input1)
-                        .doOnNext {
-                            // releasing retained payload
-                            it.payload.release()
-                        }
                         .test()
                         .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyComplete()
@@ -182,8 +184,8 @@ internal object WireDecoderTest : Spek({
 
 
         given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
-            val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
-            val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+            val frame1 = encoder.encode(WireFrame(samplePayload))
+            val frame2 = encoder.encode(WireFrame(anotherPayload))
 
             val input1 = Unpooled.buffer()
                     .writeBytes(frame1)
@@ -191,7 +193,7 @@ internal object WireDecoderTest : Spek({
             val input2 = Unpooled.buffer().writeBytes(frame2)
 
             it("should yield decoded input frames") {
-                val cut = WireDecoder()
+                val cut = createInstance()
                 cut.decode(input1).test()
                         .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyComplete()
@@ -206,8 +208,8 @@ internal object WireDecoderTest : Spek({
         }
 
         given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
-            val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
-            val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+            val frame1 = encoder.encode(WireFrame(samplePayload))
+            val frame2 = encoder.encode(WireFrame(anotherPayload))
 
             val input1 = Unpooled.buffer()
                     .writeBytes(frame1, 5)
@@ -216,7 +218,7 @@ internal object WireDecoderTest : Spek({
                     .writeBytes(frame2)
 
             it("should yield decoded input frames") {
-                val cut = WireDecoder()
+                val cut = createInstance()
                 cut.decode(input1).test()
                         .verifyComplete()
                 cut.decode(input2).test()
index fc4fb65..08b6382 100644 (file)
@@ -24,7 +24,6 @@ import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
-import org.onap.dcae.collectors.veshv.domain.exceptions.WireFrameDecodingException
 import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -125,7 +124,7 @@ object VesHvSpecification : Spek({
 
             val msg = messages[0]
             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
-            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(1)
+            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
         }
 
         system("should drop message if route was not found") { sut ->
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt
new file mode 100644 (file)
index 0000000..2b84e3f
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import com.google.protobuf.MessageLite
+import io.netty.buffer.ByteBuf
+import java.nio.charset.Charset
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class ByteData(private val data: ByteArray) {
+
+    fun size() = data.size
+
+    /**
+     * This will expose mutable state of the data.
+     *
+     * @return wrapped data buffer (NOT a copy)
+     */
+    fun unsafeAsArray() = data
+
+    fun writeTo(byteBuf: ByteBuf) {
+        byteBuf.writeBytes(data)
+    }
+
+    fun asString(charset: Charset = Charset.defaultCharset()) = String(data, charset)
+
+    companion object {
+        val EMPTY = ByteData(byteArrayOf())
+
+        fun readFrom(byteBuf: ByteBuf, length: Int): ByteData {
+            val dataArray = ByteArray(length)
+            byteBuf.readBytes(dataArray)
+            return ByteData(dataArray)
+        }
+    }
+}
+
+fun MessageLite.toByteData(): ByteData = ByteData(toByteArray())
index 8c8b471..caf13c5 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.domain
 
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException
-import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
-
 /**
  * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
  *
@@ -55,82 +49,25 @@ import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesExc
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-data class WireFrame(val payload: ByteBuf,
+data class WireFrame(val payload: ByteData,
                      val majorVersion: Short,
                      val minorVersion: Short,
                      val payloadSize: Int) {
 
-    constructor(payload: ByteBuf) : this(payload, 1, 0, payload.readableBytes())
+    constructor(payload: ByteArray) : this(ByteData(payload), 1, 0, payload.size)
 
     fun isValid(): Boolean =
             majorVersion == SUPPORTED_MAJOR_VERSION
-                    && payload.readableBytes() == payloadSize
-
-    fun encode(allocator: ByteBufAllocator): ByteBuf {
-        val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes())
-
-        bb.writeByte(MARKER_BYTE.toInt())
-        bb.writeByte(majorVersion.toInt())
-        bb.writeByte(minorVersion.toInt())
-        bb.writeInt(payloadSize)
-        bb.writeBytes(payload)
-
-        return bb
-    }
+                    && payload.size() == payloadSize
 
     companion object {
-        fun decodeFirst(byteBuf: ByteBuf): WireFrame {
-            verifyNotEmpty(byteBuf)
-            byteBuf.markReaderIndex()
-
-            verifyMarker(byteBuf)
-            verifyMinimumSize(byteBuf)
-
-            val majorVersion = byteBuf.readUnsignedByte()
-            val minorVersion = byteBuf.readUnsignedByte()
-            val payloadSize = verifyPayloadSize(byteBuf)
-
-            val payload = byteBuf.retainedSlice(byteBuf.readerIndex(), payloadSize)
-            byteBuf.readerIndex(byteBuf.readerIndex() + payloadSize)
-
-            return WireFrame(payload, majorVersion, minorVersion, payloadSize)
-        }
-
-        private fun verifyPayloadSize(byteBuf: ByteBuf): Int =
-                byteBuf.readInt().let { payloadSize ->
-                    if (byteBuf.readableBytes() < payloadSize) {
-                        byteBuf.resetReaderIndex()
-                        throw MissingWireFrameBytesException("readable bytes < payload size")
-                    } else {
-                        payloadSize
-                    }
-                }
-
-        private fun verifyMinimumSize(byteBuf: ByteBuf) {
-            if (byteBuf.readableBytes() < HEADER_SIZE) {
-                byteBuf.resetReaderIndex()
-                throw MissingWireFrameBytesException("readable bytes < header size")
-            }
-        }
-
-        private fun verifyMarker(byteBuf: ByteBuf) {
-            val mark = byteBuf.readUnsignedByte()
-            if (mark != MARKER_BYTE) {
-                byteBuf.resetReaderIndex()
-                throw InvalidWireFrameMarkerException(mark)
-            }
-        }
-
-        private fun verifyNotEmpty(byteBuf: ByteBuf) {
-            if (byteBuf.readableBytes() < 1) {
-                throw EmptyWireFrameException()
-            }
-        }
+        const val SUPPORTED_MAJOR_VERSION: Short = 1
 
         const val HEADER_SIZE =
                 3 * java.lang.Byte.BYTES +
                         1 * java.lang.Integer.BYTES
         const val MARKER_BYTE: Short = 0xFF
-        const val SUPPORTED_MAJOR_VERSION: Short = 1
+
     }
+
 }
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
new file mode 100644 (file)
index 0000000..d6804c7
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class WireFrameEncoder(val allocator: ByteBufAllocator) {
+
+    fun encode(frame: WireFrame): ByteBuf {
+        val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size())
+
+        bb.writeByte(WireFrame.MARKER_BYTE.toInt())
+        bb.writeByte(frame.majorVersion.toInt())
+        bb.writeByte(frame.minorVersion.toInt())
+        bb.writeInt(frame.payloadSize)
+        frame.payload.writeTo(bb)
+
+        return bb
+    }
+}
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class WireFrameDecoder {
+
+    fun decodeFirst(byteBuf: ByteBuf): WireFrame {
+        verifyNotEmpty(byteBuf)
+        byteBuf.markReaderIndex()
+
+        verifyMarker(byteBuf)
+        verifyMinimumSize(byteBuf)
+
+        val majorVersion = byteBuf.readUnsignedByte()
+        val minorVersion = byteBuf.readUnsignedByte()
+        val payloadSize = verifyPayloadSize(byteBuf)
+        val payload = ByteData.readFrom(byteBuf, payloadSize)
+
+        return WireFrame(payload, majorVersion, minorVersion, payloadSize)
+    }
+
+    private fun verifyPayloadSize(byteBuf: ByteBuf): Int =
+            byteBuf.readInt().let { payloadSize ->
+                if (byteBuf.readableBytes() < payloadSize) {
+                    byteBuf.resetReaderIndex()
+                    throw MissingWireFrameBytesException("readable bytes < payload size")
+                } else {
+                    payloadSize
+                }
+            }
+
+    private fun verifyMinimumSize(byteBuf: ByteBuf) {
+        if (byteBuf.readableBytes() < WireFrame.HEADER_SIZE) {
+            byteBuf.resetReaderIndex()
+            throw MissingWireFrameBytesException("readable bytes < header size")
+        }
+    }
+
+    private fun verifyMarker(byteBuf: ByteBuf) {
+        val mark = byteBuf.readUnsignedByte()
+        if (mark != WireFrame.MARKER_BYTE) {
+            byteBuf.resetReaderIndex()
+            throw InvalidWireFrameMarkerException(mark)
+        }
+    }
+
+    private fun verifyNotEmpty(byteBuf: ByteBuf) {
+        if (byteBuf.readableBytes() < 1) {
+            throw EmptyWireFrameException()
+        }
+    }
+}
@@ -35,24 +35,24 @@ import java.nio.charset.Charset
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-object WireFrameTest : Spek({
+object WireFrameCodecsTest : Spek({
     val payloadAsString = "coffeebabe"
+    val encoder = WireFrameEncoder(UnpooledByteBufAllocator.DEFAULT)
+    val decoder = WireFrameDecoder()
 
     fun createSampleFrame() =
-            WireFrame(Unpooled.wrappedBuffer(payloadAsString.toByteArray(Charset.defaultCharset())))
+            WireFrame(payloadAsString.toByteArray(Charset.defaultCharset()))
 
     fun encodeSampleFrame() =
             createSampleFrame().let {
-                Unpooled.buffer()
-                        .writeBytes(it.encode(UnpooledByteBufAllocator.DEFAULT))
-
+                encoder.encode(it)
             }
 
     describe("Wire Frame invariants") {
 
         given("input with unsupported major version") {
             val input = WireFrame(
-                    payload = Unpooled.EMPTY_BUFFER,
+                    payload = ByteData.EMPTY,
                     majorVersion = 100,
                     minorVersion = 2,
                     payloadSize = 0)
@@ -64,7 +64,7 @@ object WireFrameTest : Spek({
 
         given("input with too small payload size") {
             val input = WireFrame(
-                    payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+                    payload = ByteData(byteArrayOf(1, 2, 3)),
                     majorVersion = 1,
                     minorVersion = 0,
                     payloadSize = 1)
@@ -76,7 +76,7 @@ object WireFrameTest : Spek({
 
         given("input with too big payload size") {
             val input = WireFrame(
-                    payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+                    payload = ByteData(byteArrayOf(1, 2, 3)),
                     majorVersion = 1,
                     minorVersion = 0,
                     payloadSize = 8)
@@ -89,7 +89,7 @@ object WireFrameTest : Spek({
         given("valid input") {
             val payload = byteArrayOf(6, 9, 8, 6)
             val input = WireFrame(
-                    payload = Unpooled.wrappedBuffer(payload),
+                    payload = ByteData(payload),
                     majorVersion = 1,
                     minorVersion = 0,
                     payloadSize = payload.size)
@@ -107,7 +107,7 @@ object WireFrameTest : Spek({
         describe("encode-decode methods' compatibility") {
             val frame = createSampleFrame()
             val encoded = encodeSampleFrame()
-            val decoded = WireFrame.decodeFirst(encoded)
+            val decoded = decoder.decodeFirst(encoded)
 
             it("should decode major version") {
                 assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion)
@@ -122,14 +122,9 @@ object WireFrameTest : Spek({
             }
 
             it("should decode payload") {
-                assertThat(decoded.payload.toString(Charset.defaultCharset()))
+                assertThat(decoded.payload.asString())
                         .isEqualTo(payloadAsString)
             }
-
-            it("should retain decoded payload") {
-                encoded.release()
-                assertThat(decoded.payload.refCnt()).isEqualTo(1)
-            }
         }
 
         describe("TCP framing") {
@@ -139,7 +134,7 @@ object WireFrameTest : Spek({
                 val buff = Unpooled.buffer()
                         .writeBytes(encodeSampleFrame())
                         .writeByte(0xAA)
-                val decoded = WireFrame.decodeFirst(buff)
+                val decoded = decoder.decodeFirst(buff)
 
                 assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
                 assertThat(buff.readableBytes()).isEqualTo(1)
@@ -150,7 +145,7 @@ object WireFrameTest : Spek({
                         .writeByte(0xFF)
 
                 assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
-                        .isThrownBy { WireFrame.decodeFirst(buff) }
+                        .isThrownBy { decoder.decodeFirst(buff) }
             }
 
             it("should throw exception when first byte is not 0xFF but length looks ok") {
@@ -159,7 +154,7 @@ object WireFrameTest : Spek({
                         .writeBytes("some garbage".toByteArray())
 
                 assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
-                        .isThrownBy { WireFrame.decodeFirst(buff) }
+                        .isThrownBy { decoder.decodeFirst(buff) }
             }
 
             it("should throw exception when first byte is not 0xFF and length is to short") {
@@ -167,7 +162,7 @@ object WireFrameTest : Spek({
                         .writeByte(0xAA)
 
                 assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
-                        .isThrownBy { WireFrame.decodeFirst(buff) }
+                        .isThrownBy { decoder.decodeFirst(buff) }
             }
 
             it("should throw exception when payload doesn't fit") {
@@ -176,7 +171,7 @@ object WireFrameTest : Spek({
                 buff.writerIndex(buff.writerIndex() - 2)
 
                 assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
-                        .isThrownBy { WireFrame.decodeFirst(buff) }
+                        .isThrownBy { decoder.decodeFirst(buff) }
             }
 
         }
index 4438cf3..b2f4633 100644 (file)
 package org.onap.dcae.collectors.veshv.main
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.factory.ServerFactory
 import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
 import org.onap.dcae.collectors.veshv.main.ArgBasedServerConfiguration.WrongArgumentException
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 import org.slf4j.LoggerFactory
 import kotlin.system.exitProcess
@@ -39,7 +39,7 @@ fun main(args: Array<String>) {
 
         val collectorProvider = CollectorFactory(
                 resolveConfigurationProvider(serverConfiguration),
-                AdapterFactory.loggingSink()
+                AdapterFactory.kafkaSink()
         ).createVesHvCollectorProvider()
         ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block()
     } catch (ex: WrongArgumentException) {
@@ -55,7 +55,7 @@ private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguratio
     if (serverConfiguration.configurationUrl.isEmpty()) {
         logger.info("Configuration url not specified - using default config")
         val sampleConfig = CollectorConfiguration(
-                kafkaBootstrapServers = "dmaap.cluster.local:9969",
+                kafkaBootstrapServers = "kafka:9092",
                 routing = routing {
                     defineRoute {
                         fromDomain(Domain.HVRANMEAS)