2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.tests.component
 
  22 import arrow.syntax.function.partially1
 
  23 import io.netty.buffer.ByteBuf
 
  24 import io.netty.buffer.ByteBufAllocator
 
  25 import io.netty.buffer.CompositeByteBuf
 
  26 import io.netty.buffer.Unpooled
 
  27 import io.netty.buffer.UnpooledByteBufAllocator
 
  28 import org.assertj.core.api.Assertions.assertThat
 
  29 import org.jetbrains.spek.api.Spek
 
  30 import org.jetbrains.spek.api.dsl.describe
 
  31 import org.jetbrains.spek.api.dsl.it
 
  32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 
  33 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 
  34 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 
  35 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
 
  36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 
  37 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 
  38 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 
  39 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 
  40 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
 
  41 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
 
  42 import reactor.core.publisher.Flux
 
  43 import reactor.math.sum
 
  44 import java.security.MessageDigest
 
  45 import java.time.Duration
 
  46 import java.util.Random
 
  47 import kotlin.system.measureTimeMillis
 
  50  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 
  /**
   * Spek specification that drives the VES High Volume Collector with generated
   * traffic and sanity-checks the TCP simulation helpers defined in this file.
   *
   * NOTE(review): this listing omits several original lines (no declaration of
   * `sut` or `runs` is visible, the `MessageParameters(...)` calls are cut short and
   * no closing braces appear), so the comments below describe only the statements
   * that are visible here.
   */
  53 object PerformanceSpecification : Spek({
 
  56     describe("VES High Volume Collector performance") {
 
         // Scenario: `runs` connections, each fed its own generated data stream, are
         // merged and awaited; the sink must end up with runs * numMessages events.
  57         it("should handle multiple clients in reasonable time") {
 
  58             val sink = CountingSink()
 
  60             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
  62             val numMessages: Long = 300_000
 
             // Time budget in minutes: 1 plus runs / 2 (integer division if `runs`
             // is an Int - TODO confirm, its declaration is not visible).
  64             val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
 
  66             val params = MessageParameters(
 
  67                     commonEventHeader = commonHeader(PERF3GPP),
 
             // One handleConnection(...) result per run, each with a freshly generated stream.
  72             val fluxes = (1.rangeTo(runs)).map {
 
  73                 sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
 
             // Only the merge-and-wait step is timed; block(timeout) waits at most `timeout`.
  75             val durationMs = measureTimeMillis {
 
  76                 Flux.merge(fluxes).then().block(timeout)
 
             // Throughput in messages per second, derived from the sink's counter.
  79             val durationSec = durationMs / 1000.0
 
  80             val throughput = sink.count / durationSec
 
  81             logger.info("Processed $runs connections each containing $numMessages msgs.")
 
  82             logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
 
  83             assertThat(sink.count)
 
  84                     .describedAs("should send all events")
 
  85                     .isEqualTo(runs * numMessages)
 
         // Scenario: the generated stream is passed through dropWhenIndex before it
         // reaches the collector; fewer than numMessages events are expected at the sink.
  88         it("should disconnect on transmission errors") {
 
  89             val sink = CountingSink()
 
  91             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
  93             val numMessages: Long = 100_000
 
  94             val timeout = Duration.ofSeconds(30)
 
  96             val params = MessageParameters(
 
  97                     commonEventHeader = commonHeader(PERF3GPP),
 
             // The predicate handed to dropWhenIndex is true for arguments divisible by 101.
 102             val dataStream = generateDataStream(sut.alloc, params)
 
 103                     .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
 
 104             sut.collector.handleConnection(sut.alloc, dataStream)
 
 108             logger.info("Forwarded ${sink.count} msgs")
 
 109             assertThat(sink.count)
 
 110                     .describedAs("should send up to number of events")
 
 111                     .isLessThan(numMessages)
 
 115     describe("test infrastructure") {
 
         // Single MD5 digest instance shared by both helpers below and by the test body.
 116         val digest = MessageDigest.getInstance("MD5")
 
         // Feeds every readable byte of `bb` into the shared digest, then calls
         // resetReaderIndex() so the bytes read here stay available to later consumers.
         // NOTE(review): the statement between the signature and the loop is not
         // visible; presumably it marks the reader index - confirm.
 118         fun collectDigest(bb: ByteBuf) {
 
 120             while (bb.isReadable) {
 
 121                 digest.update(bb.readByte())
 
 123             bb.resetReaderIndex()
 
         // Iterates over `arrays` and returns digest.digest().
         // NOTE(review): the loop body is not visible in this listing; presumably it
         // updates the digest with each array - confirm.
 126         fun calculateDigest(arrays: List<ByteArray>): ByteArray {
 
 127             for (array in arrays) {
 
 130             return digest.digest()
 
         // Round-trip check: 10 random arrays of 1000 bytes each are wrapped as buffers
         // and sent through simulateRemoteTcp (maxSize 4); the total size and the MD5
         // digest of what comes out must match the input.
 133         it("should yield same bytes as in the input") {
 
 134             val numberOfBuffers = 10
 
 135             val singleBufferSize = 1000
 
 136             val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
 
 137             val inputDigest = calculateDigest(arrays)
 
 139             val actualTotalSize = Flux.fromIterable(arrays)
 
 140                     .map { Unpooled.wrappedBuffer(it) }
 
 141                     .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
 
 142                     .doOnNext(::collectDigest)
 
 144                         val size = it.readableBytes()
 
 152             val outputDigest = digest.digest()
 
             // NOTE(review): `!!` fails without a descriptive message if the pipeline
             // produced no value; requireNotNull(...) { "..." } would be clearer.
 154             assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
 
 155             assertThat(outputDigest).isEqualTo(inputDigest)
 
 /**
  * Scale factor (one million) used when logging forwarded message counts in millions.
  * The identifier's spelling is kept as-is because other code in this file refers to it.
  */
 private const val ONE_MILION: Double = 1e6

 /** File-level random generator; `randomByteArray` draws its bytes from it. */
 private val rand: Random = Random()
 
 /**
  * Allocates a byte array of `size` bytes and fills it with bytes drawn from the
  * file-level `rand` generator.
  *
  * NOTE(review): the closing lines of this function are not visible in this
  * listing; presumably it returns `bytes` - confirm against the full source.
  */
 165 private fun randomByteArray(size: Int): ByteArray {
 
 166     val bytes = ByteArray(size)
 
 167     rand.nextBytes(bytes)
 
 /**
  * Filters `stream` using `predicate` applied to the `t1` component of each element.
  *
  * NOTE(review): the line between the signature and the filter is not visible here;
  * presumably it is `stream.index()`, which would make `t1` the element's position
  * and `t2` the buffer - confirm against the full source.
  *
  * NOTE(review): `filter` retains the elements for which the predicate is true, so
  * as written the matching elements are kept and all others are removed. That is
  * the opposite of what the name "dropWhenIndex" suggests; confirm whether the
  * predicate was meant to be negated.
  */
 171 fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
 
 173                 .filter { predicate(it.t1) }
 
 /**
  * Builds the stream of buffers that is handed to the collector for one connection.
  *
  * Messages produced for `params` by a generator obtained from `MessageGeneratorFactory`
  * (created with `Sut.MAX_PAYLOAD_SIZE_BYTES`) are encoded by a `WireFrameEncoder`
  * backed by `alloc`; the encoded buffers are then passed through
  * `simulateRemoteTcp` with a `maxSize` of 1000.
  */
 176 private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
 
 177         WireFrameEncoder(alloc).let { encoder ->
 
 178             MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
 
 179                     .createMessageFlux(listOf(params))
 
 180                     .map(encoder::encode)
 
 181                     .transform { simulateRemoteTcp(alloc, 1000, it) }
 
 /**
  * Re-chunks a stream of buffers in three visible steps:
  *  1. `bufferTimeout(maxSize, 250 ms)` collects incoming items into lists, closing a
  *     list once it holds `maxSize` items or 250 ms have elapsed;
  *  2. each list is merged into one composite buffer by `joinBuffers`;
  *  3. each composite buffer is expanded by `randomlySplitTcpFrames`, with `concatMap`
  *     keeping the resulting frames in their original order.
  *
  * NOTE(review): the receiver of this call chain is not visible in this listing;
  * presumably it is `byteBuffers` - confirm against the full source.
  */
 184 private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
 
 186                 .bufferTimeout(maxSize, Duration.ofMillis(250))
 
 187                 .map { joinBuffers(alloc, it) }
 
 188                 .concatMap { randomlySplitTcpFrames(it) }
 
 /**
  * Merges the buffers in `it` into one composite buffer obtained from `alloc`.
  *
  * The components are added with `increaseWriterIndex = true`, so the writer index
  * of the returned buffer advances over every component that was added.
  */
 private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
         alloc.compositeBuffer().let { composite -> composite.addComponents(true, it) }
 
 193 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
 
 194     val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
 
 195     return Flux.create<ByteBuf> { sink ->
 
 196         while (bb.isReadable) {
 
 197             val frameSize = Math.min(targetFrameSize, bb.readableBytes())
 
 198             sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
 
 199             bb.readerIndex(bb.readerIndex() + frameSize)