2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import io.netty.buffer.CompositeByteBuf
26 import io.netty.buffer.Unpooled
27 import io.netty.buffer.UnpooledByteBufAllocator
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
33 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
34 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
35 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
38 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
39 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
40 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
41 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
42 import reactor.core.publisher.Flux
43 import reactor.math.sum
44 import java.security.MessageDigest
45 import java.time.Duration
46 import java.util.Random
47 import kotlin.system.measureTimeMillis
50 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification with two groups: collector performance scenarios and a self-check of
// the stream helpers (simulateRemoteTcp and friends) that those scenarios rely on.
// NOTE(review): `sut`, `runs` and `logger` are used below, but their declarations are not
// visible in this excerpt — confirm where they are defined.
53 object PerformanceSpecification : Spek({
56 describe("VES High Volume Collector performance") {
// Scenario: open `runs` connections, each fed by its own generated stream, wait for all of
// them to finish within `timeout`, then expect the sink to have counted runs * numMessages.
57 it("should handle multiple clients in reasonable time") {
58 val sink = CountingSink()
60 sut.configurationProvider.updateConfiguration(basicConfiguration)
62 val numMessages: Long = 300_000
// The time budget grows with the number of connections: 1 minute plus runs / 2 minutes.
64 val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
66 val params = MessageParameters(
67 commonEventHeader = commonHeader(PERF3GPP),
// One handleConnection call per simulated client; every call gets a newly generated stream.
72 val fluxes = (1.rangeTo(runs)).map {
73 sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
// Measures the blocking wait for the merged connections to complete (bounded by `timeout`).
75 val durationMs = measureTimeMillis {
76 Flux.merge(fluxes).then().block(timeout)
// Throughput is reported in messages per second; the count is also logged in millions.
79 val durationSec = durationMs / 1000.0
80 val throughput = sink.count / durationSec
81 logger.info("Processed $runs connections each containing $numMessages msgs.")
82 logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
83 assertThat(sink.count)
84 .describedAs("should send all events")
85 .isEqualTo(runs * numMessages)
// Scenario: pass the generated stream through dropWhenIndex (predicate: index % 101 == 0)
// before handing it to the collector, then expect fewer than numMessages events at the sink.
88 it("should disconnect on transmission errors") {
89 val sink = CountingSink()
91 sut.configurationProvider.updateConfiguration(basicConfiguration)
93 val numMessages: Long = 100_000
// NOTE(review): the use of `timeout` in this scenario is not visible in this excerpt.
94 val timeout = Duration.ofSeconds(30)
96 val params = MessageParameters(
97 commonEventHeader = commonHeader(PERF3GPP),
102 val dataStream = generateDataStream(sut.alloc, params)
103 .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
104 sut.collector.handleConnection(sut.alloc, dataStream)
108 logger.info("Forwarded ${sink.count} msgs")
109 assertThat(sink.count)
110 .describedAs("should send up to number of events")
111 .isLessThan(numMessages)
115 describe("test infrastructure") {
// A single MD5 instance is shared by both helpers below; digest() is called once for the
// input arrays (in calculateDigest) and once for the bytes collected from the output stream.
116 val digest = MessageDigest.getInstance("MD5")
// Feeds every readable byte of `bb` into `digest`, then calls resetReaderIndex().
// NOTE(review): resetReaderIndex() goes back to the marked reader index; a matching
// markReaderIndex() call is not visible in this excerpt — confirm it exists.
118 fun collectDigest(bb: ByteBuf) {
120 while (bb.isReadable) {
121 digest.update(bb.readByte())
123 bb.resetReaderIndex()
// Returns digest.digest() after looping over `arrays`.
// NOTE(review): the loop body is not visible in this excerpt — presumably it updates
// `digest` with each array; confirm.
126 fun calculateDigest(arrays: List<ByteArray>): ByteArray {
127 for (array in arrays) {
130 return digest.digest()
// Checks that simulateRemoteTcp neither loses nor alters bytes: random arrays are wrapped
// into buffers, sent through simulateRemoteTcp with maxSize 4, and every emitted buffer is
// passed to collectDigest; total size and digest are then compared with the input.
133 it("should yield same bytes as in the input") {
134 val numberOfBuffers = 10
135 val singleBufferSize = 1000
136 val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
137 val inputDigest = calculateDigest(arrays)
139 val actualTotalSize = Flux.fromIterable(arrays)
140 .map { Unpooled.wrappedBuffer(it) }
141 .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
142 .doOnNext(::collectDigest)
// NOTE(review): the operator that encloses this line and the rest of the pipeline that
// produces `actualTotalSize` are not visible in this excerpt.
144 val size = it.readableBytes()
152 val outputDigest = digest.digest()
// `!!` implies `actualTotalSize` is nullable here; a null value would fail with an NPE
// rather than an assertion message.
154 assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
155 assertThat(outputDigest).isEqualTo(inputDigest)
/** Divisor that turns a raw message count into millions for the throughput log line. */
private const val ONE_MILION: Double = 1_000_000.0

/** Shared generator that supplies the bytes written by [randomByteArray]. */
private val rand: Random = Random()
/**
 * Allocates a byte array of [size] elements and fills it with bytes from the shared
 * [rand] generator.
 *
 * NOTE(review): the return statement is not visible in this excerpt — presumably the
 * filled array is returned; confirm.
 *
 * @param size number of bytes to allocate
 */
165 private fun randomByteArray(size: Int): ByteArray {
166 val bytes = ByteArray(size)
167 rand.nextBytes(bytes)
/**
 * Selects items by applying [predicate] to `it.t1` of every element in the chain below.
 *
 * NOTE(review): the receiver of `.filter` and anything that follows it are not visible in
 * this excerpt. `it.t1` suggests the elements are (index, buffer) tuples, presumably
 * produced from [stream] — confirm.
 * NOTE(review): `filter` keeps the elements for which `predicate(it.t1)` is true, while
 * the function name reads as if matching indices were dropped — confirm which is intended.
 *
 * @param predicate test applied to `it.t1` of each element
 * @param stream buffers to be thinned out
 */
171 fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
173 .filter { predicate(it.t1) }
/**
 * Builds the byte stream for one simulated client: messages come from a generator created
 * with `Sut.MAX_PAYLOAD_SIZE_BYTES`, each one is encoded by a single [WireFrameEncoder]
 * bound to [alloc], and the encoded buffers are passed through [simulateRemoteTcp] with a
 * batch limit of 1000.
 *
 * NOTE(review): the end of the `let` lambda is not visible in this excerpt.
 *
 * @param alloc allocator used by the encoder and by [simulateRemoteTcp]
 * @param params the only entry of the parameter list given to `createMessageFlux`
 */
176 private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
177 WireFrameEncoder(alloc).let { encoder ->
178 MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
179 .createMessageFlux(listOf(params))
180 .map(encoder::encode)
181 .transform { simulateRemoteTcp(alloc, 1000, it) }
/**
 * Reshapes a stream of buffers in three steps: items are batched (a batch is emitted when
 * it holds [maxSize] items or when 250 ms have elapsed), each batch is merged by
 * [joinBuffers], and every merged buffer is cut up again by [randomlySplitTcpFrames].
 * `concatMap` processes the merged buffers one after another, in order.
 *
 * NOTE(review): the receiver of the call chain is not visible in this excerpt —
 * presumably [byteBuffers]; confirm.
 *
 * @param alloc allocator handed to [joinBuffers]
 * @param maxSize maximum number of items collected into one batch
 * @param byteBuffers source buffers
 */
184 private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
186 .bufferTimeout(maxSize, Duration.ofMillis(250))
187 .map { joinBuffers(alloc, it) }
188 .concatMap { randomlySplitTcpFrames(it) }
/**
 * Merges a batch of buffers into one composite buffer obtained from [alloc].
 *
 * @param alloc allocator that supplies the empty [CompositeByteBuf]
 * @param it buffers to append; handed unchanged to `addComponents`
 * @return the composite buffer returned by `addComponents`
 */
private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?): CompositeByteBuf {
    val composite = alloc.compositeBuffer()
    // `true` is Netty's increaseWriterIndex flag: the writer index advances past the
    // appended components, so their bytes are readable from the composite.
    return composite.addComponents(true, it)
}
193 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
194 val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
195 return Flux.create<ByteBuf> { sink ->
196 while (bb.isReadable) {
197 val frameSize = Math.min(targetFrameSize, bb.readableBytes())
198 sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
199 bb.readerIndex(bb.readerIndex() + frameSize)