2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import io.netty.buffer.CompositeByteBuf
26 import io.netty.buffer.Unpooled
27 import io.netty.buffer.UnpooledByteBufAllocator
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
34 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
35 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
36 import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
37 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
38 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
39 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
40 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
41 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
42 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
43 import reactor.core.publisher.Flux
44 import reactor.math.sum
45 import java.security.MessageDigest
46 import java.time.Duration
48 import kotlin.system.measureTimeMillis
51 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
54 object PerformanceSpecification : Spek({
57 describe("VES High Volume Collector performance") {
58 it("should handle multiple clients in reasonable time") {
59 val sink = CountingSink()
60 val sut = Sut(CollectorConfiguration(basicRouting), sink)
62 val numMessages: Long = 300_000
64 val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
66 val params = VesEventParameters(
67 commonEventHeader = commonHeader(PERF3GPP),
68 messageType = VesEventType.VALID,
72 val fluxes = (1.rangeTo(runs)).map {
73 sut.collector.handleConnection(generateDataStream(sut.alloc, params))
75 val durationMs = measureTimeMillis {
76 Flux.merge(fluxes).then().block(timeout)
79 val durationSec = durationMs / 1000.0
80 val throughput = sink.count / durationSec
81 logger.info { "Processed $runs connections each containing $numMessages msgs." }
82 logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/PERF3GPP_REGIONAL" }
83 assertThat(sink.count)
84 .describedAs("should send all events")
85 .isEqualTo(runs * numMessages)
88 it("should disconnect on transmission errors") {
89 val sink = CountingSink()
90 val sut = Sut(CollectorConfiguration(basicRouting), sink)
92 val numMessages: Long = 100_000
93 val timeout = Duration.ofSeconds(30)
95 val params = VesEventParameters(
96 commonEventHeader = commonHeader(PERF3GPP),
97 messageType = VesEventType.VALID,
101 val dataStream = generateDataStream(sut.alloc, params)
102 .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
103 sut.collector.handleConnection(dataStream)
107 logger.info { "Forwarded ${sink.count} msgs" }
108 assertThat(sink.count)
109 .describedAs("should send up to number of events")
110 .isLessThan(numMessages)
114 describe("test infrastructure") {
115 val digest = MessageDigest.getInstance("MD5")
117 fun collectDigest(bb: ByteBuf) {
119 while (bb.isReadable) {
120 digest.update(bb.readByte())
122 bb.resetReaderIndex()
125 fun calculateDigest(arrays: List<ByteArray>): ByteArray {
126 for (array in arrays) {
129 return digest.digest()
132 it("should yield same bytes as in the input") {
133 val numberOfBuffers = 10
134 val singleBufferSize = 1000
135 val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
136 val inputDigest = calculateDigest(arrays)
138 val actualTotalSize = Flux.fromIterable(arrays)
139 .map { Unpooled.wrappedBuffer(it) }
140 .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
141 .doOnNext(::collectDigest)
143 val size = it.readableBytes()
151 val outputDigest = digest.digest()
153 assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
154 assertThat(outputDigest).isEqualTo(inputDigest)
161 private const val ONE_MILLION = 1_000_000.0
162 private val rand = Random()
163 private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES)
165 private fun randomByteArray(size: Int): ByteArray {
166 val bytes = ByteArray(size)
167 rand.nextBytes(bytes)
171 fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
173 .filter { predicate(it.t1) }
176 private fun generateDataStream(alloc: ByteBufAllocator, params: VesEventParameters): Flux<ByteBuf> =
177 WireFrameEncoder(alloc).let { encoder ->
178 generatorsFactory.createVesEventGenerator()
179 .createMessageFlux(params)
180 .map { WireFrameMessage(it.toByteArray()) }
181 .map(encoder::encode)
182 .transform { simulateRemoteTcp(alloc, 1000, it) }
185 private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
187 .bufferTimeout(maxSize, Duration.ofMillis(250))
188 .map { joinBuffers(alloc, it) }
189 .concatMap { randomlySplitTcpFrames(it) }
191 private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
192 alloc.compositeBuffer().addComponents(true, it)
194 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
195 val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
196 return Flux.create<ByteBuf> { sink ->
197 while (bb.isReadable) {
198 val frameSize = Math.min(targetFrameSize, bb.readableBytes())
199 sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
200 bb.readerIndex(bb.readerIndex() + frameSize)