35dfba8bdb0aa13a510398d23e4c84d1d1310ab1
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import io.netty.buffer.CompositeByteBuf
26 import io.netty.buffer.Unpooled
27 import io.netty.buffer.UnpooledByteBufAllocator
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
34 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
35 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
36 import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
37 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
38 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
39 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
40 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
41 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
42 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
43 import reactor.core.publisher.Flux
44 import reactor.math.sum
45 import java.security.MessageDigest
46 import java.time.Duration
47 import java.util.*
48 import kotlin.system.measureTimeMillis
49
50 /**
51  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
52  * @since May 2018
53  */
54 object PerformanceSpecification : Spek({
55     debugRx(false)
56
57     describe("VES High Volume Collector performance") {
58         it("should handle multiple clients in reasonable time") {
59             val sink = CountingSink()
60             val sut = Sut(CollectorConfiguration(basicRouting), sink)
61
62             val numMessages: Long = 300_000
63             val runs = 4
64             val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
65
66             val params = VesEventParameters(
67                     commonEventHeader = commonHeader(PERF3GPP),
68                     messageType = VesEventType.VALID,
69                     amount = numMessages
70             )
71
72             val fluxes = (1.rangeTo(runs)).map {
73                 sut.collector.handleConnection(generateDataStream(sut.alloc, params))
74             }
75             val durationMs = measureTimeMillis {
76                 Flux.merge(fluxes).then().block(timeout)
77             }
78
79             val durationSec = durationMs / 1000.0
80             val throughput = sink.count / durationSec
81             logger.info { "Processed $runs connections each containing $numMessages msgs." }
82             logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/PERF3GPP_REGIONAL" }
83             assertThat(sink.count)
84                     .describedAs("should send all events")
85                     .isEqualTo(runs * numMessages)
86         }
87
88         it("should disconnect on transmission errors") {
89             val sink = CountingSink()
90             val sut = Sut(CollectorConfiguration(basicRouting), sink)
91
92             val numMessages: Long = 100_000
93             val timeout = Duration.ofSeconds(30)
94
95             val params = VesEventParameters(
96                     commonEventHeader = commonHeader(PERF3GPP),
97                     messageType = VesEventType.VALID,
98                     amount = numMessages
99             )
100
101             val dataStream = generateDataStream(sut.alloc, params)
102                     .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
103             sut.collector.handleConnection(dataStream)
104                     .timeout(timeout)
105                     .block()
106
107             logger.info { "Forwarded ${sink.count} msgs" }
108             assertThat(sink.count)
109                     .describedAs("should send up to number of events")
110                     .isLessThan(numMessages)
111         }
112     }
113
114     describe("test infrastructure") {
115         val digest = MessageDigest.getInstance("MD5")
116
117         fun collectDigest(bb: ByteBuf) {
118             bb.markReaderIndex()
119             while (bb.isReadable) {
120                 digest.update(bb.readByte())
121             }
122             bb.resetReaderIndex()
123         }
124
125         fun calculateDigest(arrays: List<ByteArray>): ByteArray {
126             for (array in arrays) {
127                 digest.update(array)
128             }
129             return digest.digest()
130         }
131
132         it("should yield same bytes as in the input") {
133             val numberOfBuffers = 10
134             val singleBufferSize = 1000
135             val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
136             val inputDigest = calculateDigest(arrays)
137
138             val actualTotalSize = Flux.fromIterable(arrays)
139                     .map { Unpooled.wrappedBuffer(it) }
140                     .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
141                     .doOnNext(::collectDigest)
142                     .map {
143                         val size = it.readableBytes()
144                         it.release()
145                         size
146                     }
147                     .sum()
148                     .map(Long::toInt)
149                     .block()
150
151             val outputDigest = digest.digest()
152
153             assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
154             assertThat(outputDigest).isEqualTo(inputDigest)
155
156         }
157     }
158 })
159
160
161 private const val ONE_MILLION = 1_000_000.0
162 private val rand = Random()
163 private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES)
164
165 private fun randomByteArray(size: Int): ByteArray {
166     val bytes = ByteArray(size)
167     rand.nextBytes(bytes)
168     return bytes
169 }
170
171 fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
172         stream.index()
173                 .filter { predicate(it.t1) }
174                 .map { it.t2 }
175
176 private fun generateDataStream(alloc: ByteBufAllocator, params: VesEventParameters): Flux<ByteBuf> =
177         WireFrameEncoder(alloc).let { encoder ->
178             generatorsFactory.createVesEventGenerator()
179                     .createMessageFlux(params)
180                     .map { WireFrameMessage(it.toByteArray()) }
181                     .map(encoder::encode)
182                     .transform { simulateRemoteTcp(alloc, 1000, it) }
183         }
184
185 private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
186         byteBuffers
187                 .bufferTimeout(maxSize, Duration.ofMillis(250))
188                 .map { joinBuffers(alloc, it) }
189                 .concatMap { randomlySplitTcpFrames(it) }
190
191 private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
192         alloc.compositeBuffer().addComponents(true, it)
193
194 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
195     val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
196     return Flux.create<ByteBuf> { sink ->
197         while (bb.isReadable) {
198             val frameSize = Math.min(targetFrameSize, bb.readableBytes())
199             sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
200             bb.readerIndex(bb.readerIndex() + frameSize)
201         }
202         bb.release()
203         sink.complete()
204     }
205 }
206