b793f3aa02eb1906a129dc465cdcc583a6bb9385
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.*
24 import org.assertj.core.api.Assertions.assertThat
25 import org.jetbrains.spek.api.Spek
26 import org.jetbrains.spek.api.dsl.describe
27 import org.jetbrains.spek.api.dsl.it
28 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
29 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
30 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
31 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
32 import org.onap.dcae.collectors.veshv.ves.message.generator.config.MessageParameters
33 import reactor.core.publisher.Flux
34 import reactor.math.sum
35 import java.security.MessageDigest
36 import java.time.Duration
37 import java.util.*
38 import kotlin.system.measureTimeMillis
39
40 /**
41  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
42  * @since May 2018
43  */
44 object PerformanceSpecification : Spek({
45     debugRx(false)
46
47     describe("VES High Volume Collector performance") {
48         it("should handle multiple clients in reasonable time") {
49             val sink = CountingSink()
50             val sut = Sut(sink)
51             sut.configurationProvider.updateConfiguration(basicConfiguration)
52
53             val numMessages: Long = 300_000
54             val runs = 4
55             val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
56
57             val params = MessageParameters(
58                     commonEventHeader = vesEvent().commonEventHeader,
59                     amount = numMessages)
60             val fluxes = (1.rangeTo(runs)).map {
61                 sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
62             }
63             val durationMs = measureTimeMillis {
64                 Flux.merge(fluxes).then().block(timeout)
65             }
66
67             val durationSec = durationMs / 1000.0
68             val throughput = sink.count / durationSec
69             logger.info("Processed $runs connections each containing $numMessages msgs.")
70             logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
71             assertThat(sink.count)
72                     .describedAs("should send all events")
73                     .isEqualTo(runs * numMessages)
74         }
75
76         it("should disconnect on transmission errors") {
77             val sink = CountingSink()
78             val sut = Sut(sink)
79             sut.configurationProvider.updateConfiguration(basicConfiguration)
80
81             val numMessages: Long = 100_000
82             val timeout = Duration.ofSeconds(30)
83
84             val params = MessageParameters(
85                     commonEventHeader = vesEvent().commonEventHeader,
86                     amount = numMessages)
87
88             val dataStream = generateDataStream(sut.alloc, params)
89                     .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
90             sut.collector.handleConnection(sut.alloc, dataStream)
91                     .timeout(timeout)
92                     .block()
93
94             logger.info("Forwarded ${sink.count} msgs")
95             assertThat(sink.count)
96                     .describedAs("should send up to number of events")
97                     .isLessThan(numMessages)
98         }
99     }
100
101     describe("test infrastructure") {
102         val digest = MessageDigest.getInstance("MD5")
103
104         fun collectDigest(bb: ByteBuf) {
105             bb.markReaderIndex()
106             while (bb.isReadable) {
107                 digest.update(bb.readByte())
108             }
109             bb.resetReaderIndex()
110         }
111
112         fun calculateDigest(arrays: List<ByteArray>): ByteArray {
113             for (array in arrays) {
114                 digest.update(array)
115             }
116             return digest.digest()
117         }
118
119         it("should yield same bytes as in the input") {
120             val numberOfBuffers = 10
121             val singleBufferSize = 1000
122             val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
123             val inputDigest = calculateDigest(arrays)
124
125             val actualTotalSize = Flux.fromIterable(arrays)
126                     .map { Unpooled.wrappedBuffer(it) }
127                     .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
128                     .doOnNext(::collectDigest)
129                     .map {
130                         val size = it.readableBytes()
131                         it.release()
132                         size
133                     }
134                     .sum()
135                     .map(Long::toInt)
136                     .block()
137
138             val outputDigest = digest.digest()
139
140             assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize)
141             assertThat(outputDigest).isEqualTo(inputDigest)
142
143         }
144     }
145 })
146
147
148 private const val ONE_MILION = 1_000_000.0
149
150 private val rand = Random()
151 private fun randomByteArray(size: Int): ByteArray {
152     val bytes = ByteArray(size)
153     rand.nextBytes(bytes)
154     return bytes
155 }
156
157 fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
158         stream.index()
159                 .filter { predicate(it.t1) }
160                 .map { it.t2 }
161
162 private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
163         WireFrameEncoder(alloc).let { encoder ->
164             MessageGenerator.INSTANCE
165                     .createMessageFlux(params)
166                     .map(encoder::encode)
167                     .transform { simulateRemoteTcp(alloc, 1000, it) }
168         }
169
170 private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
171         byteBuffers
172                 .bufferTimeout(maxSize, Duration.ofMillis(250))
173                 .map { joinBuffers(alloc, it) }
174                 .concatMap { randomlySplitTcpFrames(it) }
175
176 private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
177         alloc.compositeBuffer().addComponents(true, it)
178
179 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
180     val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
181     return Flux.create<ByteBuf> { sink ->
182         while (bb.isReadable) {
183             val frameSize = Math.min(targetFrameSize, bb.readableBytes())
184             sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
185             bb.readerIndex(bb.readerIndex() + frameSize)
186         }
187         bb.release()
188         sink.complete()
189     }
190 }