2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import io.netty.buffer.CompositeByteBuf
26 import io.netty.buffer.Unpooled
27 import io.netty.buffer.UnpooledByteBufAllocator
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
33 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
34 import org.onap.dcae.collectors.veshv.simulators.xnf.createMessageGenerator
35 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import reactor.core.publisher.Flux
38 import reactor.math.sum
39 import java.security.MessageDigest
40 import java.time.Duration
42 import kotlin.system.measureTimeMillis
45 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
48 object PerformanceSpecification : Spek({
51 describe("VES High Volume Collector performance") {
52 it("should handle multiple clients in reasonable time") {
53 val sink = CountingSink()
55 sut.configurationProvider.updateConfiguration(basicConfiguration)
57 val numMessages: Long = 300_000
59 val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
61 val params = MessageParameters(
62 commonEventHeader = vesEvent().commonEventHeader,
64 val fluxes = (1.rangeTo(runs)).map {
65 sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
67 val durationMs = measureTimeMillis {
68 Flux.merge(fluxes).then().block(timeout)
71 val durationSec = durationMs / 1000.0
72 val throughput = sink.count / durationSec
73 logger.info("Processed $runs connections each containing $numMessages msgs.")
74 logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
75 assertThat(sink.count)
76 .describedAs("should send all events")
77 .isEqualTo(runs * numMessages)
80 it("should disconnect on transmission errors") {
81 val sink = CountingSink()
83 sut.configurationProvider.updateConfiguration(basicConfiguration)
85 val numMessages: Long = 100_000
86 val timeout = Duration.ofSeconds(30)
88 val params = MessageParameters(
89 commonEventHeader = vesEvent().commonEventHeader,
92 val dataStream = generateDataStream(sut.alloc, params)
93 .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
94 sut.collector.handleConnection(sut.alloc, dataStream)
98 logger.info("Forwarded ${sink.count} msgs")
99 assertThat(sink.count)
100 .describedAs("should send up to number of events")
101 .isLessThan(numMessages)
105 describe("test infrastructure") {
106 val digest = MessageDigest.getInstance("MD5")
108 fun collectDigest(bb: ByteBuf) {
110 while (bb.isReadable) {
111 digest.update(bb.readByte())
113 bb.resetReaderIndex()
116 fun calculateDigest(arrays: List<ByteArray>): ByteArray {
117 for (array in arrays) {
120 return digest.digest()
123 it("should yield same bytes as in the input") {
124 val numberOfBuffers = 10
125 val singleBufferSize = 1000
126 val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
127 val inputDigest = calculateDigest(arrays)
129 val actualTotalSize = Flux.fromIterable(arrays)
130 .map { Unpooled.wrappedBuffer(it) }
131 .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
132 .doOnNext(::collectDigest)
134 val size = it.readableBytes()
142 val outputDigest = digest.digest()
144 assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize)
145 assertThat(outputDigest).isEqualTo(inputDigest)
152 private const val ONE_MILION = 1_000_000.0
154 private val rand = Random()
155 private fun randomByteArray(size: Int): ByteArray {
156 val bytes = ByteArray(size)
157 rand.nextBytes(bytes)
161 fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
163 .filter { predicate(it.t1) }
166 private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
167 WireFrameEncoder(alloc).let { encoder ->
168 createMessageGenerator()
169 .createMessageFlux(params)
170 .map(encoder::encode)
171 .transform { simulateRemoteTcp(alloc, 1000, it) }
174 private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
176 .bufferTimeout(maxSize, Duration.ofMillis(250))
177 .map { joinBuffers(alloc, it) }
178 .concatMap { randomlySplitTcpFrames(it) }
180 private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
181 alloc.compositeBuffer().addComponents(true, it)
183 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
184 val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
185 return Flux.create<ByteBuf> { sink ->
186 while (bb.isReadable) {
187 val frameSize = Math.min(targetFrameSize, bb.readableBytes())
188 sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
189 bb.readerIndex(bb.readerIndex() + frameSize)