2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import io.netty.buffer.CompositeByteBuf
26 import io.netty.buffer.Unpooled
27 import io.netty.buffer.UnpooledByteBufAllocator
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
33 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
34 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
35 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
36 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
37 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
38 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
39 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
40 import reactor.core.publisher.Flux
41 import reactor.math.sum
42 import java.security.MessageDigest
43 import java.time.Duration
45 import kotlin.system.measureTimeMillis
48 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
51 object PerformanceSpecification : Spek({
54 describe("VES High Volume Collector performance") {
55 it("should handle multiple clients in reasonable time") {
56 val sink = CountingSink()
58 sut.configurationProvider.updateConfiguration(basicConfiguration)
60 val numMessages: Long = 300_000
62 val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
64 val params = MessageParameters(
65 commonEventHeader = commonHeader(PERF3GPP),
70 val fluxes = (1.rangeTo(runs)).map {
71 sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
73 val durationMs = measureTimeMillis {
74 Flux.merge(fluxes).then().block(timeout)
77 val durationSec = durationMs / 1000.0
78 val throughput = sink.count / durationSec
79 logger.info("Processed $runs connections each containing $numMessages msgs.")
80 logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
81 assertThat(sink.count)
82 .describedAs("should send all events")
83 .isEqualTo(runs * numMessages)
86 it("should disconnect on transmission errors") {
87 val sink = CountingSink()
89 sut.configurationProvider.updateConfiguration(basicConfiguration)
91 val numMessages: Long = 100_000
92 val timeout = Duration.ofSeconds(30)
94 val params = MessageParameters(
95 commonEventHeader = commonHeader(PERF3GPP),
100 val dataStream = generateDataStream(sut.alloc, params)
101 .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
102 sut.collector.handleConnection(sut.alloc, dataStream)
106 logger.info("Forwarded ${sink.count} msgs")
107 assertThat(sink.count)
108 .describedAs("should send up to number of events")
109 .isLessThan(numMessages)
113 describe("test infrastructure") {
114 val digest = MessageDigest.getInstance("MD5")
116 fun collectDigest(bb: ByteBuf) {
118 while (bb.isReadable) {
119 digest.update(bb.readByte())
121 bb.resetReaderIndex()
124 fun calculateDigest(arrays: List<ByteArray>): ByteArray {
125 for (array in arrays) {
128 return digest.digest()
131 it("should yield same bytes as in the input") {
132 val numberOfBuffers = 10
133 val singleBufferSize = 1000
134 val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
135 val inputDigest = calculateDigest(arrays)
137 val actualTotalSize = Flux.fromIterable(arrays)
138 .map { Unpooled.wrappedBuffer(it) }
139 .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
140 .doOnNext(::collectDigest)
142 val size = it.readableBytes()
150 val outputDigest = digest.digest()
152 assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
153 assertThat(outputDigest).isEqualTo(inputDigest)
/** Divisor that turns a raw message count into millions of messages when reporting throughput in the logs. */
private const val ONE_MILION: Double = 1e6
/** Single file-wide random source; `randomByteArray` draws its bytes from it. */
private val rand: Random = Random()
163 private fun randomByteArray(size: Int): ByteArray {
164 val bytes = ByteArray(size)
165 rand.nextBytes(bytes)
169 fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
171 .filter { predicate(it.t1) }
174 private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
175 WireFrameEncoder(alloc).let { encoder ->
176 MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
177 .createMessageFlux(listOf(params))
178 .map(encoder::encode)
179 .transform { simulateRemoteTcp(alloc, 1000, it) }
182 private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
184 .bufferTimeout(maxSize, Duration.ofMillis(250))
185 .map { joinBuffers(alloc, it) }
186 .concatMap { randomlySplitTcpFrames(it) }
/**
 * Combines a batch of buffers into one composite buffer.
 *
 * A fresh composite buffer is requested from [alloc] and every element of [it] is attached to it
 * as a component. The `true` argument is Netty's `increaseWriterIndex` flag, so the writer index
 * of the composite is advanced past the attached components and their bytes are readable.
 *
 * @param alloc allocator that provides the composite buffer
 * @param it buffers to attach; declared nullable and handed to Netty unchanged
 * @return the composite buffer holding all given components
 */
private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
        alloc.compositeBuffer().let { composite -> composite.addComponents(true, it) }
191 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
192 val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
193 return Flux.create<ByteBuf> { sink ->
194 while (bb.isReadable) {
195 val frameSize = Math.min(targetFrameSize, bb.readableBytes())
196 sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
197 bb.readerIndex(bb.readerIndex() + frameSize)