Write performance tests 23/58623/1
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 27 Jun 2018 10:30:56 +0000 (12:30 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 08:39:41 +0000 (10:39 +0200)
Closes ONAP-434
Change-Id: I1139848f32ac19a4d0a0fd595f4b07c10cd83db0
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

20 files changed:
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt [moved from hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt with 67% similarity]
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/factory.kt [new file with mode: 0644]
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt [moved from hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt with 83% similarity]
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/PayloadGenerator.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
hv-collector-ct/pom.xml
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt [new file with mode: 0644]
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
hv-collector-ct/src/test/resources/logback-test.xml
req.json [new file with mode: 0644]

  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.tests.component
+package org.onap.dcae.collectors.veshv.simulators.xnf.api
 
-import org.jetbrains.spek.api.dsl.Pending
-import org.jetbrains.spek.api.dsl.TestContainer
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
+import reactor.core.publisher.Flux
 
-internal fun TestContainer.system(description: String, body: (Sut) -> Unit) {
-    test("system $description", body = { body(Sut()) })
-}
-
-internal fun TestContainer.xsystem(description: String, reason: String? = null, body: (Sut) -> Unit = {}) {
-    test("system $description", Pending.Yes(reason), body = { body(Sut()) })
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+interface MessageGenerator {
+    fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame>
 }
index ed96e6c..657ed31 100644 (file)
@@ -25,7 +25,7 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-data class ClientConfiguration(
+internal data class ClientConfiguration(
         val vesHost: String,
         val vesPort: Int,
         val security: SecurityConfiguration,
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/factory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/factory.kt
new file mode 100644 (file)
index 0000000..dce386b
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf
+
+import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageGeneratorImpl
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.PayloadGenerator
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+fun createMessageGenerator(): MessageGenerator = MessageGeneratorImpl(PayloadGenerator())
index 3f872b5..c545ac8 100644 (file)
@@ -37,7 +37,7 @@ import javax.json.JsonObject
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-class HttpServer(private val vesClient: VesHvClient) {
+internal class HttpServer(private val vesClient: VesHvClient) {
 
     fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO {
         RatpackServer.start { server ->
@@ -69,7 +69,7 @@ class HttpServer(private val vesClient: VesHvClient) {
         return ctx.request.body
                 .map { Json.createReader(it.inputStream).readObject() }
                 .map { extractMessageParameters(it) }
-                .map { MessageFactory.INSTANCE.createMessageFlux(it) }
+                .map { MessageGeneratorImpl.INSTANCE.createMessageFlux(it) }
     }
 
     private fun sendAcceptedResponse(ctx: Context) {
@@ -95,7 +95,7 @@ class HttpServer(private val vesClient: VesHvClient) {
 
     private fun extractMessageParameters(request: JsonObject): MessageParameters =
             try {
-                val commonEventHeader = MessageFactory.INSTANCE
+                val commonEventHeader = MessageGeneratorImpl.INSTANCE
                         .parseCommonHeader(request.getJsonObject("commonEventHeader"))
                 val messagesAmount = request.getJsonNumber("messagesAmount").longValue()
                 MessageParameters(commonEventHeader, messagesAmount)
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
 import com.google.protobuf.ByteString
 import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
 import org.onap.ves.VesEventV5.VesEvent
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
@@ -32,9 +33,9 @@ import javax.json.JsonObject
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-class MessageFactory(private val payloadGenerator: PayloadGenerator) {
+internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
 
-    fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> =
+    override fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> =
             Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let {
                 if (messageParameters.amount < 0)
                     it.repeat()
@@ -65,16 +66,14 @@ class MessageFactory(private val payloadGenerator: PayloadGenerator) {
             WireFrame(vesMessageBytes(commonHeader))
 
 
-    private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray {
-        val msg = VesEvent.newBuilder()
-                .setCommonEventHeader(commonHeader)
-                .setHvRanMeasFields(PayloadGenerator().generatePayload().toByteString())
-                .build()
-
-        return msg.toByteArray()
-    }
+    private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray =
+            VesEvent.newBuilder()
+                    .setCommonEventHeader(commonHeader)
+                    .setHvRanMeasFields(payloadGenerator.generatePayload().toByteString())
+                    .build()
+                    .toByteArray()
 
     companion object {
-        val INSTANCE = MessageFactory(PayloadGenerator())
+        val INSTANCE = MessageGeneratorImpl(PayloadGenerator())
     }
 }
index 17dbbf4..c8b9763 100644 (file)
@@ -22,9 +22,9 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
 import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject
 import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject.HVRanMeas
-import java.util.Random
+import java.util.*
 
-class PayloadGenerator {
+internal class PayloadGenerator {
 
     private val randomGenerator = Random()
 
index be351b5..43b73e1 100644 (file)
@@ -31,7 +31,6 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.reactivestreams.Publisher
-import reactor.core.publisher.EmitterProcessor
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.core.publisher.ReplayProcessor
@@ -43,7 +42,7 @@ import reactor.ipc.netty.tcp.TcpClient
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-class VesHvClient(private val configuration: ClientConfiguration) {
+internal class VesHvClient(private val configuration: ClientConfiguration) {
 
     private val client: TcpClient = TcpClient.builder()
             .options { opts ->
index f222950..dbeba2b 100644 (file)
@@ -21,11 +21,8 @@ package org.onap.dcae.collectors.veshv.simulators.xnf
 
 import arrow.core.Failure
 import arrow.core.Success
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient
 import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
index 2f59264..6f8a95a 100644 (file)
@@ -40,7 +40,7 @@ const val SAMPLE_LAST_EPOCH: Long = 120034455
 object MessageFactoryTest : Spek({
     describe("message factory") {
 
-        val factory = MessageFactory.INSTANCE
+        val factory = MessageGeneratorImpl.INSTANCE
 
         given("only common header") {
             it("should return infinite flux") {
index 033095a..3246cf5 100644 (file)
@@ -59,21 +59,28 @@ internal class VesHvCollector(
                         .compose(sink::send)
                         .doOnNext { metrics.notifyMessageSent(it.topic) }
                         .doOnTerminate { releaseBuffersMemory(wireDecoder) }
+                        .onErrorResume(this::handleErrors)
                         .then()
             }
 
     private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
 
-    private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
-        wireChunkDecoder.release()
-    }
-
     private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
             mapper(input).fold(
                     { Mono.empty() },
                     { Mono.just(it) })
 
+    private fun handleErrors(ex: Throwable): Flux<RoutedMessage> {
+        logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+        logger.debug("Detailed stack trace", ex)
+        return Flux.empty()
+    }
+
+    private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
+        wireChunkDecoder.release()
+    }
+
     companion object {
-        val logger = Logger(VesHvCollector::class)
+        private val logger = Logger(VesHvCollector::class)
     }
 }
index 056e055..cfb61b3 100644 (file)
@@ -31,9 +31,40 @@ import reactor.core.publisher.Flux
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class WireChunkDecoder(private val decoder: WireFrameDecoder,
-                                alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+internal class WireChunkDecoder(
+        private val decoder: WireFrameDecoder,
+        alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
     private val streamBuffer = alloc.compositeBuffer()
+    
+//  TODO: use this implementation and cleanup the rest
+//    fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer<WireFrame> {
+//        if (byteBuf.readableBytes() == 0) {
+//            byteBuf.release()
+//            Flux.empty()
+//        } else {
+//            streamBuffer.addComponent(true, byteBuf)
+//            Flux.generate { next ->
+//                try {
+//                    val frame = decodeFirstFrameFromBuffer()
+//                    if (frame == null)
+//                        next.complete()
+//                    else
+//                        next.next(frame)
+//                } catch (ex: Exception) {
+//                    next.error(ex)
+//                }
+//            }
+//        }
+//    }.doOnTerminate { streamBuffer.discardReadComponents() }
+//
+//
+//    private fun decodeFirstFrameFromBuffer(): WireFrame? =
+//            try {
+//                decoder.decodeFirst(streamBuffer)
+//            } catch (ex: MissingWireFrameBytesException) {
+//                logger.trace { "${ex.message} - waiting for more data" }
+//                null
+//            }
 
     fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter
             .createFlux(decoder, streamBuffer, byteBuf)
index abebff3..540c647 100644 (file)
@@ -35,24 +35,27 @@ internal class WireFrameSink(
         private val streamBuffer: ByteBuf,
         private val sink: FluxSink<WireFrame>,
         private val requestedFrameCount: Long) {
+    private var completed = false
 
     fun handleSubscriber() {
-        logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
+        if (!completed) {
+            logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
 
-        try {
-            if (requestedFrameCount == Long.MAX_VALUE) {
-                logger.trace { "Push based strategy" }
-                pushAvailableFrames()
-            } else {
-                logger.trace { "Pull based strategy - req $requestedFrameCount" }
-                pushUpToNumberOfFrames()
+            try {
+                if (requestedFrameCount == Long.MAX_VALUE) {
+                    logger.trace { "Push based strategy" }
+                    pushAvailableFrames()
+                } else {
+                    logger.trace { "Pull based strategy - req $requestedFrameCount" }
+                    pushUpToNumberOfFrames()
+                }
+            } catch (ex: Exception) {
+                completed = true
+                sink.error(ex)
             }
-        } catch (ex: Exception) {
-            sink.error(ex)
-        }
-
-        logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
 
+            logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+        }
     }
 
     private fun pushAvailableFrames() {
@@ -61,6 +64,7 @@ internal class WireFrameSink(
             sink.next(nextFrame)
             nextFrame = decodeFirstFrameFromBuffer()
         }
+        completed = true
         sink.complete()
     }
 
@@ -76,6 +80,7 @@ internal class WireFrameSink(
             }
         }
         if (remaining > 0 && nextFrame == null) {
+            completed = true
             sink.complete()
         }
     }
index 1db0345..63a5c09 100644 (file)
             <artifactId>hv-collector-core</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>hv-collector-client-simulator</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
 
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-syntax</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
new file mode 100644 (file)
index 0000000..c68f051
--- /dev/null
@@ -0,0 +1,193 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import arrow.syntax.function.partially1
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.CompositeByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
+import org.onap.dcae.collectors.veshv.simulators.xnf.createMessageGenerator
+import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import reactor.core.publisher.Flux
+import reactor.math.sum
+import java.security.MessageDigest
+import java.time.Duration
+import java.util.*
+import kotlin.system.measureTimeMillis
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object PerformanceSpecification : Spek({
+    describe("VES High Volume Collector performance") {
+        it("should handle multiple clients in reasonable time") {
+            val sink = CountingSink()
+            val sut = Sut(sink)
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            val numMessages: Long = 300_000
+            val runs = 4
+            val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
+
+            val params = MessageParameters(
+                    commonEventHeader = vesEvent().commonEventHeader,
+                    amount = numMessages)
+            val fluxes = (1.rangeTo(runs)).map {
+                sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
+            }
+            val durationMs = measureTimeMillis {
+                Flux.merge(fluxes).then().block(timeout)
+            }
+
+            val durationSec = durationMs / 1000.0
+            val throughput = sink.count / durationSec
+            println("Processed $runs connections each containing $numMessages msgs.")
+            println("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
+            assertThat(sink.count)
+                    .describedAs("should send all events")
+                    .isEqualTo(runs * numMessages)
+        }
+
+        it("should disconnect on transmission errors") {
+            val sink = CountingSink()
+            val sut = Sut(sink)
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            val numMessages: Long = 100_000
+            val timeout = Duration.ofSeconds(30)
+
+            val params = MessageParameters(
+                    commonEventHeader = vesEvent().commonEventHeader,
+                    amount = numMessages)
+
+            val dataStream = generateDataStream(sut.alloc, params)
+                    .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
+            sut.collector.handleConnection(sut.alloc, dataStream)
+                    .timeout(timeout)
+                    .block()
+
+            println("Forwarded ${sink.count} msgs")
+            assertThat(sink.count)
+                    .describedAs("should send up to number of events")
+                    .isLessThan(numMessages)
+        }
+    }
+
+    describe("test infrastructure") {
+        val digest = MessageDigest.getInstance("MD5")
+
+        fun collectDigest(bb: ByteBuf) {
+            bb.markReaderIndex()
+            while (bb.isReadable) {
+                digest.update(bb.readByte())
+            }
+            bb.resetReaderIndex()
+        }
+
+        fun calculateDigest(arrays: List<ByteArray>): ByteArray {
+            for (array in arrays) {
+                digest.update(array)
+            }
+            return digest.digest()
+        }
+
+        it("should yield same bytes as in the input") {
+            val numberOfBuffers = 10
+            val singleBufferSize = 1000
+            val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
+            val inputDigest = calculateDigest(arrays)
+
+            val actualTotalSize = Flux.fromIterable(arrays)
+                    .map { Unpooled.wrappedBuffer(it) }
+                    .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
+                    .doOnNext(::collectDigest)
+                    .map {
+                        val size = it.readableBytes()
+                        it.release()
+                        size
+                    }
+                    .sum()
+                    .map(Long::toInt)
+                    .block()
+
+            val outputDigest = digest.digest()
+
+            assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize)
+            assertThat(outputDigest).isEqualTo(inputDigest)
+
+        }
+    }
+})
+
+
+private const val ONE_MILION = 1_000_000.0
+
+private val rand = Random()
+private fun randomByteArray(size: Int): ByteArray {
+    val bytes = ByteArray(size)
+    rand.nextBytes(bytes)
+    return bytes
+}
+
+fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
+        stream.index()
+                .filter { predicate(it.t1) }
+                .map { it.t2 }
+
+private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
+        WireFrameEncoder(alloc).let { encoder ->
+            createMessageGenerator()
+                    .createMessageFlux(params)
+                    .map(encoder::encode)
+                    .transform { simulateRemoteTcp(alloc, 1000, it) }
+        }
+
+private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
+        byteBuffers
+                .bufferTimeout(maxSize, Duration.ofMillis(250))
+                .map { joinBuffers(alloc, it) }
+                .concatMap { randomlySplitTcpFrames(it) }
+
+private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
+        alloc.compositeBuffer().addComponents(true, it)
+
+private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
+    val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
+    return Flux.create<ByteBuf> { sink ->
+        while (bb.isReadable) {
+            val frameSize = Math.min(targetFrameSize, bb.readableBytes())
+            sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
+            bb.readerIndex(bb.readerIndex() + frameSize)
+        }
+        bb.release()
+        sink.complete()
+    }
+}
index 5099ae4..44b3266 100644 (file)
@@ -22,14 +22,14 @@ package org.onap.dcae.collectors.veshv.tests.component
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.UnpooledByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.Exceptions
 import reactor.core.publisher.Flux
 import java.time.Duration
 
@@ -37,9 +37,9 @@ import java.time.Duration
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class Sut {
+class Sut(sink: Sink = StoringSink()) {
     val configurationProvider = FakeConfigurationProvider()
-    val sink = FakeSink()
+
     val alloc = UnpooledByteBufAllocator.DEFAULT
     val metrics = FakeMetrics()
     private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics)
@@ -47,21 +47,9 @@ internal class Sut {
 
     val collector: Collector
         get() = collectorProvider()
+}
 
-    fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> {
-        collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
-        return sink.sentMessages
-    }
-
-    fun handleConnectionReturningError(vararg packets: ByteBuf): Pair<List<RoutedMessage>, Exception?> =
-        try {
-            collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
-            Pair(sink.sentMessages, null)
-        } catch (ex: Exception) {
-            Pair(sink.sentMessages, ex)
-        }
-
-    companion object {
-        val logger = Logger(Sut::class)
-    }
+fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
+    collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+    return sink.sentMessages
 }
index 08b6382..0845059 100644 (file)
@@ -23,8 +23,10 @@ import com.google.protobuf.InvalidProtocolBufferException
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
 import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 
@@ -34,9 +36,11 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
  */
 object VesHvSpecification : Spek({
     describe("VES High Volume Collector") {
-        system("should handle multiple HV RAN events") { sut ->
+        it("should handle multiple HV RAN events") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
             sut.configurationProvider.updateConfiguration(basicConfiguration)
-            val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
+            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
 
             assertThat(messages)
                     .describedAs("should send all events")
@@ -46,18 +50,18 @@ object VesHvSpecification : Spek({
 
     describe("Memory management") {
 
-        system("should release memory for each handled and dropped message") { sut ->
+        it("should release memory for each handled and dropped message") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
             sut.configurationProvider.updateConfiguration(basicConfiguration)
             val validMessage = vesMessage(Domain.HVRANMEAS)
             val msgWithInvalidDomain = vesMessage(Domain.OTHER)
             val msgWithInvalidFrame = invalidWireFrame()
             val expectedRefCnt = 0
 
-            val (handledEvents, exception) = sut.handleConnectionReturningError(
-                    validMessage, msgWithInvalidDomain, msgWithInvalidFrame)
+            val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame)
 
             assertThat(handledEvents).hasSize(1)
-            assertThat(exception).isNull()
 
             assertThat(validMessage.refCnt())
                     .describedAs("handled message should be released")
@@ -71,17 +75,17 @@ object VesHvSpecification : Spek({
 
         }
 
-        system("should release memory for each message with invalid payload") { sut ->
+        it("should release memory for each message with invalid payload") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
             sut.configurationProvider.updateConfiguration(basicConfiguration)
             val validMessage = vesMessage(Domain.HVRANMEAS)
             val msgWithInvalidPayload = invalidVesMessage()
             val expectedRefCnt = 0
 
-            val (handledEvents, exception) = sut.handleConnectionReturningError(
-                    validMessage, msgWithInvalidPayload)
+            val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
 
             assertThat(handledEvents).hasSize(1)
-            assertThat(exception?.cause).isInstanceOf(InvalidProtocolBufferException::class.java)
 
             assertThat(validMessage.refCnt())
                     .describedAs("handled message should be released")
@@ -92,18 +96,17 @@ object VesHvSpecification : Spek({
 
         }
 
-        system("should release memory for each message with garbage frame") { sut ->
+        it("should release memory for each message with garbage frame") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
             sut.configurationProvider.updateConfiguration(basicConfiguration)
             val validMessage = vesMessage(Domain.HVRANMEAS)
             val msgWithGarbageFrame = garbageFrame()
             val expectedRefCnt = 0
 
-            val (handledEvents, exception) = sut.handleConnectionReturningError(
-                    validMessage, msgWithGarbageFrame)
+            val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
 
             assertThat(handledEvents).hasSize(1)
-            assertThat(exception?.cause)
-                    .isInstanceOf(InvalidWireFrameMarkerException::class.java)
 
             assertThat(validMessage.refCnt())
                     .describedAs("handled message should be released")
@@ -116,10 +119,12 @@ object VesHvSpecification : Spek({
     }
 
     describe("message routing") {
-        system("should direct message to a topic by means of routing configuration") { sut ->
+        it("should direct message to a topic by means of routing configuration") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
-            val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS))
+            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
             val msg = messages[0]
@@ -127,9 +132,11 @@ object VesHvSpecification : Spek({
             assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
         }
 
-        system("should drop message if route was not found") { sut ->
+        it("should drop message if route was not found") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
             sut.configurationProvider.updateConfiguration(basicConfiguration)
-            val messages = sut.handleConnection(
+            val messages = sut.handleConnection(sink,
                     vesMessage(Domain.OTHER, "first"),
                     vesMessage(Domain.HVRANMEAS, "second"),
                     vesMessage(Domain.HEARTBEAT, "third"))
index 3314c44..8895d64 100644 (file)
@@ -25,7 +25,7 @@ import io.netty.buffer.PooledByteBufAllocator
 import org.onap.ves.VesEventV5.VesEvent
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import java.util.UUID
+import java.util.*
 
 val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
 
@@ -61,7 +61,7 @@ fun invalidWireFrame() = allocator.buffer().run {
     writeByte(0x01)   // content type = GPB
 }
 
-fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) =
+fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString()) =
         VesEvent.newBuilder()
                 .setCommonEventHeader(
                         CommonEventHeader.getDefaultInstance().toBuilder()
index b0dbd0f..a5fd546 100644 (file)
@@ -21,16 +21,16 @@ package org.onap.dcae.collectors.veshv.tests.fakes
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
 import reactor.core.publisher.Flux
 import java.util.*
 import java.util.concurrent.ConcurrentLinkedDeque
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class FakeSink : Sink {
+class StoringSink : Sink {
     private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque()
 
     val sentMessages: List<RoutedMessage>
@@ -40,3 +40,20 @@ class FakeSink : Sink {
         return messages.doOnNext(sent::addLast)
     }
 }
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class CountingSink : Sink {
+    private val atomicCount = AtomicLong(0)
+
+    val count: Long
+        get() = atomicCount.get()
+
+    override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+        return messages.doOnNext {
+            atomicCount.incrementAndGet()
+        }
+    }
+}
index 84abc9d..93f2277 100644 (file)
       </rollingPolicy>
     </appender>
 
-  <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
+  <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
 
   <root level="INFO">
       <appender-ref ref="CONSOLE"/>
       <appender-ref ref="ROLLING-FILE"/>
     </root>
-</configuration>
\ No newline at end of file
+</configuration>
diff --git a/req.json b/req.json
new file mode 100644 (file)
index 0000000..e092ed6
--- /dev/null
+++ b/req.json
@@ -0,0 +1,20 @@
+{ 
+      "commonEventHeader": {
+                "version": "sample-version",
+                "domain": 10,
+                "sequence": 1,
+                "priority": 1,
+                "eventId": "sample-event-id",
+                "eventName": "sample-event-name",
+                "eventType": "sample-event-type",
+                "startEpochMicrosec": 120034455,
+                "lastEpochMicrosec": 120034455,
+                "nfNamingCode": "sample-nf-naming-code",
+                "nfcNamingCode": "sample-nfc-naming-code",
+                "reportingEntityId": "sample-reporting-entity-id",
+                "reportingEntityName": "sample-reporting-entity-name",
+                "sourceId": "sample-source-id",
+                "sourceName": "sample-source-name"
+        },
+        "messagesAmount": 1000000
+}