+ describe("configuration update") {
+
+ val defaultTimeout = Duration.ofSeconds(10)
+
+ it("should update collector on configuration change") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val firstCollector = sut.collector
+
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ val collectorAfterUpdate = sut.collector
+
+ assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+
+ }
+
+ it("should start routing messages on configuration change") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+
+ val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messages).isEmpty()
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messagesAfterUpdate).hasSize(1)
+ val message = messagesAfterUpdate[0]
+
+ assertThat(message.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(message.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
+
+ it("should change domain routing on configuration change") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messages).hasSize(1)
+ val firstMessage = messages[0]
+
+ assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+ .isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(firstMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+
+
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+
+ val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ assertThat(messagesAfterUpdate).hasSize(2)
+ val secondMessage = messagesAfterUpdate[1]
+
+ assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+ assertThat(secondMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
+
+ it("should update routing for each client sending one message") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messagesAmount = 10
+ val messagesForEachTopic = 5
+
+ Flux.range(0, messagesAmount).doOnNext {
+ if (it == messagesForEachTopic) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ }
+ }.doOnNext {
+ sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ }.then().block(defaultTimeout)
+
+
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+
+ assertThat(messages.size).isEqualTo(messagesAmount)
+ assertThat(messagesForEachTopic)
+ .describedAs("amount of messages routed to each topic")
+ .isEqualTo(firstTopicMessagesCount)
+ .isEqualTo(secondTopicMessagesCount)
+ }
+
+
+ it("should not update routing for client sending continuous stream of messages") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messageStreamSize = 10
+ val pivot = 5
+
+ val incomingMessages = Flux.range(0, messageStreamSize)
+ .doOnNext {
+ if (it == pivot) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ println("config changed")
+ }
+ }
+ .map { vesMessage(Domain.HVRANMEAS) }
+
+
+ sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+
+ assertThat(messages.size).isEqualTo(messageStreamSize)
+ assertThat(firstTopicMessagesCount)
+ .describedAs("amount of messages routed to first topic")
+ .isEqualTo(messageStreamSize)
+
+ assertThat(secondTopicMessagesCount)
+ .describedAs("amount of messages routed to second topic")
+ .isEqualTo(0)
+ }
+ }
+