* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*/
package org.onap.dcae.collectors.veshv.tests.component
+import arrow.core.None
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
.hasSize(2)
}
+ it("should create sink lazily") {
+ val (sut, sink) = vesHvWithStoringSink()
+
+ // just connecting should not create sink
+ sut.handleConnection()
+ sut.close().unsafeRunSync()
+
+ // then
+ assertThat(sink.closed).isFalse()
+ }
+
it("should close sink when closing collector provider") {
- val (sut, _) = vesHvWithStoringSink()
+ val (sut, sink) = vesHvWithStoringSink()
+ // given Sink initialized
+ // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent
+ sut.handleConnection(vesWireFrameMessage(PERF3GPP))
- sut.close()
+ // when
+ sut.close().unsafeRunSync()
- assertThat(sut.sinkProvider.closed).isTrue()
+ // then
+ assertThat(sink.closed).isTrue()
}
}
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
- assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
+ assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None)
}
it("should be able to direct 2 messages from different domains to one topic") {
val (sut, sink) = vesHvWithStoringSink()
- sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting)
+ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
val messages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP),
assertThat(messages).describedAs("number of routed messages").hasSize(3)
- assertThat(messages[0].topic).describedAs("first message topic")
+ assertThat(messages[0].targetTopic).describedAs("first message topic")
.isEqualTo(PERF3GPP_TOPIC)
- assertThat(messages[1].topic).describedAs("second message topic")
+ assertThat(messages[1].targetTopic).describedAs("second message topic")
.isEqualTo(PERF3GPP_TOPIC)
- assertThat(messages[2].topic).describedAs("last message topic")
- .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ assertThat(messages[2].targetTopic).describedAs("last message topic")
+ .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
}
it("should drop message if route was not found") {
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+ assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
}
}
it("should update collector") {
val firstCollector = sut.collector
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
val collectorAfterUpdate = sut.collector
assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
it("should start routing messages") {
- sut.configurationProvider.updateConfiguration(configWithEmptyRouting)
+ sut.configurationProvider.updateConfiguration(emptyRouting)
val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).isEmpty()
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(1)
val message = messagesAfterUpdate[0]
- assertThat(message.topic).describedAs("routed message topic after configuration's change")
+ assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
.isEqualTo(PERF3GPP_TOPIC)
assertThat(message.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
}
it("should change domain routing") {
assertThat(messages).hasSize(1)
val firstMessage = messages[0]
- assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+ assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
.isEqualTo(PERF3GPP_TOPIC)
assertThat(firstMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(2)
val secondMessage = messagesAfterUpdate[1]
- assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+ assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
.isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
assertThat(secondMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
}
it("should update routing for each client sending one message") {
Flux.range(0, messagesAmount).doOnNext {
if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
}
}.doOnNext {
sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
assertThat(messages.size).isEqualTo(messagesAmount)
assertThat(messagesForEachTopic)
val incomingMessages = Flux.range(0, messageStreamSize)
.doOnNext {
if (it == pivot) {
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
println("config changed")
}
}
sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
assertThat(messages.size).isEqualTo(messageStreamSize)
assertThat(firstTopicMessagesCount)
given("failed configuration change") {
val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
it("should mark the application unhealthy ") {
assertThat(sut.healthStateProvider.currentHealth)
private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
val sink = StoringSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
return Pair(sut, sink)
}