import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.Routing
-import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.tests.fakes.*
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import reactor.core.publisher.Flux
+import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
}
describe("Memory management") {
-
it("should release memory for each handled and dropped message") {
val sink = StoringSink()
val sut = Sut(sink)
assertThat(messages.get(2).topic).describedAs("last message topic")
.isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
}
-
+
it("should drop message if route was not found") {
val sink = StoringSink()
val sut = Sut(sink)
}
}
+ describe("configuration update") {
+
+ val defaultTimeout = Duration.ofSeconds(10)
+
+ it("should update collector on configuration change") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val firstCollector = sut.collector
+
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ val collectorAfterUpdate = sut.collector
+
+ assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+
+ }
+
+ it("should start routing messages on configuration change") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+
+ val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messages).isEmpty()
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messagesAfterUpdate).hasSize(1)
+ val message = messagesAfterUpdate[0]
+
+ assertThat(message.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(message.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
+
+ it("should change domain routing on configuration change") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messages).hasSize(1)
+ val firstMessage = messages[0]
+
+ assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+ .isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(firstMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+
+
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+
+ val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messagesAfterUpdate).hasSize(2)
+ val secondMessage = messagesAfterUpdate[1]
+
+ assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+ assertThat(secondMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
+
+ it("should update routing for each client sending one message") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messagesAmount = 10
+ val messagesForEachTopic = 5
+
+ Flux.range(0, messagesAmount).doOnNext {
+ if (it == messagesForEachTopic) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ }
+ }.doOnNext {
+ sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ }.then().block(defaultTimeout)
+
+
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+
+ assertThat(messages.size).isEqualTo(messagesAmount)
+ assertThat(messagesForEachTopic)
+ .describedAs("amount of messages routed to each topic")
+ .isEqualTo(firstTopicMessagesCount)
+ .isEqualTo(secondTopicMessagesCount)
+ }
+
+
+ it("should not update routing for client sending continuous stream of messages") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messageStreamSize = 10
+ val pivot = 5
+
+ val incomingMessages = Flux.range(0, messageStreamSize)
+ .doOnNext {
+ if (it == pivot) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ println("config changed")
+ }
+ }
+ .map { vesMessage(Domain.HVRANMEAS) }
+
+
+ sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+
+ assertThat(messages.size).isEqualTo(messageStreamSize)
+ assertThat(firstTopicMessagesCount)
+ .describedAs("amount of messages routed to first topic")
+ .isEqualTo(messageStreamSize)
+
+ assertThat(secondTopicMessagesCount)
+ .describedAs("amount of messages routed to second topic")
+ .isEqualTo(0)
+ }
+ }
+
describe("request validation") {
it("should reject message with payload greater than 1 MiB and all subsequent messages") {
val sink = StoringSink()