/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2018 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.onap.dcae.collectors.veshv.tests.component
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import io.netty.buffer.CompositeByteBuf
26 import io.netty.buffer.Unpooled
27 import io.netty.buffer.UnpooledByteBufAllocator
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
33 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
34 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
35 import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
36 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
37 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
38 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
39 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
40 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
41 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
42 import reactor.core.publisher.Flux
43 import reactor.math.sum
44 import java.security.MessageDigest
45 import java.time.Duration
47 import kotlin.system.measureTimeMillis
/**
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 */
// Spek specification with two groups: performance scenarios for the VES High Volume
// Collector, and a self-check of the TCP-simulation helpers defined in this file.
// NOTE(review): the leading number on each line is a fused original line number, and
// those numbers skip values (e.g. 58 -> 60, 62 -> 64). Some statements and all
// standalone closing-brace lines appear to be missing from this text — restore them
// from version control before building.
53 object PerformanceSpecification : Spek({
56 describe("VES High Volume Collector performance") {
// Scenario 1: several connections are handled concurrently and every event must be counted.
57 it("should handle multiple clients in reasonable time") {
// Sink fake whose `count` is read after the run.
58 val sink = CountingSink()
// NOTE(review): `sut` is used from here on but is not declared in the visible lines —
// presumably created on the skipped line 59; confirm.
60 sut.configurationProvider.updateConfiguration(basicRouting)
62 val numMessages: Long = 300_000
// The time budget in minutes is 1 + runs / 2.
// NOTE(review): `runs` is not declared in the visible lines (line 63 is skipped); confirm its value.
64 val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
// Event parameters: PERF3GPP common header, VALID message type.
// NOTE(review): the argument list is not closed in the visible lines; further arguments may be missing.
66 val params = VesEventParameters(
67 commonEventHeader = commonHeader(PERF3GPP),
68 messageType = VesEventType.VALID,
// One handleConnection call per run, each fed by its own generated data stream.
72 val fluxes = (1.rangeTo(runs)).map {
73 sut.collector.handleConnection(generateDataStream(sut.alloc, params))
// Wall-clock time until all merged connections complete, blocking for at most `timeout`.
75 val durationMs = measureTimeMillis {
76 Flux.merge(fluxes).then().block(timeout)
// Convert to seconds and derive messages per second from the sink's counter.
79 val durationSec = durationMs / 1000.0
80 val throughput = sink.count / durationSec
81 logger.info { "Processed $runs connections each containing $numMessages msgs." }
82 logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" }
// The sink must have counted exactly runs * numMessages events.
83 assertThat(sink.count)
84 .describedAs("should send all events")
85 .isEqualTo(runs * numMessages)
// Scenario 2: the collector receives a stream altered by dropWhenIndex.
88 it("should disconnect on transmission errors") {
89 val sink = CountingSink()
// NOTE(review): `sut` is again not declared in the visible lines (line 90 is skipped); confirm.
91 sut.configurationProvider.updateConfiguration(basicRouting)
93 val numMessages: Long = 100_000
94 val timeout = Duration.ofSeconds(30)
// NOTE(review): as above, the argument list is not closed in the visible lines.
96 val params = VesEventParameters(
97 commonEventHeader = commonHeader(PERF3GPP),
98 messageType = VesEventType.VALID,
// partially1 binds the index predicate (index % 101 == 0) as dropWhenIndex's first
// argument; the resulting function is applied to the generated stream via transform.
102 val dataStream = generateDataStream(sut.alloc, params)
103 .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
// NOTE(review): `timeout` is declared above but unused in the visible lines, and nothing
// visible subscribes to or blocks on the returned publisher — presumably done on the
// skipped lines 105-106; confirm.
104 sut.collector.handleConnection(dataStream)
108 logger.info { "Forwarded ${sink.count} msgs" }
// The sink must have counted fewer than numMessages events.
109 assertThat(sink.count)
110 .describedAs("should send up to number of events")
111 .isLessThan(numMessages)
// Self-check of the helpers that simulate TCP framing.
115 describe("test infrastructure") {
// A single MD5 instance shared by both local helpers and by the test below.
116 val digest = MessageDigest.getInstance("MD5")
// Feeds every readable byte of `bb` into the shared digest, then resets the reader index.
// NOTE(review): resetReaderIndex() returns to the last *marked* index, but no
// markReaderIndex() call is visible (line 119 is skipped); confirm.
118 fun collectDigest(bb: ByteBuf) {
120 while (bb.isReadable) {
121 digest.update(bb.readByte())
123 bb.resetReaderIndex()
// Returns the shared digest's current value for `arrays`.
// NOTE(review): the loop body that would feed each array into the digest is not visible
// (line 128 is skipped); confirm.
126 fun calculateDigest(arrays: List<ByteArray>): ByteArray {
127 for (array in arrays) {
130 return digest.digest()
133 it("should yield same bytes as in the input") {
134 val numberOfBuffers = 10
135 val singleBufferSize = 1000
// Random input: numberOfBuffers arrays of singleBufferSize bytes each.
136 val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
137 val inputDigest = calculateDigest(arrays)
// Wrap each array in a ByteBuf, run the buffers through simulateRemoteTcp with
// maxSize = 4, and feed every emitted frame into the digest.
139 val actualTotalSize = Flux.fromIterable(arrays)
140 .map { Unpooled.wrappedBuffer(it) }
141 .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
142 .doOnNext(::collectDigest)
// NOTE(review): the operator enclosing the next line and the rest of the chain
// (lines 143, 145-150) are not visible. `actualTotalSize` is later compared with the
// total input size, so the chain presumably sums the frame sizes — confirm.
144 val size = it.readableBytes()
// MessageDigest.digest() resets the digest after returning, so the call made in
// calculateDigest cleared the input bytes; this value covers only the bytes that
// collectDigest fed in from the output frames.
152 val outputDigest = digest.digest()
// Total bytes out must equal total bytes in, and the output MD5 must equal the input MD5.
154 assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
155 assertThat(outputDigest).isEqualTo(inputDigest)
/** Divisor used to report the forwarded message count in millions. */
private const val ONE_MILION: Double = 1_000_000.0

/** Shared random source for generating test payload bytes. */
private val rand: Random = Random()

/** Factory of VES message generators, configured with [MAX_PAYLOAD_SIZE_BYTES]. */
private val generatorsFactory: MessageGeneratorFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES)
/**
 * Creates an array of [size] bytes filled by the file-level [rand].
 *
 * @param size number of bytes to generate
 * @return a newly allocated array of length [size] whose content was written by [rand]
 */
private fun randomByteArray(size: Int): ByteArray {
    val bytes = ByteArray(size)
    rand.nextBytes(bytes)
    // The declared return type requires handing the filled array back to the caller.
    return bytes
}
/**
 * Filters [stream] by element position.
 *
 * Each buffer is paired with its 0-based position in the stream; only the buffers whose
 * position satisfies [predicate] are passed downstream.
 *
 * NOTE(review): despite the name, elements for which [predicate] returns `true` are the ones
 * that are *kept*; confirm this is the intended direction. Buffers that are filtered out are
 * not released here.
 *
 * @param predicate decides, from the 0-based position, whether the element is kept
 * @param stream source of buffers
 * @return the buffers of [stream] whose position satisfies [predicate], in original order
 */
fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
        stream.index()
                // t1 is the position assigned by index(), t2 the original buffer.
                .filter { predicate(it.t1) }
                .map { it.t2 }
/**
 * Builds the byte stream fed to the collector in the performance scenarios.
 *
 * Messages produced by the VES event generator for [params] are serialized to byte arrays,
 * wrapped in [WireFrameMessage], encoded with a [WireFrameEncoder] backed by [alloc], and then
 * passed through [simulateRemoteTcp] with batches of at most 1000 buffers.
 *
 * @param alloc allocator used by the encoder and by the TCP simulation
 * @param params parameters handed to the event generator
 * @return the encoded and re-framed buffers
 */
private fun generateDataStream(alloc: ByteBufAllocator, params: VesEventParameters): Flux<ByteBuf> {
    val encoder = WireFrameEncoder(alloc)
    return generatorsFactory.createVesEventGenerator()
            .createMessageFlux(params)
            .map { WireFrameMessage(it.toByteArray()) }
            .map(encoder::encode)
            .transform { encoded -> simulateRemoteTcp(alloc, 1000, encoded) }
}
/**
 * Re-frames a stream of buffers the way a TCP connection might deliver them.
 *
 * Incoming buffers are collected into batches of at most [maxSize] elements (a batch is also
 * closed after 250 ms), each batch is merged into one composite buffer by [joinBuffers], and
 * every composite is then cut into frames by [randomlySplitTcpFrames]. `concatMap` keeps the
 * frames in the order of the batches they came from.
 *
 * @param alloc allocator used to create the composite buffers
 * @param maxSize maximum number of incoming buffers merged into one composite
 * @param byteBuffers source buffers
 * @return the re-framed buffers
 */
private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>): Flux<ByteBuf> =
        byteBuffers
                .bufferTimeout(maxSize, Duration.ofMillis(250))
                .map { batch -> joinBuffers(alloc, batch) }
                .concatMap { composite -> randomlySplitTcpFrames(composite) }
/**
 * Merges the given buffers into a single composite buffer allocated from [alloc].
 *
 * The components are added with `increaseWriterIndex = true`, so the composite's writer index
 * advances with each added component.
 *
 * @param alloc allocator that creates the composite buffer
 * @param it buffers to merge, in order
 * @return the composite buffer returned by `addComponents`
 */
private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
        alloc.compositeBuffer().let { composite -> composite.addComponents(true, it) }
195 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
196 val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
197 return Flux.create<ByteBuf> { sink ->
198 while (bb.isReadable) {
199 val frameSize = Math.min(targetFrameSize, bb.readableBytes())
200 sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
201 bb.readerIndex(bb.readerIndex() + frameSize)