61a9a3562186d1aecde9529bcb0b47f6a51ded52
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import io.netty.buffer.CompositeByteBuf
26 import io.netty.buffer.Unpooled
27 import io.netty.buffer.UnpooledByteBufAllocator
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
33 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
34 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
35 import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
36 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
37 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
38 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
39 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
40 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
41 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
42 import reactor.core.publisher.Flux
43 import reactor.math.sum
44 import java.security.MessageDigest
45 import java.time.Duration
46 import java.util.*
47 import kotlin.system.measureTimeMillis
48
/**
 * Component-level performance and robustness checks for the VES High Volume Collector, together
 * with a sanity check of the TCP-simulation helpers defined in this file.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
object PerformanceSpecification : Spek({
    debugRx(false)

    describe("VES High Volume Collector performance") {
        it("should handle multiple clients in reasonable time") {
            val sink = CountingSink()
            val sut = Sut(sink)
            sut.configurationProvider.updateConfiguration(basicRouting)

            val numMessages: Long = 300_000
            val runs = 4
            // One minute, plus one more for every two simulated connections.
            val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())

            val params = VesEventParameters(
                    commonEventHeader = commonHeader(PERF3GPP),
                    messageType = VesEventType.VALID,
                    amount = numMessages
            )

            // One independently generated data stream per simulated connection.
            val fluxes = (1..runs).map {
                sut.collector.handleConnection(generateDataStream(sut.alloc, params))
            }
            // Only the merged execution of all connections is timed.
            val durationMs = measureTimeMillis {
                Flux.merge(fluxes).then().block(timeout)
            }

            val durationSec = durationMs / 1000.0
            val throughput = sink.count / durationSec
            logger.info { "Processed $runs connections each containing $numMessages msgs." }
            // throughput is a message count divided by seconds, hence the "msgs/s" unit.
            logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/s" }
            assertThat(sink.count)
                    .describedAs("should send all events")
                    .isEqualTo(runs * numMessages)
        }

        it("should disconnect on transmission errors") {
            val sink = CountingSink()
            val sut = Sut(sink)
            sut.configurationProvider.updateConfiguration(basicRouting)

            val numMessages: Long = 100_000
            val timeout = Duration.ofSeconds(30)

            val params = VesEventParameters(
                    commonEventHeader = commonHeader(PERF3GPP),
                    messageType = VesEventType.VALID,
                    amount = numMessages
            )

            // Damage the stream by removing frames selected by their index, so that the
            // collector receives a broken transmission.
            val dataStream = generateDataStream(sut.alloc, params)
                    .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
            sut.collector.handleConnection(dataStream)
                    .timeout(timeout)
                    .block()

            logger.info { "Forwarded ${sink.count} msgs" }
            assertThat(sink.count)
                    .describedAs("should send up to number of events")
                    .isLessThan(numMessages)
        }
    }

    describe("test infrastructure") {
        // A single digest instance is reused for input and output: MessageDigest.digest()
        // resets the instance, so the second computation starts from a clean state.
        val digest = MessageDigest.getInstance("MD5")

        // Feeds all readable bytes of the buffer into the digest and then restores the reader
        // index, so that later stages still see the buffer as unread.
        fun collectDigest(bb: ByteBuf) {
            bb.markReaderIndex()
            while (bb.isReadable) {
                digest.update(bb.readByte())
            }
            bb.resetReaderIndex()
        }

        // Digest of all arrays taken in list order, as if they were one contiguous byte sequence.
        fun calculateDigest(arrays: List<ByteArray>): ByteArray {
            for (array in arrays) {
                digest.update(array)
            }
            return digest.digest()
        }

        it("should yield same bytes as in the input") {
            val numberOfBuffers = 10
            val singleBufferSize = 1000
            val arrays = (1..numberOfBuffers).map { randomByteArray(singleBufferSize) }
            val inputDigest = calculateDigest(arrays)

            val actualTotalSize = Flux.fromIterable(arrays)
                    .map { Unpooled.wrappedBuffer(it) }
                    .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
                    .doOnNext(::collectDigest)
                    .map {
                        // Record the frame size, then give the buffer back.
                        val size = it.readableBytes()
                        it.release()
                        size
                    }
                    .sum()
                    .map(Long::toInt)
                    .block()

            val outputDigest = digest.digest()

            assertThat(checkNotNull(actualTotalSize) { "simulated TCP stream did not yield a total size" })
                    .isEqualTo(numberOfBuffers * singleBufferSize)
            assertThat(outputDigest).isEqualTo(inputDigest)

        }
    }
})
160
161
/** Divisor used when reporting message counts in millions. */
private const val ONE_MILLION = 1e6

/** Random source shared by the helpers in this file that need random bytes. */
private val rand: Random = Random()

