/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2018 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.onap.dcae.collectors.veshv.tests.component
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import io.netty.buffer.CompositeByteBuf
26 import io.netty.buffer.Unpooled
27 import io.netty.buffer.UnpooledByteBufAllocator
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
33 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
34 import org.onap.dcae.collectors.veshv.simulators.xnf.createMessageGenerator
35 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import reactor.core.publisher.Flux
38 import reactor.math.sum
39 import java.security.MessageDigest
40 import java.time.Duration
42 import kotlin.system.measureTimeMillis
/**
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 */
// Spek specification with two groups: performance scenarios for the VES High Volume Collector,
// and a self-check of the TCP simulation helper used to feed those scenarios.
// NOTE(review): this excerpt appears to be missing lines (for example the definitions of `sut` and
// `runs`, the remaining MessageParameters arguments, some loop bodies, terminal operators and
// closing braces). The comments below describe only what is visible.
49 object PerformanceSpecification : Spek({
50 describe("VES High Volume Collector performance") {
// Scenario: several connections are handled concurrently and every message must reach the sink.
51 it("should handle multiple clients in reasonable time") {
// Sink whose `count` is read below to determine how many messages were forwarded.
52 val sink = CountingSink()
54 sut.configurationProvider.updateConfiguration(basicConfiguration)
// Number of messages reported per connection in the log line below.
56 val numMessages: Long = 300_000
// The time budget grows with the number of runs: 1 + runs / 2 minutes.
58 val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
60 val params = MessageParameters(
61 commonEventHeader = vesEvent().commonEventHeader,
// One handleConnection call per run, each fed with its own generated data stream.
63 val fluxes = (1.rangeTo(runs)).map {
64 sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
// Wall-clock time needed for all merged connections to complete; block(timeout) bounds the wait.
66 val durationMs = measureTimeMillis {
67 Flux.merge(fluxes).then().block(timeout)
// Convert to seconds and derive the throughput in messages per second.
70 val durationSec = durationMs / 1000.0
71 val throughput = sink.count / durationSec
72 println("Processed $runs connections each containing $numMessages msgs.")
73 println("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
// Nothing may be lost: the sink must have counted runs * numMessages messages.
74 assertThat(sink.count)
75 .describedAs("should send all events")
76 .isEqualTo(runs * numMessages)
// Scenario: the incoming stream is altered on purpose, so fewer messages than generated are forwarded.
79 it("should disconnect on transmission errors") {
80 val sink = CountingSink()
82 sut.configurationProvider.updateConfiguration(basicConfiguration)
84 val numMessages: Long = 100_000
// NOTE(review): the use of `timeout` is not visible in this excerpt.
85 val timeout = Duration.ofSeconds(30)
87 val params = MessageParameters(
88 commonEventHeader = vesEvent().commonEventHeader,
// The generated stream is passed through dropWhenIndex, partially applied with the index
// predicate `index % 101 == 0`, to emulate the transmission errors named in the test title.
91 val dataStream = generateDataStream(sut.alloc, params)
92 .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
93 sut.collector.handleConnection(sut.alloc, dataStream)
97 println("Forwarded ${sink.count} msgs")
// Only an upper bound is asserted: strictly fewer messages than were generated.
98 assertThat(sink.count)
99 .describedAs("should send up to number of events")
100 .isLessThan(numMessages)
// Self-check of the helpers: bytes leaving simulateRemoteTcp must equal the bytes that entered it.
104 describe("test infrastructure") {
// Single MD5 instance used for both the input and the output digest.
105 val digest = MessageDigest.getInstance("MD5")
// Feeds every readable byte of `bb` into the digest, then calls resetReaderIndex().
// NOTE(review): presumably paired with a markReaderIndex() call that is not visible here - confirm.
107 fun collectDigest(bb: ByteBuf) {
109 while (bb.isReadable) {
110 digest.update(bb.readByte())
112 bb.resetReaderIndex()
// Returns the digest after iterating over `arrays`; the loop body is not visible in this excerpt.
115 fun calculateDigest(arrays: List<ByteArray>): ByteArray {
116 for (array in arrays) {
119 return digest.digest()
122 it("should yield same bytes as in the input") {
// Input: 10 random arrays of 1000 bytes each.
123 val numberOfBuffers = 10
124 val singleBufferSize = 1000
125 val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
126 val inputDigest = calculateDigest(arrays)
// Wrap each array in a ByteBuf, run the buffers through the TCP simulation (maxSize = 4) and
// hash every emitted buffer on the way.
128 val actualTotalSize = Flux.fromIterable(arrays)
129 .map { Unpooled.wrappedBuffer(it) }
130 .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
131 .doOnNext(::collectDigest)
133 val size = it.readableBytes()
// Digest of everything passed to collectDigest above.
141 val outputDigest = digest.digest()
// Both the total byte count and the content hash must match the input.
143 assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize)
144 assertThat(outputDigest).isEqualTo(inputDigest)
// Divisor used to print the forwarded message count in millions ("Mmsgs").
// NOTE(review): the identifier is misspelt (MILLION); rename it together with its usage.
151 private const val ONE_MILION = 1_000_000.0
// Shared pseudo-random source for the generated test payloads.
private val rand = Random()

/**
 * Creates an array of [size] pseudo-random bytes drawn from [rand].
 *
 * @param size number of bytes to generate; `0` yields an empty array
 * @return a freshly allocated array filled by [Random.nextBytes]
 * @throws NegativeArraySizeException when [size] is negative
 */
private fun randomByteArray(size: Int): ByteArray =
        ByteArray(size).also(rand::nextBytes)
/**
 * Removes from [stream] every buffer whose zero-based position satisfies [predicate].
 *
 * Each element is paired with its index, matching indices are discarded and the
 * remaining buffers are emitted in their original order.
 *
 * NOTE(review): discarded buffers are not released here - confirm whether that matters
 * for reference-counted input.
 *
 * @param predicate decides, for a given element index, whether that element is dropped
 * @param stream source of buffers
 * @return [stream] without the elements selected by [predicate]
 */
fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
        stream.index()
                // filter KEEPS elements for which the lambda is true, so negate the predicate to drop matches
                .filter { indexed -> !predicate(indexed.t1) }
                .map { indexed -> indexed.t2 }
/**
 * Builds the byte stream that one simulated client sends to the collector.
 *
 * Messages produced by the message generator for [params] are encoded with a
 * [WireFrameEncoder] backed by [alloc], then passed through [simulateRemoteTcp]
 * so that buffer boundaries no longer coincide with message boundaries.
 *
 * @param alloc allocator used for encoding and for regrouping the buffers
 * @param params description of the messages to generate
 * @return the encoded messages as re-chunked buffers
 */
private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> {
    val encoder = WireFrameEncoder(alloc)
    return createMessageGenerator()
            .createMessageFlux(params)
            .map(encoder::encode)
            // regroup up to 1000 encoded messages at a time before they are re-split into frames
            .transform { simulateRemoteTcp(alloc, 1000, it) }
}
/**
 * Re-chunks [byteBuffers] the way a TCP connection might: incoming buffers are grouped,
 * merged into one composite buffer and cut again at unrelated positions.
 *
 * @param alloc allocator providing the composite buffers
 * @param maxSize maximum number of incoming buffers merged into one group
 * @param byteBuffers buffers to re-chunk
 * @return the same bytes in the same order, carried by differently sized buffers
 */
private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>): Flux<ByteBuf> =
        byteBuffers
                // emit a group once maxSize buffers were collected or 250 ms elapsed, whichever comes first
                .bufferTimeout(maxSize, Duration.ofMillis(250))
                .map { group -> joinBuffers(alloc, group) }
                // concatMap keeps the groups in order, so the byte sequence is preserved
                .concatMap { joined -> randomlySplitTcpFrames(joined) }
/**
 * Merges the buffers in [it] into a single composite buffer obtained from [alloc].
 *
 * @param alloc allocator that creates the composite buffer
 * @param it buffers to add as components, in order
 * @return the composite buffer holding all components
 */
private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
        alloc.compositeBuffer().run {
            // `true` makes the composite's writer index advance as components are added
            addComponents(true, it)
        }
182 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
183 val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
184 return Flux.create<ByteBuf> { sink ->
185 while (bb.isReadable) {
186 val frameSize = Math.min(targetFrameSize, bb.readableBytes())
187 sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
188 bb.readerIndex(bb.readerIndex() + frameSize)