Custom detekt rule for logger usage check
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / component / PerformanceSpecification.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import io.netty.buffer.CompositeByteBuf
26 import io.netty.buffer.Unpooled
27 import io.netty.buffer.UnpooledByteBufAllocator
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
33 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
34 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
35 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
36 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
37 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
38 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
39 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
40 import reactor.core.publisher.Flux
41 import reactor.math.sum
42 import java.security.MessageDigest
43 import java.time.Duration
44 import java.util.*
45 import kotlin.system.measureTimeMillis
46
47 /**
48  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
49  * @since May 2018
50  */
51 object PerformanceSpecification : Spek({
52     debugRx(false)
53
54     describe("VES High Volume Collector performance") {
55         it("should handle multiple clients in reasonable time") {
56             val sink = CountingSink()
57             val sut = Sut(sink)
58             sut.configurationProvider.updateConfiguration(basicConfiguration)
59
60             val numMessages: Long = 300_000
61             val runs = 4
62             val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
63
64             val params = MessageParameters(
65                     commonEventHeader = commonHeader(PERF3GPP),
66                     messageType = VALID,
67                     amount = numMessages
68             )
69
70             val fluxes = (1.rangeTo(runs)).map {
71                 sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
72             }
73             val durationMs = measureTimeMillis {
74                 Flux.merge(fluxes).then().block(timeout)
75             }
76
77             val durationSec = durationMs / 1000.0
78             val throughput = sink.count / durationSec
79             logger.info("Processed $runs connections each containing $numMessages msgs.")
80             logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
81             assertThat(sink.count)
82                     .describedAs("should send all events")
83                     .isEqualTo(runs * numMessages)
84         }
85
86         it("should disconnect on transmission errors") {
87             val sink = CountingSink()
88             val sut = Sut(sink)
89             sut.configurationProvider.updateConfiguration(basicConfiguration)
90
91             val numMessages: Long = 100_000
92             val timeout = Duration.ofSeconds(30)
93
94             val params = MessageParameters(
95                     commonEventHeader = commonHeader(PERF3GPP),
96                     messageType = VALID,
97                     amount = numMessages
98             )
99
100             val dataStream = generateDataStream(sut.alloc, params)
101                     .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
102             sut.collector.handleConnection(sut.alloc, dataStream)
103                     .timeout(timeout)
104                     .block()
105
106             logger.info("Forwarded ${sink.count} msgs")
107             assertThat(sink.count)
108                     .describedAs("should send up to number of events")
109                     .isLessThan(numMessages)
110         }
111     }
112
113     describe("test infrastructure") {
114         val digest = MessageDigest.getInstance("MD5")
115
116         fun collectDigest(bb: ByteBuf) {
117             bb.markReaderIndex()
118             while (bb.isReadable) {
119                 digest.update(bb.readByte())
120             }
121             bb.resetReaderIndex()
122         }
123
124         fun calculateDigest(arrays: List<ByteArray>): ByteArray {
125             for (array in arrays) {
126                 digest.update(array)
127             }
128             return digest.digest()
129         }
130
131         it("should yield same bytes as in the input") {
132             val numberOfBuffers = 10
133             val singleBufferSize = 1000
134             val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
135             val inputDigest = calculateDigest(arrays)
136
137             val actualTotalSize = Flux.fromIterable(arrays)
138                     .map { Unpooled.wrappedBuffer(it) }
139                     .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
140                     .doOnNext(::collectDigest)
141                     .map {
142                         val size = it.readableBytes()
143                         it.release()
144                         size
145                     }
146                     .sum()
147                     .map(Long::toInt)
148                     .block()
149
150             val outputDigest = digest.digest()
151
152             assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
153             assertThat(outputDigest).isEqualTo(inputDigest)
154
155         }
156     }
157 })
158
159
160 private const val ONE_MILION = 1_000_000.0
161
162 private val rand = Random()
163 private fun randomByteArray(size: Int): ByteArray {
164     val bytes = ByteArray(size)
165     rand.nextBytes(bytes)
166     return bytes
167 }
168
169 fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
170         stream.index()
171                 .filter { predicate(it.t1) }
172                 .map { it.t2 }
173
174 private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
175         WireFrameEncoder(alloc).let { encoder ->
176             MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
177                     .createMessageFlux(listOf(params))
178                     .map(encoder::encode)
179                     .transform { simulateRemoteTcp(alloc, 1000, it) }
180         }
181
182 private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
183         byteBuffers
184                 .bufferTimeout(maxSize, Duration.ofMillis(250))
185                 .map { joinBuffers(alloc, it) }
186                 .concatMap { randomlySplitTcpFrames(it) }
187
188 private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
189         alloc.compositeBuffer().addComponents(true, it)
190
191 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
192     val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
193     return Flux.create<ByteBuf> { sink ->
194         while (bb.isReadable) {
195             val frameSize = Math.min(targetFrameSize, bb.readableBytes())
196             sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
197             bb.readerIndex(bb.readerIndex() + frameSize)
198         }
199         bb.release()
200         sink.complete()
201     }
202 }
203