val requiredFieldDescriptors = listOf(
"version",
"eventName",
- "domain",
+ // "domain", TODO to be restored back when GPB schema will include default value
"eventId",
"sourceName",
"reportingEntityName",
- "priority",
+ // "priority", TODO to be restored back when GPB schema will include default value
"startEpochMicrosec",
"lastEpochMicrosec",
"sequence")
.map { fieldName -> CommonEventHeader.getDescriptor().findFieldByName(fieldName)}
fun isValid(message: VesMessage): Boolean {
- val header = message.header
- val ret = allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS
- return ret
+ return allMandatoryFieldsArePresent(message.header)
}
private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
}
- it("should reject message with domain other than HVRANMEAS") {
- Domain.values()
- .filter { it != Domain.HVRANMEAS && it != Domain.UNRECOGNIZED }
- .forEach { domain ->
+ Domain.values()
+ .filter { it != Domain.UNRECOGNIZED }
+ .forEach {domain ->
+ it("should accept message with $domain domain"){
val header = newBuilder(commonHeader).setDomain(domain).build()
val vesMessage = VesMessage(header, vesMessageBytes(header))
assertThat(cut.isValid(vesMessage))
- .describedAs("message with $domain domain")
- .isFalse()
+ .isTrue()
}
- }
+ }
}
on("ves hv message bytes") {
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import java.time.Duration
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.Routing
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.tests.fakes.*
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
/**
assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
}
+ it("should be able to direct 2 messages from different domains to one topic") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+
+ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
+
+ val messages = sut.handleConnection(sink,
+ vesMessage(Domain.HVRANMEAS),
+ vesMessage(Domain.HEARTBEAT),
+ vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
+
+ assertThat(messages).describedAs("number of routed messages").hasSize(3)
+
+ assertThat(messages.get(0).topic).describedAs("first message topic")
+ .isEqualTo(HVRANMEAS_TOPIC)
+
+ assertThat(messages.get(1).topic).describedAs("second message topic")
+ .isEqualTo(HVRANMEAS_TOPIC)
+
+ assertThat(messages.get(2).topic).describedAs("last message topic")
+ .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ }
+
it("should drop message if route was not found") {
val sink = StoringSink()
val sut = Sut(sink)
assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
}
}
+
})
const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
+const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
kafkaBootstrapServers = "localhost:9969",
}.build()
)
+val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
+ kafkaBootstrapServers = "localhost:9969",
+ routing = routing {
+ defineRoute {
+ fromDomain(Domain.HVRANMEAS)
+ toTopic(HVRANMEAS_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(Domain.HEARTBEAT)
+ toTopic(HVRANMEAS_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING)
+ toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ withFixedPartitioning()
+ }
+ }.build()
+)
+
class FakeConfigurationProvider : ConfigurationProvider {
private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()