import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.Routing
-import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.dcae.collectors.veshv.tests.fakes.*
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import reactor.core.publisher.Flux
+import java.time.Duration
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
     }
 
     describe("Memory management") {
-
         it("should release memory for each handled and dropped message") {
             val sink = StoringSink()
             val sut = Sut(sink)
             assertThat(messages.get(2).topic).describedAs("last message topic")
                     .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
         }
-        
+
         it("should drop message if route was not found") {
             val sink = StoringSink()
             val sut = Sut(sink)
         }
     }
 
+    describe("configuration update") {
+
+        val defaultTimeout = Duration.ofSeconds(10)
+
+        it("should update collector on configuration change") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val firstCollector = sut.collector
+
+            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+            val collectorAfterUpdate = sut.collector
+
+            assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+
+        }
+
+        it("should start routing messages on configuration change") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+
+            sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+
+            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+            assertThat(messages).isEmpty()
+
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+            assertThat(messagesAfterUpdate).hasSize(1)
+            val message = messagesAfterUpdate[0]
+
+            assertThat(message.topic).describedAs("routed message topic after configuration's change")
+                    .isEqualTo(HVRANMEAS_TOPIC)
+            assertThat(message.partition).describedAs("routed message partition")
+                    .isEqualTo(0)
+        }
+
+        it("should change domain routing on configuration change") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+            assertThat(messages).hasSize(1)
+            val firstMessage = messages[0]
+
+            assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+                    .isEqualTo(HVRANMEAS_TOPIC)
+            assertThat(firstMessage.partition).describedAs("routed message partition")
+                    .isEqualTo(0)
+
+
+            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+
+            val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+            assertThat(messagesAfterUpdate).hasSize(2)
+            val secondMessage = messagesAfterUpdate[1]
+
+            assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+                    .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+            assertThat(secondMessage.partition).describedAs("routed message partition")
+                    .isEqualTo(0)
+        }
+
+        it("should update routing for each client sending one message") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            val messagesAmount = 10
+            val messagesForEachTopic = 5
+
+            Flux.range(0, messagesAmount).doOnNext {
+                if (it == messagesForEachTopic) {
+                    sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                }
+            }.doOnNext {
+                sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+            }.then().block(defaultTimeout)
+
+
+            val messages = sink.sentMessages
+            val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+            val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+
+            assertThat(messages.size).isEqualTo(messagesAmount)
+            assertThat(messagesForEachTopic)
+                    .describedAs("amount of messages routed to each topic")
+                    .isEqualTo(firstTopicMessagesCount)
+                    .isEqualTo(secondTopicMessagesCount)
+        }
+
+
+        it("should not update routing for client sending continuous stream of messages") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            val messageStreamSize = 10
+            val pivot = 5
+
+            val incomingMessages = Flux.range(0, messageStreamSize)
+                    .doOnNext {
+                        if (it == pivot) {
+                            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                            println("config changed")
+                        }
+                    }
+                    .map { vesMessage(Domain.HVRANMEAS) }
+
+
+            sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+
+            val messages = sink.sentMessages
+            val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+            val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+
+            assertThat(messages.size).isEqualTo(messageStreamSize)
+            assertThat(firstTopicMessagesCount)
+                    .describedAs("amount of messages routed to first topic")
+                    .isEqualTo(messageStreamSize)
+
+            assertThat(secondTopicMessagesCount)
+                    .describedAs("amount of messages routed to second topic")
+                    .isEqualTo(0)
+        }
+    }
+
     describe("request validation") {
         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
             val sink = StoringSink()