2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.*
24 import org.assertj.core.api.Assertions.assertThat
25 import org.jetbrains.spek.api.Spek
26 import org.jetbrains.spek.api.dsl.describe
27 import org.jetbrains.spek.api.dsl.it
28 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
29 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
30 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
31 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
32 import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
33 import reactor.core.publisher.Flux
34 import reactor.math.sum
35 import java.security.MessageDigest
36 import java.time.Duration
38 import kotlin.system.measureTimeMillis
41 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
44 object PerformanceSpecification : Spek({
47 describe("VES High Volume Collector performance") {
48 it("should handle multiple clients in reasonable time") {
49 val sink = CountingSink()
51 sut.configurationProvider.updateConfiguration(basicConfiguration)
53 val numMessages: Long = 300_000
55 val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
57 val params = MessageParameters(
58 commonEventHeader = vesEvent().commonEventHeader,
60 val fluxes = (1.rangeTo(runs)).map {
61 sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
63 val durationMs = measureTimeMillis {
64 Flux.merge(fluxes).then().block(timeout)
67 val durationSec = durationMs / 1000.0
68 val throughput = sink.count / durationSec
69 logger.info("Processed $runs connections each containing $numMessages msgs.")
70 logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
71 assertThat(sink.count)
72 .describedAs("should send all events")
73 .isEqualTo(runs * numMessages)
76 it("should disconnect on transmission errors") {
77 val sink = CountingSink()
79 sut.configurationProvider.updateConfiguration(basicConfiguration)
81 val numMessages: Long = 100_000
82 val timeout = Duration.ofSeconds(30)
84 val params = MessageParameters(
85 commonEventHeader = vesEvent().commonEventHeader,
88 val dataStream = generateDataStream(sut.alloc, params)
89 .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
90 sut.collector.handleConnection(sut.alloc, dataStream)
94 logger.info("Forwarded ${sink.count} msgs")
95 assertThat(sink.count)
96 .describedAs("should send up to number of events")
97 .isLessThan(numMessages)
101 describe("test infrastructure") {
102 val digest = MessageDigest.getInstance("MD5")
/**
 * Feeds every readable byte of [bb] into the shared `digest`, one byte at a time,
 * and then calls `resetReaderIndex()` on the buffer.
 *
 * NOTE(review): the reset presumably restores a reader index that was marked before
 * the loop, so that downstream consumers can still read the buffer; the marking call
 * is not visible in this excerpt - confirm.
 */
104 fun collectDigest(bb: ByteBuf) {
106 while (bb.isReadable) {
107 digest.update(bb.readByte())
109 bb.resetReaderIndex()
/**
 * Iterates over [arrays] and returns the result of `digest.digest()` on the shared
 * `MessageDigest` instance.
 *
 * NOTE(review): the loop body is not visible in this excerpt; presumably it feeds
 * each array into `digest` - confirm.
 */
112 fun calculateDigest(arrays: List<ByteArray>): ByteArray {
113 for (array in arrays) {
116 return digest.digest()
119 it("should yield same bytes as in the input") {
120 val numberOfBuffers = 10
121 val singleBufferSize = 1000
122 val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
123 val inputDigest = calculateDigest(arrays)
125 val actualTotalSize = Flux.fromIterable(arrays)
126 .map { Unpooled.wrappedBuffer(it) }
127 .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
128 .doOnNext(::collectDigest)
130 val size = it.readableBytes()
138 val outputDigest = digest.digest()
140 assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize)
141 assertThat(outputDigest).isEqualTo(inputDigest)
// Divisor used when logging the forwarded message count in millions of messages.
// NOTE(review): the identifier is misspelled ("MILION"); renaming it would also
// require updating its usage in the log statement.
148 private const val ONE_MILION = 1_000_000.0
// Shared random source used by randomByteArray.
150 private val rand = Random()
/**
 * Allocates a byte array of [size] bytes and fills it via `rand.nextBytes`.
 *
 * NOTE(review): the return statement is not visible in this excerpt; given the
 * declared return type it presumably returns the filled array - confirm.
 */
151 private fun randomByteArray(size: Int): ByteArray {
152 val bytes = ByteArray(size)
153 rand.nextBytes(bytes)
/**
 * Filters a pipeline derived from [stream] by applying [predicate] to the first
 * component (`t1`) of each intermediate element.
 *
 * NOTE(review): `t1` is presumably the element's index produced by an indexing step
 * that is not visible in this excerpt - confirm. Also note that `filter` retains the
 * elements for which [predicate] returns true, which reads as the opposite of the
 * "dropWhen" name; verify which behaviour the caller expects.
 */
157 fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
159 .filter { predicate(it.t1) }
/**
 * Builds the buffer stream used as the input of a simulated client connection.
 *
 * Messages created by `MessageGenerator.INSTANCE.createMessageFlux(params)` are
 * encoded with a `WireFrameEncoder` constructed from [alloc], and the encoded
 * buffers are then passed through [simulateRemoteTcp] with a `maxSize` of 1000.
 */
162 private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
163 WireFrameEncoder(alloc).let { encoder ->
164 MessageGenerator.INSTANCE
165 .createMessageFlux(params)
166 .map(encoder::encode)
167 .transform { simulateRemoteTcp(alloc, 1000, it) }
/**
 * Re-chunks a stream of buffers in three visible steps: elements are collected into
 * batches with `bufferTimeout(maxSize, 250 ms)`, each batch is merged with
 * [joinBuffers] using [alloc], and each merged buffer is split again by
 * [randomlySplitTcpFrames], with the resulting frames concatenated via `concatMap`.
 *
 * NOTE(review): the receiver of the operator chain is not visible in this excerpt;
 * presumably it is [byteBuffers] - confirm.
 */
170 private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
172 .bufferTimeout(maxSize, Duration.ofMillis(250))
173 .map { joinBuffers(alloc, it) }
174 .concatMap { randomlySplitTcpFrames(it) }
/**
 * Merges the given buffers into one composite buffer obtained from [alloc].
 *
 * The buffers are added with the `increaseWriterIndex` flag set to `true`.
 *
 * @param alloc allocator that supplies the composite buffer
 * @param it buffers to add as components of the composite buffer
 * @return the composite buffer returned by `addComponents`
 */
private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?): CompositeByteBuf {
    val composite = alloc.compositeBuffer()
    return composite.addComponents(true, it)
}
179 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
180 val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
181 return Flux.create<ByteBuf> { sink ->
182 while (bb.isReadable) {
183 val frameSize = Math.min(targetFrameSize, bb.readableBytes())
184 sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
185 bb.readerIndex(bb.readerIndex() + frameSize)