)
val fluxes = (1.rangeTo(runs)).map {
- sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
+ sut.collector.handleConnection(generateDataStream(sut.alloc, params))
}
val durationMs = measureTimeMillis {
Flux.merge(fluxes).then().block(timeout)
val durationSec = durationMs / 1000.0
val throughput = sink.count / durationSec
- logger.info("Processed $runs connections each containing $numMessages msgs.")
- logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
+ logger.info { "Processed $runs connections each containing $numMessages msgs." }
+ logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" }
assertThat(sink.count)
.describedAs("should send all events")
.isEqualTo(runs * numMessages)
val dataStream = generateDataStream(sut.alloc, params)
.transform(::dropWhenIndex.partially1 { it % 101 == 0L })
- sut.collector.handleConnection(sut.alloc, dataStream)
+ sut.collector.handleConnection(dataStream)
.timeout(timeout)
.block()
- logger.info("Forwarded ${sink.count} msgs")
+ logger.info { "Forwarded ${sink.count} msgs" }
assertThat(sink.count)
.describedAs("should send up to number of events")
.isLessThan(numMessages)