00739fa496178dd80677b55de19f4b7d03175bfc
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import io.netty.buffer.CompositeByteBuf
26 import io.netty.buffer.Unpooled
27 import io.netty.buffer.UnpooledByteBufAllocator
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
33 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
34 import org.onap.dcae.collectors.veshv.simulators.xnf.createMessageGenerator
35 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import reactor.core.publisher.Flux
38 import reactor.math.sum
39 import java.security.MessageDigest
40 import java.time.Duration
41 import java.util.*
42 import kotlin.system.measureTimeMillis
43
44 /**
45  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
46  * @since May 2018
47  */
48 object PerformanceSpecification : Spek({
49     debugRx(false)
50
51     describe("VES High Volume Collector performance") {
52         it("should handle multiple clients in reasonable time") {
53             val sink = CountingSink()
54             val sut = Sut(sink)
55             sut.configurationProvider.updateConfiguration(basicConfiguration)
56
57             val numMessages: Long = 300_000
58             val runs = 4
59             val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
60
61             val params = MessageParameters(
62                     commonEventHeader = vesEvent().commonEventHeader,
63                     amount = numMessages)
64             val fluxes = (1.rangeTo(runs)).map {
65                 sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
66             }
67             val durationMs = measureTimeMillis {
68                 Flux.merge(fluxes).then().block(timeout)
69             }
70
71             val durationSec = durationMs / 1000.0
72             val throughput = sink.count / durationSec
73             logger.info("Processed $runs connections each containing $numMessages msgs.")
74             logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
75             assertThat(sink.count)
76                     .describedAs("should send all events")
77                     .isEqualTo(runs * numMessages)
78         }
79
80         it("should disconnect on transmission errors") {
81             val sink = CountingSink()
82             val sut = Sut(sink)
83             sut.configurationProvider.updateConfiguration(basicConfiguration)
84
85             val numMessages: Long = 100_000
86             val timeout = Duration.ofSeconds(30)
87
88             val params = MessageParameters(
89                     commonEventHeader = vesEvent().commonEventHeader,
90                     amount = numMessages)
91
92             val dataStream = generateDataStream(sut.alloc, params)
93                     .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
94             sut.collector.handleConnection(sut.alloc, dataStream)
95                     .timeout(timeout)
96                     .block()
97
98             logger.info("Forwarded ${sink.count} msgs")
99             assertThat(sink.count)
100                     .describedAs("should send up to number of events")
101                     .isLessThan(numMessages)
102         }
103     }
104
105     describe("test infrastructure") {
106         val digest = MessageDigest.getInstance("MD5")
107
108         fun collectDigest(bb: ByteBuf) {
109             bb.markReaderIndex()
110             while (bb.isReadable) {
111                 digest.update(bb.readByte())
112             }
113             bb.resetReaderIndex()
114         }
115
116         fun calculateDigest(arrays: List<ByteArray>): ByteArray {
117             for (array in arrays) {
118                 digest.update(array)
119             }
120             return digest.digest()
121         }
122
123         it("should yield same bytes as in the input") {
124             val numberOfBuffers = 10
125             val singleBufferSize = 1000
126             val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
127             val inputDigest = calculateDigest(arrays)
128
129             val actualTotalSize = Flux.fromIterable(arrays)
130                     .map { Unpooled.wrappedBuffer(it) }
131                     .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
132                     .doOnNext(::collectDigest)
133                     .map {
134                         val size = it.readableBytes()
135                         it.release()
136                         size
137                     }
138                     .sum()
139                     .map(Long::toInt)
140                     .block()
141
142             val outputDigest = digest.digest()
143
144             assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize)
145             assertThat(outputDigest).isEqualTo(inputDigest)
146
147         }
148     }
149 })
150
151
152 private const val ONE_MILION = 1_000_000.0
153
154 private val rand = Random()
155 private fun randomByteArray(size: Int): ByteArray {
156     val bytes = ByteArray(size)
157     rand.nextBytes(bytes)
158     return bytes
159 }
160
161 fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
162         stream.index()
163                 .filter { predicate(it.t1) }
164                 .map { it.t2 }
165
166 private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
167         WireFrameEncoder(alloc).let { encoder ->
168             createMessageGenerator()
169                     .createMessageFlux(params)
170                     .map(encoder::encode)
171                     .transform { simulateRemoteTcp(alloc, 1000, it) }
172         }
173
174 private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
175         byteBuffers
176                 .bufferTimeout(maxSize, Duration.ofMillis(250))
177                 .map { joinBuffers(alloc, it) }
178                 .concatMap { randomlySplitTcpFrames(it) }
179
180 private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
181         alloc.compositeBuffer().addComponents(true, it)
182
183 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
184     val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
185     return Flux.create<ByteBuf> { sink ->
186         while (bb.isReadable) {
187             val frameSize = Math.min(targetFrameSize, bb.readableBytes())
188             sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
189             bb.readerIndex(bb.readerIndex() + frameSize)
190         }
191         bb.release()
192         sink.complete()
193     }
194 }