c68f05141087701ee5834e7dea79bb12f90bea06
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import arrow.syntax.function.partially1
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import io.netty.buffer.CompositeByteBuf
26 import io.netty.buffer.Unpooled
27 import io.netty.buffer.UnpooledByteBufAllocator
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
33 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
34 import org.onap.dcae.collectors.veshv.simulators.xnf.createMessageGenerator
35 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import reactor.core.publisher.Flux
38 import reactor.math.sum
39 import java.security.MessageDigest
40 import java.time.Duration
41 import java.util.*
42 import kotlin.system.measureTimeMillis
43
44
45 /**
46  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
47  * @since May 2018
48  */
49 object PerformanceSpecification : Spek({
50     describe("VES High Volume Collector performance") {
51         it("should handle multiple clients in reasonable time") {
52             val sink = CountingSink()
53             val sut = Sut(sink)
54             sut.configurationProvider.updateConfiguration(basicConfiguration)
55
56             val numMessages: Long = 300_000
57             val runs = 4
58             val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
59
60             val params = MessageParameters(
61                     commonEventHeader = vesEvent().commonEventHeader,
62                     amount = numMessages)
63             val fluxes = (1.rangeTo(runs)).map {
64                 sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
65             }
66             val durationMs = measureTimeMillis {
67                 Flux.merge(fluxes).then().block(timeout)
68             }
69
70             val durationSec = durationMs / 1000.0
71             val throughput = sink.count / durationSec
72             println("Processed $runs connections each containing $numMessages msgs.")
73             println("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
74             assertThat(sink.count)
75                     .describedAs("should send all events")
76                     .isEqualTo(runs * numMessages)
77         }
78
79         it("should disconnect on transmission errors") {
80             val sink = CountingSink()
81             val sut = Sut(sink)
82             sut.configurationProvider.updateConfiguration(basicConfiguration)
83
84             val numMessages: Long = 100_000
85             val timeout = Duration.ofSeconds(30)
86
87             val params = MessageParameters(
88                     commonEventHeader = vesEvent().commonEventHeader,
89                     amount = numMessages)
90
91             val dataStream = generateDataStream(sut.alloc, params)
92                     .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
93             sut.collector.handleConnection(sut.alloc, dataStream)
94                     .timeout(timeout)
95                     .block()
96
97             println("Forwarded ${sink.count} msgs")
98             assertThat(sink.count)
99                     .describedAs("should send up to number of events")
100                     .isLessThan(numMessages)
101         }
102     }
103
104     describe("test infrastructure") {
105         val digest = MessageDigest.getInstance("MD5")
106
107         fun collectDigest(bb: ByteBuf) {
108             bb.markReaderIndex()
109             while (bb.isReadable) {
110                 digest.update(bb.readByte())
111             }
112             bb.resetReaderIndex()
113         }
114
115         fun calculateDigest(arrays: List<ByteArray>): ByteArray {
116             for (array in arrays) {
117                 digest.update(array)
118             }
119             return digest.digest()
120         }
121
122         it("should yield same bytes as in the input") {
123             val numberOfBuffers = 10
124             val singleBufferSize = 1000
125             val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
126             val inputDigest = calculateDigest(arrays)
127
128             val actualTotalSize = Flux.fromIterable(arrays)
129                     .map { Unpooled.wrappedBuffer(it) }
130                     .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
131                     .doOnNext(::collectDigest)
132                     .map {
133                         val size = it.readableBytes()
134                         it.release()
135                         size
136                     }
137                     .sum()
138                     .map(Long::toInt)
139                     .block()
140
141             val outputDigest = digest.digest()
142
143             assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize)
144             assertThat(outputDigest).isEqualTo(inputDigest)
145
146         }
147     }
148 })
149
150
151 private const val ONE_MILION = 1_000_000.0
152
153 private val rand = Random()
154 private fun randomByteArray(size: Int): ByteArray {
155     val bytes = ByteArray(size)
156     rand.nextBytes(bytes)
157     return bytes
158 }
159
160 fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
161         stream.index()
162                 .filter { predicate(it.t1) }
163                 .map { it.t2 }
164
165 private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
166         WireFrameEncoder(alloc).let { encoder ->
167             createMessageGenerator()
168                     .createMessageFlux(params)
169                     .map(encoder::encode)
170                     .transform { simulateRemoteTcp(alloc, 1000, it) }
171         }
172
173 private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
174         byteBuffers
175                 .bufferTimeout(maxSize, Duration.ofMillis(250))
176                 .map { joinBuffers(alloc, it) }
177                 .concatMap { randomlySplitTcpFrames(it) }
178
179 private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
180         alloc.compositeBuffer().addComponents(true, it)
181
182 private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
183     val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
184     return Flux.create<ByteBuf> { sink ->
185         while (bb.isReadable) {
186             val frameSize = Math.min(targetFrameSize, bb.readableBytes())
187             sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
188             bb.readerIndex(bb.readerIndex() + frameSize)
189         }
190         bb.release()
191         sink.complete()
192     }
193 }