/** Factory of message generators, configured with the SUT's maximum payload size. */
private val generatorsFactory: MessageGeneratorFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES)
165
/**
 * Creates a new array of [size] bytes filled with values drawn from the file-level [rand].
 *
 * @param size length of the array to create
 * @return a freshly allocated array with random content
 */
private fun randomByteArray(size: Int): ByteArray =
        ByteArray(size).also { fresh -> rand.nextBytes(fresh) }
171
/**
 * Removes from [stream] every buffer whose position in the stream satisfies [predicate] and
 * passes all other buffers through unchanged and in order.
 *
 * Positions are the indices assigned by `Flux.index()` (counted from 0). A dropped buffer is
 * released here, because no downstream consumer will ever receive it and so nobody else could
 * give its reference back.
 *
 * @param predicate returns `true` for the index of a buffer that should be dropped
 * @param stream source of buffers to be thinned out
 * @return [stream] without the buffers selected by [predicate]
 */
fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
        stream.index()
                .handle<ByteBuf> { indexed, sink ->
                    val buffer = indexed.t2
                    if (predicate(indexed.t1)) {
                        // Dropped: this function is the last owner, so release the reference.
                        buffer.release()
                    } else {
                        sink.next(buffer)
                    }
                }
176
/**
 * Produces the byte stream of one simulated client connection.
 *
 * Messages generated for [params] are serialized with `toByteArray()`, wrapped into
 * [WireFrameMessage]s, encoded with a [WireFrameEncoder] backed by [alloc] and finally re-chunked
 * by [simulateRemoteTcp] using batches of up to 1000 encoded frames.
 *
 * @param alloc allocator used both for encoding and for joining buffers
 * @param params description of the messages to generate
 * @return stream of buffers as they could arrive from a remote TCP peer
 */
private fun generateDataStream(alloc: ByteBufAllocator, params: VesEventParameters): Flux<ByteBuf> {
    val frameEncoder = WireFrameEncoder(alloc)
    val generatedEvents = generatorsFactory.createVesEventGenerator().createMessageFlux(params)
    return generatedEvents
            .map { event -> WireFrameMessage(event.toByteArray()) }
            .map { frame -> frameEncoder.encode(frame) }
            .transform { encoded -> simulateRemoteTcp(alloc, 1000, encoded) }
}
185
/**
 * Re-chunks [byteBuffers] so that buffer boundaries no longer match the original ones.
 *
 * Incoming buffers are collected into batches of at most [maxSize] elements (a batch is also
 * closed after 250 ms), each batch is joined into one composite buffer, and every composite is
 * then cut into frames by [randomlySplitTcpFrames]. Batches are processed in order.
 *
 * @param alloc allocator used to create the composite buffers
 * @param maxSize maximum number of input buffers joined into one composite
 * @param byteBuffers stream of buffers to re-chunk
 * @return stream carrying the same bytes with different buffer boundaries
 */
private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>): Flux<ByteBuf> {
    val batches = byteBuffers.bufferTimeout(maxSize, Duration.ofMillis(250))
    val composites = batches.map { batch -> joinBuffers(alloc, batch) }
    return composites.concatMap { composite -> randomlySplitTcpFrames(composite) }
}
191
/**
 * Joins the given buffers into a single composite buffer obtained from [alloc].
 *
 * The components are added with the `increaseWriterIndex` flag set to `true`.
 *
 * @param alloc allocator providing the composite buffer
 * @param it buffers to add as components, in order
 * @return the composite buffer holding all given buffers
 */
private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?): CompositeByteBuf {
    val composite = alloc.compositeBuffer()
    return composite.addComponents(true, it)
}
194
/**
 * Cuts [bb] into consecutive frames of one randomly chosen size.
 *
 * The frame size is picked once per call: a random fraction of the readable bytes, but never
 * less than 4. Every emitted frame is a retained slice of [bb]; the last frame may be shorter.
 * Once all frames are emitted, this function's own reference to [bb] is released and the flux
 * completes.
 *
 * @param bb composite buffer to split; its reader index is advanced to the end
 * @return flux of retained slices covering all readable bytes of [bb] in order
 */
private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
    val randomShare = (bb.readableBytes() * Math.random()).toInt()
    val targetFrameSize = maxOf(4, randomShare)
    return Flux.create<ByteBuf> { emitter ->
        while (bb.isReadable) {
            val frameStart = bb.readerIndex()
            val frameSize = minOf(targetFrameSize, bb.readableBytes())
            emitter.next(bb.retainedSlice(frameStart, frameSize))
            // Move past the bytes that have just been handed out.
            bb.readerIndex(frameStart + frameSize)
        }
        // The slices hold their own references; drop the one owned by this function.
        bb.release()
        emitter.complete()
    }
}
207