import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
internal object MessageValidator {
import arrow.core.Option
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventOuterClass.VesEvent
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
for (route in routing) {
val routeObj = route.asJsonObject()
defineRoute {
- fromDomain(forNumber(routeObj.getInt("fromDomain")))
+ fromDomain(routeObj.getString("fromDomain"))
toTopic(routeObj.getString("toTopic"))
withFixedPartitioning()
}
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.impl.MessageValidator
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
package org.onap.dcae.collectors.veshv.model
import arrow.core.Option
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
data class Routing(val routes: List<Route>) {
Option.fromNullable(routes.find { it.applies(commonHeader) })
}
-data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
+data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
class RouteBuilder {
- private lateinit var domain: Domain
+ private lateinit var domain: String
private lateinit var targetTopic: String
private lateinit var partitioning: (CommonEventHeader) -> Int
- fun fromDomain(domain: Domain) {
+ fun fromDomain(domain: String) {
this.domain = domain
}
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder
+
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.getDefaultInstance
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.newBuilder
internal object MessageValidatorTest : Spek({
assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
}
- Domain.values()
- .filter { (it != Domain.UNRECOGNIZED && it != Domain.DOMAIN_UNDEFINED) }
+ VesEventDomain.values()
.forEach { domain ->
it("should accept message with $domain domain") {
val header = commonHeader(domain)
}
}
-
- val domainTestCases = mapOf(
- Domain.DOMAIN_UNDEFINED to false,
- Domain.FAULT to true
- )
-
- domainTestCases.forEach { value, expectedResult ->
- on("ves hv message including header with domain $value") {
- val commonEventHeader = commonHeader(value)
- val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
-
- it("should resolve validation result") {
- assertThat(cut.isValid(vesMessage)).describedAs("message validation results")
- .isEqualTo(expectedResult)
- }
- }
- }
-
val priorityTestCases = mapOf(
- Priority.PRIORITY_UNDEFINED to false,
+ Priority.PRIORITY_NOT_PROVIDED to false,
Priority.HIGH to true
)
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
val config = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
+ fromDomain(HVMEAS.name)
toTopic("ves_rtpm")
withFixedPartitioning(2)
}
defineRoute {
- fromDomain(Domain.SYSLOG)
+ fromDomain(SYSLOG.name)
toTopic("ves_trace")
withFixedPartitioning()
}
val cut = Router(config)
on("message with existing route (rtpm)") {
- val message = VesMessage(commonHeader(Domain.HVRANMEAS), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(HVMEAS), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
}
on("message with existing route (trace)") {
- val message = VesMessage(commonHeader(Domain.SYSLOG), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
}
on("message with unknown route") {
- val message = VesMessage(commonHeader(Domain.HEARTBEAT), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should not have route available") {
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
val cut = VesDecoder()
on("ves hv message bytes") {
- val commonHeader = commonHeader(Domain.HEARTBEAT)
+ val commonHeader = commonHeader(HEARTBEAT)
val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
it("should decode only header and pass it on along with raw message") {
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
import reactor.core.publisher.Mono
import reactor.retry.Retry
import reactor.test.StepVerifier
StepVerifier.create(consulConfigProvider().take(1))
.consumeNextWith {
- assertEquals("kafka:9093", it.kafkaBootstrapServers)
+ assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
val route1 = it.routing.routes[0]
- assertEquals(Domain.FAULT, route1.domain)
+ assertEquals(FAULT.name, route1.domain)
assertEquals("test-topic-1", route1.targetTopic)
val route2 = it.routing.routes[1]
- assertEquals(Domain.HEARTBEAT, route2.domain)
+ assertEquals(HEARTBEAT.name, route2.domain)
assertEquals("test-topic-2", route2.targetTopic)
}.verifyComplete()
.verifyErrorMessage("Test exception")
}
- it("should update the health state"){
+ it("should update the health state") {
StepVerifier.create(healthStateProvider().take(iterationCount))
.expectNextCount(iterationCount - 1)
.expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
+const val kafkaAddress = "message-router-kafka"
+
fun constructConsulResponse(): String {
val config = """{
- "dmaap.kafkaBootstrapServers": "kafka:9093",
+ "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
"collector.routing": [
{
- "fromDomain": 1,
+ "fromDomain": "FAULT",
"toTopic": "test-topic-1"
},
{
- "fromDomain": 2,
+ "fromDomain": "HEARTBEAT",
"toTopic": "test-topic-2"
}
]
-}"""
+ }"""
val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
import reactor.core.publisher.Flux
import reactor.math.sum
import java.security.MessageDigest
val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
val params = MessageParameters(
- commonEventHeader = commonHeader(HVRANMEAS),
+ commonEventHeader = commonHeader(HVMEAS),
messageType = VALID,
amount = numMessages
)
val timeout = Duration.ofSeconds(30)
val params = MessageParameters(
- commonEventHeader = commonHeader(HVRANMEAS),
+ commonEventHeader = commonHeader(HVMEAS),
messageType = VALID,
amount = numMessages
)
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.HVMEAS_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
import reactor.core.publisher.Flux
import java.time.Duration
it("should handle multiple HV RAN events") {
val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.HVRANMEAS),
- vesWireFrameMessage(Domain.HVRANMEAS)
+ vesWireFrameMessage(HVMEAS),
+ vesWireFrameMessage(HVMEAS)
)
assertThat(messages)
it("should not handle messages received from client after end-of-transmission message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
- val anotherValidMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
+ val anotherValidMessage = vesWireFrameMessage(HVMEAS)
val endOfTransmissionMessage = endOfTransmissionWireMessage()
val handledEvents = sut.handleConnection(sink,
describe("Memory management") {
it("should release memory for each handled and dropped message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
- val msgWithInvalidDomain = vesWireFrameMessage(Domain.OTHER)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val msgWithInvalidFrame = invalidWireFrame()
- val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
+ val msgWithTooBigPayload = vesMessageWithTooBigPayload(HVMEAS)
val expectedRefCnt = 0
val handledEvents = sut.handleConnection(
- sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
+ sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
assertThat(handledEvents).hasSize(1)
assertThat(validMessage.refCnt())
.describedAs("handled message should be released")
.isEqualTo(expectedRefCnt)
- assertThat(msgWithInvalidDomain.refCnt())
- .describedAs("message with invalid domain should be released")
- .isEqualTo(expectedRefCnt)
assertThat(msgWithInvalidFrame.refCnt())
.describedAs("message with invalid frame should be released")
.isEqualTo(expectedRefCnt)
it("should release memory for end-of-transmission message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val endOfTransmissionMessage = endOfTransmissionWireMessage()
val expectedRefCnt = 0
it("should release memory for each message with invalid payload") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
val expectedRefCnt = 0
it("should release memory for each message with garbage frame") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val msgWithGarbageFrame = garbageFrame()
val expectedRefCnt = 0
it("should direct message to a topic by means of routing configuration") {
val (sut, sink) = vesHvWithStoringSink()
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
}
sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
val messages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.HVRANMEAS),
- vesWireFrameMessage(Domain.HEARTBEAT),
- vesWireFrameMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
+ vesWireFrameMessage(HVMEAS),
+ vesWireFrameMessage(HEARTBEAT),
+ vesWireFrameMessage(MEASUREMENT))
assertThat(messages).describedAs("number of routed messages").hasSize(3)
assertThat(messages[0].topic).describedAs("first message topic")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(messages[1].topic).describedAs("second message topic")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(messages[2].topic).describedAs("last message topic")
.isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
it("should drop message if route was not found") {
val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.OTHER, "first"),
- vesWireFrameMessage(Domain.HVRANMEAS, "second"),
- vesWireFrameMessage(Domain.HEARTBEAT, "third"))
+ vesWireFrameMessage(OTHER, "first"),
+ vesWireFrameMessage(HVMEAS, "second"),
+ vesWireFrameMessage(HEARTBEAT, "third"))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
}
}
sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messages).isEmpty()
sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messagesAfterUpdate).hasSize(1)
val message = messagesAfterUpdate[0]
assertThat(message.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(message.partition).describedAs("routed message partition")
.isEqualTo(0)
}
it("should change domain routing") {
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messages).hasSize(1)
val firstMessage = messages[0]
assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(firstMessage.partition).describedAs("routed message partition")
.isEqualTo(0)
sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messagesAfterUpdate).hasSize(2)
val secondMessage = messagesAfterUpdate[1]
assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+ .isEqualTo(ALTERNATE_HVMEAS_TOPIC)
assertThat(secondMessage.partition).describedAs("routed message partition")
.isEqualTo(0)
}
sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
}
}.doOnNext {
- sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
}.then().block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
assertThat(messages.size).isEqualTo(messagesAmount)
assertThat(messagesForEachTopic)
println("config changed")
}
}
- .map { vesWireFrameMessage(Domain.HVRANMEAS) }
+ .map { vesWireFrameMessage(HVMEAS) }
sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
assertThat(messages.size).isEqualTo(messageStreamSize)
assertThat(firstTopicMessagesCount)
val (sut, sink) = vesHvWithStoringSink()
val handledMessages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.HVRANMEAS, "first"),
- vesMessageWithTooBigPayload(Domain.HVRANMEAS),
- vesWireFrameMessage(Domain.HVRANMEAS))
+ vesWireFrameMessage(HVMEAS, "first"),
+ vesMessageWithTooBigPayload(HVMEAS),
+ vesWireFrameMessage(HVMEAS))
assertThat(handledMessages).hasSize(1)
assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.retry.RetryExhaustedException
-const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
+const val HVMEAS_TOPIC = "ves_hvRanMeas"
const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
-const val ALTERNATE_HVRANMEAS_TOPIC = "ves_alternateHvRanMeas"
+const val ALTERNATE_HVMEAS_TOPIC = "ves_alternateHvRanMeas"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
- toTopic(HVRANMEAS_TOPIC)
+ fromDomain(HVMEAS.name)
+ toTopic(HVMEAS_TOPIC)
withFixedPartitioning()
}
}.build()
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
- toTopic(HVRANMEAS_TOPIC)
+ fromDomain(HVMEAS.name)
+ toTopic(HVMEAS_TOPIC)
withFixedPartitioning()
}
defineRoute {
- fromDomain(Domain.HEARTBEAT)
- toTopic(HVRANMEAS_TOPIC)
+ fromDomain(HEARTBEAT.name)
+ toTopic(HVMEAS_TOPIC)
withFixedPartitioning()
}
defineRoute {
- fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING)
+ fromDomain(MEASUREMENT.name)
toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
withFixedPartitioning()
}
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
- toTopic(ALTERNATE_HVRANMEAS_TOPIC)
+ fromDomain(HVMEAS.name)
+ toTopic(ALTERNATE_HVMEAS_TOPIC)
withFixedPartitioning()
}
}.build()
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventOuterClass
import java.io.InputStream
import javax.json.Json
parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
- private fun validateHeaders(actual: List<VesEventV5.VesEvent>, expected: List<VesEventV5.VesEvent>): Boolean {
+ private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>, expected: List<VesEventOuterClass.VesEvent>): Boolean {
val consumedHeaders = actual.map { it.commonEventHeader }
val generatedHeaders = expected.map { it.commonEventHeader }
return generatedHeaders == consumedHeaders
}
- private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventV5.VesEvent>> =
+ private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> =
messageGenerator.createMessageFlux(parameters)
.map(PayloadWireFrameMessage::payload)
.map(ByteData::unsafeAsArray)
- .map(VesEventV5.VesEvent::parseFrom)
+ .map(VesEventOuterClass.VesEvent::parseFrom)
.collectList()
.asIo()
private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
- consumedMessages.map(VesEventV5.VesEvent::parseFrom)
+ consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom)
}
import org.jetbrains.spek.api.dsl.it
import org.mockito.ArgumentMatchers.anySet
import org.mockito.Mockito
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import java.util.concurrent.ConcurrentLinkedQueue
/**
return VesEvent.newBuilder()
.setCommonEventHeader(CommonEventHeader.newBuilder()
.setEventId(eventId))
- .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
+ .setHvMeasFields(ByteString.copyFrom(payload.toByteArray()))
.build()
}
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
import arrow.core.Either
-import arrow.core.Left
-import arrow.core.None
import arrow.core.Right
-import arrow.core.Some
-import arrow.effects.IO
-import javax.json.stream.JsonParsingException
import com.google.protobuf.ByteString
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.never
-import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.whenever
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.fail
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.mockito.ArgumentMatchers.anyList
-import org.mockito.ArgumentMatchers.anySet
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.core.publisher.Flux
-import java.util.concurrent.ConcurrentLinkedQueue
-import javax.json.Json
-import javax.json.JsonArray
-import javax.json.JsonValue
+import javax.json.stream.JsonParsingException
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
return VesEvent.newBuilder()
.setCommonEventHeader(CommonEventHeader.newBuilder()
.setEventId(eventId))
- .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
+ .setHvMeasFields(ByteString.copyFrom(payload.toByteArray()))
.build()
}
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
<inputDirectories>
- <include>${project.basedir}/src/main/proto</include>
+ <include>${project.basedir}/src/main/proto/event</include>
</inputDirectories>
<outputTargets>
<outputTarget>
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+enum class VesEventDomain {
+ FAULT,
+ HEARTBEAT,
+ MEASUREMENT,
+ MOBILE_FLOW,
+ OTHER,
+ PNFREGISTRATION,
+ SIP_SIGNALING,
+ STATE_CHANGE,
+ SYSLOG,
+ THRESHOLD_CROSSING_ALERT,
+ VOICE_QUALITY,
+ HVMEAS
+}
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventOuterClass
val headerRequiredFieldDescriptors = listOf(
"version",
- "eventName",
"domain",
- "eventId",
- "sourceName",
- "reportingEntityName",
+ "sequence",
"priority",
- "startEpochMicrosec",
+ "eventId",
+ "eventName",
"lastEpochMicrosec",
- "sequence")
- .map { fieldName -> VesEventV5.VesEvent.CommonEventHeader.getDescriptor().findFieldByName(fieldName) }
\ No newline at end of file
+ "startEpochMicrosec",
+ "reportingEntityName",
+ "sourceName",
+ "vesEventListenerVersion")
+ .map { fieldName -> VesEventOuterClass.CommonEventHeader.getDescriptor().findFieldByName(fieldName) }
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-syntax = "proto3";
-package org.onap.ves;
-
-// Definition for RTPM
-
-message HVRanMeasFields {
- message HVRanMeasPayload {
- message PMObject {
- message HVRanMeas {
- uint32 measurement_id = 1;
- repeated uint32 counter_subid = 2;
- repeated sint64 counter_value = 3;
- repeated uint32 missing_counter_subid = 4;
- bool suspectFlagIncomplete = 5; // (some is data missing due to internal error)
- bool suspectFlagOutOfSync = 6; // (source time not aligned)
- }
-
- string uri = 1; // monitored object URI
- repeated HVRanMeas hvRanMeas = 2; // performance counters grouped by measurement types
- }
- repeated PMObject pmObject = 1;
- }
-
- message AdditionalField {
- string name = 1;
- string value = 2;
- }
-
- string hvRanMeasFieldsVersion = 1; // version of HVRanMeasFields message
- uint32 period_ms = 2; // period configured for reporting the data in milliseconds
- string timezone = 3; // timezone of Network Function sending the data
- string pmDictionaryVsn = 4; // vendor name + schema version E.g. NOKIA_LN7.0, uniquely identify the relevant PM dictionary
- HVRanMeasPayload hvRanMeasPayload = 5; // objects being monitored
- repeated AdditionalField additionalFields = 6; // array of name-value pairs if needed
-}
\ No newline at end of file
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-syntax = "proto3";
-package org.onap.ves;
-
-message VesEvent {
-
- // VES CommonEventHeader adapted to GPB (Google Protocol Buffers)
- // Source: https://git.opnfv.org/ves/tree/tests/docs/ves_data_model.json
- // 2017-05-13 Align with VES 5.0 schema.
- // blob: ca948ff67e8a2de4e2a47cffc4d4d2893170ab76
-
- message CommonEventHeader {
- string version = 1; // required, "version of the event header"
- enum Domain {
- DOMAIN_UNDEFINED = 0;
- FAULT = 1;
- HEARTBEAT = 2;
- MEASUREMENTS_FOR_VF_SCALING = 3;
- MOBILE_FLOW = 4;
- SIP_SIGNALING = 5;
- STATE_CHANGE = 6;
- SYSLOG = 7;
- THRESHOLD_CROSSING_ALERT = 8;
- VOICE_QUALITY = 9;
- OTHER = 10;
- HVRANMEAS = 11;
- }
- Domain domain = 2; // required, "the eventing domain associated with the event" [map to string]
-
- uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed"
-
- enum Priority {
- PRIORITY_UNDEFINED = 0;
- HIGH = 1;
- MEDIUM = 2;
- NORMAL = 3;
- LOW = 4;
- }
- Priority priority = 4; // required, "processing priority"
-
- string eventId = 5; // required, "event key that is unique to the event source"
- string eventName = 6; // required, "unique event name"
- string eventType = 7; // "for example - applicationVnf, guestOS, hostOS, platform"
-
- uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
- uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
-
- string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards"
- string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards"
-
- string reportingEntityId = 12; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
- bytes reportingEntityName = 13; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName"
- bytes sourceId = 14; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
- string sourceName = 15; // required, "name of the entity experiencing the event issue"
-
- reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
- reserved 100;
- }
-
- CommonEventHeader commonEventHeader = 1;
-
- oneof eventFields // required, payload, each high-volume domain has its specific GPB schema
- {
- bytes hvRanMeasFields = 2; // if domain==HVRANMEAS, GPB schema: HVRanMeasFields.proto
- }
-}
-
-message VesEventList {
- repeated VesEvent vesEvent = 1;
-}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+syntax = "proto3";
+package org.onap.ves;
+
+message VesEvent {
+ CommonEventHeader commonEventHeader = 1; // required
+
+ oneof eventFields // required, payload
+ {
+ // each new high-volume domain can add an entry for its own GPB message
+ // the field can be opaque (bytes) to allow decoding the payload in a separate step
+ bytes hvMeasFields = 2; // for domain==HVMEAS, GPB message: HVMeasFields
+ }
+}
+
+// VES CommonEventHeader adapted to GPB (Google Protocol Buffers)
+// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain
+
+message CommonEventHeader {
+ string version = 1; // required, "version of the gpb common event header"
+ string domain = 2; // required, "the eventing domain associated with the event", allowed values:
+ // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING,
+ // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS
+
+ uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed"
+
+ enum Priority {
+ PRIORITY_NOT_PROVIDED = 0;
+ HIGH = 1;
+ MEDIUM = 2;
+ NORMAL = 3;
+ LOW = 4;
+ }
+ Priority priority = 4; // required, "processing priority"
+
+ string eventId = 5; // required, "event key that is unique to the event source"
+ string eventName = 6; // required, "unique event name"
+ string eventType = 7; // "for example - guest05, platform"
+
+ uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+ uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+
+ string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards"
+ string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards"
+ string nfVendorName = 12; // " Vendor Name providing the nf "
+
+ bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
+ string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry"
+ bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
+ string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry"
+ string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device"
+ string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener"
+
+ reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
+ reserved 100;
+}
--- /dev/null
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * dcaegen2-collectors-veshv\r
+ * ================================================================================\r
+ * Copyright (C) 2018 NOKIA\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+syntax = "proto3";\r
+package org.onap.ves;\r
+import "MeasDataCollection.proto"; // for 3GPP PM format\r
+\r
+message HVMeasFields\r
+{\r
+ string hvMeasFieldsVersion = 1;\r
+ measDataCollection.MeasDataCollection measDataCollection = 2;\r
+ // From 3GPP TS 28.550\r
+ // Informative: mapping between similar header fields (format may be different)\r
+ // 3GPP MeasStreamHeader ONAP/VES CommonEventHeader\r
+ // senderName sourceName\r
+ // senderType nfNamingCode + nfcNamingCode\r
+ // vendorName nfVendorName\r
+ // collectionBeginTime startEpochMicrosec\r
+ // timestamp lastEpochMicrosec\r
+ repeated HashMap eventAddlFlds = 3; // optional per-event data\r
+}\r
+\r
+message HashMap\r
+{\r
+ string name = 1;\r
+ string value = 2;\r
+}
\ No newline at end of file
--- /dev/null
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * dcaegen2-collectors-veshv\r
+ * ================================================================================\r
+ * Copyright (C) 2018 NOKIA\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+syntax = "proto3";\r
+package measDataCollection;\r
+\r
+// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V1.2.2 (2018-08).\r
+// Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file.\r
+// Note (2018-08): work is in progress for 3GPP TS 28.550 to specify PM streaming format. Changes will be made, if needed, to align with final version.\r
+// Differences/additions to 3GPP TS 28.550 are marked with "%%".\r
+\r
+message MeasDataCollection // top-level message \r
+{\r
+ MeasHeader measHeader = 1;\r
+ repeated MeasData measData = 2; // %%: use a single instance for RTPM\r
+ MeasFooter measFooter = 3;\r
+}\r
+\r
+message MeasHeader\r
+{\r
+ string streamFormatVersion = 1;\r
+ string senderName = 2;\r
+ string senderType = 3;\r
+ string vendorName = 4;\r
+ string collectionBeginTime = 5; // in ASN.1 GeneralizedTime format (subset of ISO 8601 basic format)\r
+}\r
+\r
+message MeasData\r
+{\r
+ string measuredEntityId = 1; // DN as per 3GPP TS 32.300\r
+ string measuredEntityUserName = 2; // network function User Name\r
+ string measuredEntitySoftwareVersion = 3;\r
+ uint32 granularityPeriod = 4; // in seconds, %% moved from MeasInfo (single reporting period per event)\r
+ repeated string measObjInstIdList = 5; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432\r
+ repeated MeasInfo measInfo = 6; \r
+}\r
+\r
+\r
+message MeasInfo\r
+{\r
+ oneof MeasInfoId { // measurement group identifier\r
+ uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact)\r
+ string measInfoId = 2; // identifier as string (more generic)\r
+ }\r
+\r
+ oneof MeasTypes { // measurement identifiers associated with the measurement results\r
+ IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact)\r
+ SMeasTypes measTypes = 4; // identifiers as strings (more generic)\r
+ }\r
+ // Needed only because GPB does not support repeated fields directly inside 'oneof'\r
+ message IMeasTypes { repeated uint32 iMeasType = 1; }\r
+ message SMeasTypes { repeated string measType = 1; }\r
+\r
+ string jobIdList = 5;\r
+ repeated MeasValue measValues = 6; // performance measurements grouped by measurement groups\r
+}\r
+\r
+message MeasValue\r
+{\r
+ oneof MeasObjInstId { // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432\r
+ string measObjInstId = 1; // LDN itself\r
+ uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList\r
+ }\r
+ repeated MeasResult measResults = 3;\r
+ bool suspectFlag = 4;\r
+ repeated nameValue measObjAddlFlds = 5; // %%: optional per-object data\r
+}\r
+\r
+message MeasResult\r
+{\r
+ uint32 p = 1; // Optional index in the MeasTypes array\r
+ oneof xValue {\r
+ sint64 iValue = 2;\r
+ double rValue = 3;\r
+ bool isNull = 4;\r
+ }\r
+}\r
+\r
+message MeasFooter\r
+{\r
+ string timestamp = 1; // in ASN.1 GeneralizedTime format, a better name would be "collectionEndTime"\r
+}\r
+\r
+message nameValue // %%: vendor-defined name-value pair\r
+{\r
+ string name = 1;\r
+ string value = 2;\r
+}
\ No newline at end of file
import arrow.core.Either
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
-import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.ObjectAssert
import org.jetbrains.spek.api.Spek
import io.netty.buffer.PooledByteBufAllocator
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+
import java.util.UUID.randomUUID
writeByte(0x01) // content type = GPB
}
-fun vesWireFrameMessage(domain: Domain = Domain.OTHER,
+fun vesWireFrameMessage(domain: VesEventDomain = OTHER,
id: String = randomUUID().toString()): ByteBuf =
allocator.buffer().run {
writeValidWireFrameHeaders()
writeByte(0x01) // content type = GPB
}
-fun vesMessageWithTooBigPayload(domain: Domain = Domain.DOMAIN_UNDEFINED): ByteBuf =
+fun vesMessageWithTooBigPayload(domain: VesEventDomain = HVMEAS): ByteBuf =
allocator.buffer().run {
writeValidWireFrameHeaders()
import com.google.protobuf.ByteString
import com.google.protobuf.MessageLite
import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.ves.VesEventV5
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.ves.VesEventOuterClass
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
import java.util.UUID.randomUUID
-fun vesEvent(domain: VesEventV5.VesEvent.CommonEventHeader.Domain = VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS,
+fun vesEvent(domain: VesEventDomain = HVMEAS,
id: String = randomUUID().toString(),
hvRanMeasFields: ByteString = ByteString.EMPTY
-): VesEventV5.VesEvent = vesEvent(commonHeader(domain, id), hvRanMeasFields)
+): VesEventOuterClass.VesEvent = vesEvent(commonHeader(domain, id), hvRanMeasFields)
-fun vesEvent(commonEventHeader: VesEventV5.VesEvent.CommonEventHeader,
- hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventV5.VesEvent =
- VesEventV5.VesEvent.newBuilder()
+fun vesEvent(commonEventHeader: CommonEventHeader,
+ hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent =
+ VesEventOuterClass.VesEvent.newBuilder()
.setCommonEventHeader(commonEventHeader)
- .setHvRanMeasFields(hvRanMeasFields)
+ .setHvMeasFields(hvRanMeasFields)
.build()
-fun commonHeader(domain: VesEventV5.VesEvent.CommonEventHeader.Domain = VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS,
+fun commonHeader(domain: VesEventDomain = HVMEAS,
id: String = randomUUID().toString(),
- priority: VesEventV5.VesEvent.CommonEventHeader.Priority = VesEventV5.VesEvent.CommonEventHeader.Priority.NORMAL): VesEventV5.VesEvent.CommonEventHeader =
- VesEventV5.VesEvent.CommonEventHeader.newBuilder()
+ priority: Priority = Priority.NORMAL): CommonEventHeader =
+ CommonEventHeader.newBuilder()
.setVersion("sample-version")
- .setDomain(domain)
+ .setDomain(domain.name)
.setSequence(1)
.setPriority(priority)
.setEventId(id)
.setLastEpochMicrosec(120034455)
.setNfNamingCode("sample-nf-naming-code")
.setNfcNamingCode("sample-nfc-naming-code")
- .setReportingEntityId("sample-reporting-entity-id")
- .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
+ .setNfVendorName("vendor-name")
+ .setReportingEntityId(ByteString.copyFromUtf8("sample-reporting-entity-id"))
+ .setReportingEntityName("sample-reporting-entity-name")
.setSourceId(ByteString.copyFromUtf8("sample-source-id"))
.setSourceName("sample-source-name")
+ .setTimeZoneOffset("+1")
+ .setVesEventListenerVersion("another-version")
.build()
-fun vesEventBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader, byteString: ByteString = ByteString.EMPTY): ByteData =
+fun vesEventBytes(commonHeader: CommonEventHeader, byteString: ByteString = ByteString.EMPTY): ByteData =
vesEvent(commonHeader, byteString).toByteData()
fun MessageLite.toByteData(): ByteData = ByteData(toByteArray())
\ No newline at end of file
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.api
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
import arrow.core.Option
import com.google.protobuf.util.JsonFormat
import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
-import org.onap.ves.VesEventV5.VesEvent.*
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import javax.json.JsonObject
/**
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventOuterClass.VesEvent
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.nio.charset.Charset
PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
}
- private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: HVRanMeasPayload): ByteArray {
- return vesEvent(commonEventHeader, hvRanMeasPayload.toByteString())
- }
-
private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: ByteString): ByteArray {
return createVesEvent(commonEventHeader, hvRanMeasPayload).toByteArray()
}
private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
VesEvent.newBuilder()
.setCommonEventHeader(commonEventHeader)
- .setHvRanMeasFields(payload)
+ .setHvMeasFields(payload)
.build()
private fun oversizedPayload() =
package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import com.google.protobuf.ByteString
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject.HVRanMeas
import java.util.*
+import kotlin.streams.asSequence
internal class PayloadGenerator {
fun generateRawPayload(size: Int): ByteString =
ByteString.copyFrom(ByteArray(size))
- fun generatePayload(numOfCountPerMeas: Long = 2, numOfMeasPerObject: Int = 2): HVRanMeasPayload {
- val pmObject = generatePmObject(numOfCountPerMeas, numOfMeasPerObject)
- return HVRanMeasPayload.newBuilder()
- .addPmObject(pmObject)
- .build()
- }
+ fun generatePayload(numOfCountMeasurements: Long = 2): ByteString =
+ ByteString.copyFrom(
+ randomGenerator.ints(numOfCountMeasurements, 0, 256)
+ .asSequence()
+ .toString()
+ .toByteArray()
+ )
- private fun generatePmObject(numOfCountPerMeas: Long, numOfMeasPerObject: Int): PMObject {
- val hvRanMeasList = MutableList(numOfMeasPerObject) { generateHvRanMeas(numOfCountPerMeas) }
- val finalUriName = URI_BASE_NAME + randomGenerator.nextInt(UPPER_URI_NUMBER_BOUND)
- return HVRanMeasPayload.PMObject.newBuilder()
- .setUri(finalUriName)
- .addAllHvRanMeas(hvRanMeasList.asIterable())
- .build()
- }
-
- private fun generateHvRanMeas(numOfCountPerMeas: Long): HVRanMeas {
- return HVRanMeasPayload.PMObject.HVRanMeas.newBuilder()
- .setMeasurementId(randomGenerator.nextInt())
- .addAllCounterSubid(Iterable { randomGenerator.ints(numOfCountPerMeas).iterator() })
- .addAllCounterValue(Iterable { randomGenerator.longs(numOfCountPerMeas).iterator() })
- .setSuspectFlagIncomplete(false)
- .setSuspectFlagOutOfSync(false)
- .build()
- }
-
- companion object {
- private const val URI_BASE_NAME = "sample/uri"
- private const val UPPER_URI_NUMBER_BOUND = 10_000
- }
}
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.STATE_CHANGE
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import java.io.ByteArrayInputStream
import javax.json.Json
import kotlin.test.fail
given("valid header in JSON format") {
val commonEventHeader = commonHeader(
- domain = VesEventV5.VesEvent.CommonEventHeader.Domain.STATE_CHANGE,
+ domain = STATE_CHANGE,
id = "sample-event-id")
val json = JsonFormat.printer().print(commonEventHeader).byteInputStream()
}
})
-fun assertFailed(result: Option<VesEventV5.VesEvent.CommonEventHeader>) =
+fun assertFailed(result: Option<CommonEventHeader>) =
result.fold({}, { fail() })
fun jsonObject(json: ByteArrayInputStream) = Json.createReader(json).readObject()
\ No newline at end of file
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.FAULT
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HEARTBEAT
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.test.test
/**
val limit = 1000L
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVRANMEAS),
+ commonHeader(HVMEAS),
MessageType.VALID
)))
.take(limit)
it("should create message flux of specified size") {
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVRANMEAS),
+ commonHeader(HVMEAS),
MessageType.VALID,
5
)))
.assertNext {
assertThat(it.isValid()).isTrue()
assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
}
.verifyComplete()
}
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVRANMEAS),
+ commonHeader(HVMEAS),
MessageType.TOO_BIG_PAYLOAD,
1
)))
.assertNext {
assertThat(it.isValid()).isTrue()
assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
}
.verifyComplete()
}
it("should create flux of messages with invalid payload") {
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVRANMEAS),
+ commonHeader(HVMEAS),
MessageType.INVALID_GPB_DATA,
1
)))
it("should create flux of messages with invalid version") {
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVRANMEAS),
+ commonHeader(HVMEAS),
MessageType.INVALID_WIRE_FRAME,
1
)))
.assertNext {
assertThat(it.isValid()).isFalse()
assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
assertThat(it.versionMajor).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION_MINOR)
}
.verifyComplete()
assertThat(it.isValid()).isTrue()
assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
}
.verifyComplete()
}
it("should create concatenated flux of messages") {
val singleFluxSize = 5L
val messageParameters = listOf(
- MessageParameters(commonHeader(HVRANMEAS), MessageType.VALID, singleFluxSize),
+ MessageParameters(commonHeader(HVMEAS), MessageType.VALID, singleFluxSize),
MessageParameters(commonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
MessageParameters(commonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize)
)
.test()
.assertNext {
assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.name)
}
.expectNextCount(singleFluxSize - 1)
.verifyComplete()
}
})
-fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
- return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
-}
+fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader =
+ VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+
+
+fun extractHvRanMeasFields(bytes: ByteData): ByteString =
+ VesEvent.parseFrom(bytes.unsafeAsArray()).hvMeasFields
-fun extractHvRanMeasFields(bytes: ByteData): ByteString {
- return VesEvent.parseFrom(bytes.unsafeAsArray()).hvRanMeasFields
-}
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
-private const val DEFAULT_MEASUREMENTS_NUMBER = 2
-private const val DEFAULT_COUNTERS_NUMBER = 2
-
-private val uriRegex = """sample/uri(\d+)""".toRegex()
-
object PayloadGeneratorTest : Spek({
given("payload factory object") {
val payloadGenerator = PayloadGenerator()
- on("two generated payloads") {
- val generatedPayload0 = payloadGenerator.generatePayload()
- val generatedPayload1 = payloadGenerator.generatePayload()
- it("URIs should have different names") {
- val matchResult0 = uriRegex.find(generatedPayload0.getPmObject(0).uri)!!.value
- val matchResult1 = uriRegex.find(generatedPayload1.getPmObject(0).uri)!!.value
- assertThat(matchResult0 != matchResult1).isTrue()
- }
- }
+ on("raw payload generation") {
+ val size = 100
+ val generatedPayload = payloadGenerator.generateRawPayload(size)
- on("call with default parameters") {
- val generatedPayload = payloadGenerator.generatePayload()
- it("should contain default numbers of measurements") {
- assertThat(generatedPayload.getPmObject(0).hvRanMeasCount).isEqualTo(DEFAULT_MEASUREMENTS_NUMBER)
- }
- it("should contain default numbers of counters in measurement") {
- assertThat(generatedPayload.getPmObject(0).getHvRanMeas(0).counterSubidCount).isEqualTo(DEFAULT_COUNTERS_NUMBER)
+ it("should generate sequence of zeros") {
+ assertThat(generatedPayload.size()).isEqualTo(size)
+ assertThat(generatedPayload.toByteArray()).isEqualTo(ByteArray(size))
}
}
- on("call with specified parameters") {
- val numOfCountPerMeas: Long = 5
- val numOfMeasPerObject = 10
- val generatedPayload = payloadGenerator.generatePayload(numOfCountPerMeas, numOfMeasPerObject)
- it("should contain specified number of measurements") {
- assertThat(generatedPayload.getPmObject(0).hvRanMeasCount).isEqualTo(numOfMeasPerObject)
- }
- it("measurement should contain specified number of counters") {
- assertThat(generatedPayload.getPmObject(0).hvRanMeasList
- .filter { numOfCountPerMeas.toInt() == it.counterSubidCount }
- .size)
- .isEqualTo(numOfMeasPerObject)
+ on("two generated payloads") {
+ val generatedPayload0 = payloadGenerator.generatePayload()
+ val generatedPayload1 = payloadGenerator.generatePayload()
+ it("should be different") {
+ assertThat(generatedPayload0 != generatedPayload1).isTrue()
}
-
}
}
})
import javax.json.Json
-private const val validMessageParameters = "[\n" +
- " {\n" +
- " \"commonEventHeader\": {\n" +
- " \"version\": \"sample-version\",\n" +
- " \"domain\": \"HVRANMEAS\",\n" +
- " \"sequence\": 1,\n" +
- " \"priority\": 1,\n" +
- " \"eventId\": \"sample-event-id\",\n" +
- " \"eventName\": \"sample-event-name\",\n" +
- " \"eventType\": \"sample-event-type\",\n" +
- " \"startEpochMicrosec\": 120034455,\n" +
- " \"lastEpochMicrosec\": 120034455,\n" +
- " \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
- " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
- " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
- " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
- " \"sourceId\": \"sample-source-id\",\n" +
- " \"sourceName\": \"sample-source-name\"\n" +
- " },\n" +
- " \"messageType\": \"VALID\",\n" +
- " \"messagesAmount\": 25000\n" +
- " },\n" +
- " {\n" +
- " \"commonEventHeader\": {\n" +
- " \"version\": \"sample-version\",\n" +
- " \"domain\": \"HVRANMEAS\",\n" +
- " \"sequence\": 1,\n" +
- " \"priority\": 1,\n" +
- " \"eventId\": \"sample-event-id\",\n" +
- " \"eventName\": \"sample-event-name\",\n" +
- " \"eventType\": \"sample-event-type\",\n" +
- " \"startEpochMicrosec\": 120034455,\n" +
- " \"lastEpochMicrosec\": 120034455,\n" +
- " \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
- " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
- " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
- " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
- " \"sourceId\": \"sample-source-id\",\n" +
- " \"sourceName\": \"sample-source-name\"\n" +
- " },\n" +
- " \"messageType\": \"TOO_BIG_PAYLOAD\",\n" +
- " \"messagesAmount\": 100\n" +
- " }\n" +
- "]"
+private const val validMessageParameters =
+"""[
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "HVMEAS",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messageType": "VALID",
+ "messagesAmount": 25000
+ },
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "HVMEAS",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messageType": "TOO_BIG_PAYLOAD",
+ "messagesAmount": 100
+ }
+ ]
+"""
-private const val invalidMessageParameters = "[\n" +
- " {\n" +
- " \"commonEventHeader\": {\n" +
- " \"version\": \"sample-version\",\n" +
- " \"domain\": \"HVRANMEAS\",\n" +
- " \"sequence\": 1,\n" +
- " \"priority\": 1,\n" +
- " \"eventId\": \"sample-event-id\",\n" +
- " \"eventName\": \"sample-event-name\",\n" +
- " \"eventType\": \"sample-event-type\",\n" +
- " \"startEpochMicrosec\": 120034455,\n" +
- " \"lastEpochMicrosec\": 120034455,\n" +
- " \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
- " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
- " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
- " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
- " \"sourceId\": \"sample-source-id\",\n" +
- " \"sourceName\": \"sample-source-name\"\n" +
- " },\n" +
- " \"messagesAmount\": 3\n" +
- " }\n" +
- "]"
+private const val invalidMessageParameters =
+"""
+ [
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "HVMEAS",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messagesAmount": 3
+ }
+ ]
+"""
fun validMessagesParametesJson() = Json
.createReader(validMessageParameters.reader())