* It uses a PROTO representation of the VES Common Header
* The PROTO files tend to use most encoding effective types defined by GPB to cover Common Header fields.
* It makes routing decisions based mostly on the content of the "Domain" parameter
- * It allows to embed Payload of different types (by default hvMeas domain is included)
+ * It allows to embed Payload of different types (by default PERF3GPP domain is included)
* VES-HV publishes events on DMaaP-Kafka bus, using native Kafka Interfaces
* Analytics applications impacts
* An analytics application operating on high-volume data needs to be prepared to read directly from Kafka
VES-HV was designed to allow for extendability - by adding new domain-specific PROTO files.
-The PROTO file, which contains the VES CommonHeader, comes with a binary-type Payload parameter, where domain-specific data shall be placed. Domain-specific data are encoded as well with GPB, and they do require a domain-specific PROTO file to decode the data. This domain-specific PROTO needs to be shared with analytics applications - VES-HV is not analyzing domain-specific data. In order to support the RT-PM use-case, VES-HV includes a "hvMeas" domain PROTO file, as within this domain, the high volume data is expected to be reported to VES-HV collector.
+The PROTO file, which contains the VES CommonHeader, comes with a binary-type Payload parameter, where domain-specific data shall be placed. Domain-specific data are encoded as well with GPB, and they do require a domain-specific PROTO file to decode the data. This domain-specific PROTO needs to be shared with analytics applications - VES-HV is not analyzing domain-specific data. In order to support the RT-PM use-case, VES-HV includes a "PERF3GPP" domain PROTO file, as within this domain, the high volume data is expected to be reported to VES-HV collector.
Still, there are no limitations to define additional domains, based on existing VES domains (like Fault, Heartbeat) or completely new domains. New domains can be added "when needed".
In case of new domains, it is necessary to extend the Common Header PROTO "Domain" enumeration with new values covering this new domain(s).
- "6063:6063/tcp"
command: ["--listen-port", "6063",
"--kafka-bootstrap-servers", "kafka:9092",
- "--kafka-topics", "ves_hvRanMeas"]
+ "--kafka-topics", "HV_VES_PERF3GPP"]
depends_on:
- kafka
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
import org.onap.dcae.collectors.veshv.model.RoutedMessage
val config = routing {
defineRoute {
- fromDomain(HVMEAS.name)
+ fromDomain(PERF3GPP.name)
toTopic("ves_rtpm")
withFixedPartitioning(2)
}
val cut = Router(config)
on("message with existing route (rtpm)") {
- val message = VesMessage(commonHeader(HVMEAS), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
val params = MessageParameters(
- commonEventHeader = commonHeader(HVMEAS),
+ commonEventHeader = commonHeader(PERF3GPP),
messageType = VALID,
amount = numMessages
)
val timeout = Duration.ofSeconds(30)
val params = MessageParameters(
- commonEventHeader = commonHeader(HVMEAS),
+ commonEventHeader = commonHeader(PERF3GPP),
messageType = VALID,
amount = numMessages
)
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVMEAS_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.HVMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
it("should handle multiple HV RAN events") {
val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
- vesWireFrameMessage(HVMEAS),
- vesWireFrameMessage(HVMEAS)
+ vesWireFrameMessage(PERF3GPP),
+ vesWireFrameMessage(PERF3GPP)
)
assertThat(messages)
describe("Memory management") {
it("should release memory for each handled and dropped message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(HVMEAS)
+ val validMessage = vesWireFrameMessage(PERF3GPP)
val msgWithInvalidFrame = invalidWireFrame()
- val msgWithTooBigPayload = vesMessageWithTooBigPayload(HVMEAS)
+ val msgWithTooBigPayload = vesMessageWithTooBigPayload(PERF3GPP)
val expectedRefCnt = 0
val handledEvents = sut.handleConnection(
it("should release memory for each message with invalid payload") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(HVMEAS)
+ val validMessage = vesWireFrameMessage(PERF3GPP)
val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
val expectedRefCnt = 0
it("should release memory for each message with garbage frame") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(HVMEAS)
+ val validMessage = vesWireFrameMessage(PERF3GPP)
val msgWithGarbageFrame = garbageFrame()
val expectedRefCnt = 0
it("should direct message to a topic by means of routing configuration") {
val (sut, sink) = vesHvWithStoringSink()
- val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
}
sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
val messages = sut.handleConnection(sink,
- vesWireFrameMessage(HVMEAS),
+ vesWireFrameMessage(PERF3GPP),
vesWireFrameMessage(HEARTBEAT),
vesWireFrameMessage(MEASUREMENT))
assertThat(messages).describedAs("number of routed messages").hasSize(3)
assertThat(messages[0].topic).describedAs("first message topic")
- .isEqualTo(HVMEAS_TOPIC)
+ .isEqualTo(PERF3GPP_TOPIC)
assertThat(messages[1].topic).describedAs("second message topic")
- .isEqualTo(HVMEAS_TOPIC)
+ .isEqualTo(PERF3GPP_TOPIC)
assertThat(messages[2].topic).describedAs("last message topic")
.isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
vesWireFrameMessage(OTHER, "first"),
- vesWireFrameMessage(HVMEAS, "second"),
+ vesWireFrameMessage(PERF3GPP, "second"),
vesWireFrameMessage(HEARTBEAT, "third"))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
}
}
sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
- val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).isEmpty()
sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(1)
val message = messagesAfterUpdate[0]
assertThat(message.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(HVMEAS_TOPIC)
+ .isEqualTo(PERF3GPP_TOPIC)
assertThat(message.partition).describedAs("routed message partition")
.isEqualTo(0)
}
it("should change domain routing") {
- val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).hasSize(1)
val firstMessage = messages[0]
assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
- .isEqualTo(HVMEAS_TOPIC)
+ .isEqualTo(PERF3GPP_TOPIC)
assertThat(firstMessage.partition).describedAs("routed message partition")
.isEqualTo(0)
sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(2)
val secondMessage = messagesAfterUpdate[1]
assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(ALTERNATE_HVMEAS_TOPIC)
+ .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
assertThat(secondMessage.partition).describedAs("routed message partition")
.isEqualTo(0)
}
sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
}
}.doOnNext {
- sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
+ sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
}.then().block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
assertThat(messages.size).isEqualTo(messagesAmount)
assertThat(messagesForEachTopic)
println("config changed")
}
}
- .map { vesWireFrameMessage(HVMEAS) }
+ .map { vesWireFrameMessage(PERF3GPP) }
sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
assertThat(messages.size).isEqualTo(messageStreamSize)
assertThat(firstTopicMessagesCount)
val (sut, sink) = vesHvWithStoringSink()
val handledMessages = sut.handleConnection(sink,
- vesWireFrameMessage(HVMEAS, "first"),
- vesMessageWithTooBigPayload(HVMEAS),
- vesWireFrameMessage(HVMEAS))
+ vesWireFrameMessage(PERF3GPP, "first"),
+ vesMessageWithTooBigPayload(PERF3GPP),
+ vesWireFrameMessage(PERF3GPP))
assertThat(handledMessages).hasSize(1)
assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import reactor.retry.RetryExhaustedException
-const val HVMEAS_TOPIC = "ves_hvRanMeas"
-const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
-const val ALTERNATE_HVMEAS_TOPIC = "ves_alternateHvRanMeas"
+const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
+const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
+const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(HVMEAS.name)
- toTopic(HVMEAS_TOPIC)
+ fromDomain(PERF3GPP.name)
+ toTopic(PERF3GPP_TOPIC)
withFixedPartitioning()
}
}.build()
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(HVMEAS.name)
- toTopic(HVMEAS_TOPIC)
+ fromDomain(PERF3GPP.name)
+ toTopic(PERF3GPP_TOPIC)
withFixedPartitioning()
}
defineRoute {
fromDomain(HEARTBEAT.name)
- toTopic(HVMEAS_TOPIC)
+ toTopic(PERF3GPP_TOPIC)
withFixedPartitioning()
}
defineRoute {
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(HVMEAS.name)
- toTopic(ALTERNATE_HVMEAS_TOPIC)
+ fromDomain(PERF3GPP.name)
+ toTopic(ALTERNATE_PERF3GPP_TOPIC)
withFixedPartitioning()
}
}.build()
fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
describe("listenToTopics") {
- val topics = setOf("hvMeas", "faults")
+ val topics = setOf("perf3gpp", "faults")
it("should fail when topic list is empty") {
val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync()
}
it("should fail when topic list contains empty strings") {
- val result = cut.listenToTopics(setOf("hvMeas", " ", "faults")).attempt().unsafeRunSync()
+ val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync()
assertThat(result.isLeft()).isTrue()
}
}
it("should subscribe to given topics when called with comma separated list") {
- cut.listenToTopics("hvMeas,faults").unsafeRunSync()
+ cut.listenToTopics("perf3gpp,faults").unsafeRunSync()
verify(consumerFactory).createConsumerForTopics(topics)
}
.thenReturn(IO.raiseError(error))
// when
- val result = cut.listenToTopics("hvMeas").attempt().unsafeRunSync()
+ val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync()
// then
assertThat(result).isEqualTo(Left(error))
describe("when topics are initialized") {
beforeEachTest {
- cut.listenToTopics("hvMeas").unsafeRunSync()
+ cut.listenToTopics("perf3gpp").unsafeRunSync()
}
it("should return some state when it has been set") {
describe("when topics are initialized") {
beforeEachTest {
- cut.listenToTopics("hvMeas").unsafeRunSync()
+ cut.listenToTopics("perf3gpp").unsafeRunSync()
}
it("should reset the state") {
it("should delegate to MessageStreamValidation") {
// given
- cut.listenToTopics("hvMeas").unsafeRunSync()
+ cut.listenToTopics("perf3gpp").unsafeRunSync()
whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
// when
SYSLOG,
THRESHOLD_CROSSING_ALERT,
VOICE_QUALITY,
- HVMEAS
+ PERF3GPP
}
syntax = "proto3";
package org.onap.ves;
-message VesEvent {
+message VesEvent // top-level message
+{
CommonEventHeader commonEventHeader=1; // required
bytes eventFields=2; // required, payload
- // this field contains a domain-specific GPB message
- // the field being opaque (bytes), the decoding of the payload occurs in a separate step
- // the name of the GPB message for domain XYZ is XYZFields
- // e.g. for domain==HVMEAS, the GPB message is HVMEASFields
+ // this field contains a domain-specific GPB message
+ // the field being opaque (bytes), the decoding of the payload occurs in a separate step
+ // the name of the GPB message for domain XYZ is XYZFields
+ // e.g. for domain==PERF3GPP, the GPB message is Perf3GPPFields
}
// VES CommonEventHeader adapted to GPB (Google Protocol Buffers)
-// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain.
+// Aligned with VES 7.0.1 schema, and extending to Performance Domain.
message CommonEventHeader
{
string version = 1; // required, "version of the gpb common event header"
string domain = 2; // required, "the eventing domain associated with the event", allowed values:
- // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING,
- // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS
+ // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING,
+ // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, PERF3GPP
uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed"
-/*\r
- * ============LICENSE_START=======================================================\r
- * dcaegen2-collectors-veshv\r
- * ================================================================================\r
- * Copyright (C) 2018 NOKIA\r
- * ================================================================================\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- * ============LICENSE_END=========================================================\r
- */\r
-syntax = "proto3";\r
-package org.onap.ves;\r
-\r
-// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V2.0.0 (2018-09).\r
-// Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file.\r
-// Note (2018-09): work is in progress for 3GPP TS 28.550. Changes will be made, if needed, to align with final version.\r
-// Differences/additions to 3GPP TS 28.550 are marked with "%%".\r
-\r
-message MeasDataCollection // top-level message\r
-{\r
- // %% Combined messageFileHeader, measData (single instance), messageFileFooter (not needed: timestamp = collectionBeginTime + granularityPeriod).\r
- string formatVersion = 1;\r
- string senderName = 2;\r
- string senderType = 3;\r
- string vendorName = 4;\r
- string collectionBeginTime = 5; // in ASN.1 GeneralizedTime format (subset of ISO 8601 basic format)\r
- uint32 granularityPeriod = 6; // duration in seconds, %% moved from MeasInfo (single reporting period per event)\r
- string measuredEntityUserName = 7; // network function user definable name ("userLabel") defined for the measured entity in 3GPP TS 28.622\r
- string measuredEntityDn = 8; // DN as per 3GPP TS 32.300\r
- string measuredEntitySoftwareVersion = 9;\r
- repeated string measObjInstIdList = 10; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432\r
- repeated MeasInfo measInfo = 11;\r
-}\r
-\r
-message MeasInfo\r
-{\r
- oneof MeasInfoId { // measurement group identifier\r
- uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact)\r
- string measInfoId = 2; // identifier as string (more generic)\r
- }\r
-\r
- oneof MeasTypes { // measurement identifiers associated with the measurement results\r
- IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact)\r
- SMeasTypes measTypes = 4; // identifiers as strings (more generic)\r
- }\r
- // Needed only because GPB does not support repeated fields directly inside 'oneof'\r
- message IMeasTypes { repeated uint32 iMeasType = 1; }\r
- message SMeasTypes { repeated string measType = 1; }\r
-\r
- string jobId = 5;\r
- repeated MeasValue measValues = 6; // performance measurements grouped by measurement object\r
-}\r
-\r
-message MeasValue\r
-{\r
- oneof MeasObjInstId { // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432\r
- string measObjInstId = 1; // LDN itself\r
- uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList\r
- }\r
- repeated MeasResult measResults = 3;\r
- bool suspectFlag = 4;\r
- map<string, string> measObjAddlFlds = 5; // %%: optional per-object data (name/value HashMap)\r
-}\r
-\r
-message MeasResult\r
-{\r
- uint32 p = 1; // Index in the MeasTypes array, needed only if measResults has fewer elements than MeasTypes\r
- oneof xValue {\r
- sint64 iValue = 2;\r
- double rValue = 3;\r
- bool isNull = 4;\r
- }\r
-}\r
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+syntax = "proto3";
+package org.onap.ves;
+
+// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V2.0.0 (2018-09).
+// Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file.
+// Note (2018-09): work is in progress for 3GPP TS 28.550. Changes will be made, if needed, to align with final version.
+// Differences/additions to 3GPP TS 28.550 are marked with "%%".
+
+message MeasDataCollection // top-level message
+{
+ // %% Combined messageFileHeader, measData (single instance), messageFileFooter (not needed: timestamp = collectionBeginTime + granularityPeriod).
+ string formatVersion = 1;
+ uint32 granularityPeriod = 2; // duration in seconds, %% moved from MeasInfo (single reporting period per event)
+ string measuredEntityUserName = 3; // network function user definable name ("userLabel") defined for the measured entity in 3GPP TS 28.622
+ string measuredEntityDn = 4; // DN as per 3GPP TS 32.300
+ string measuredEntitySoftwareVersion = 5;
+ repeated string measObjInstIdList = 6; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432
+ repeated MeasInfo measInfo = 7;
+}
+
+message MeasInfo
+{
+ oneof MeasInfoId { // measurement group identifier
+ uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact)
+ string measInfoId = 2; // identifier as string (more generic)
+ }
+
+ oneof MeasTypes { // measurement identifiers associated with the measurement results
+ IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact)
+ SMeasTypes measTypes = 4; // identifiers as strings (more generic)
+ }
+ // Needed only because GPB does not support repeated fields directly inside 'oneof'
+ message IMeasTypes { repeated uint32 iMeasType = 1; }
+ message SMeasTypes { repeated string measType = 1; }
+
+ string jobId = 5;
+ repeated MeasValue measValues = 6; // performance measurements grouped by measurement object
+}
+
+message MeasValue
+{
+ oneof MeasObjInstId { // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432
+ string measObjInstId = 1; // LDN itself
+ uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList
+ }
+ repeated MeasResult measResults = 3;
+ bool suspectFlag = 4;
+ map<string, string> measObjAddlFlds = 5; // %%: optional per-object data (name/value HashMap)
+}
+
+message MeasResult
+{
+ uint32 p = 1; // Index in the MeasTypes array, needed only if measResults has fewer elements than MeasTypes
+ oneof xValue {
+ sint64 iValue = 2;
+ double rValue = 3;
+ bool isNull = 4;
+ }
+}
-/*\r
- * ============LICENSE_START=======================================================\r
- * dcaegen2-collectors-veshv\r
- * ================================================================================\r
- * Copyright (C) 2018 NOKIA\r
- * ================================================================================\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- * ============LICENSE_END=========================================================\r
- */\r
-syntax = "proto3";\r
-package org.onap.ves;\r
-import "MeasDataCollection.proto"; // for 3GPP PM format\r
-\r
-message HVMeasFields\r
-{\r
- string hvMeasFieldsVersion = 1;\r
- MeasDataCollection measDataCollection = 2;\r
- // Based on 3GPP TS 28.550\r
- // Informative: mapping between similar header fields (format may be different)\r
- // 3GPP MeasHeader ONAP/VES CommonEventHeader\r
- // senderName sourceName\r
- // senderType nfNamingCode + nfcNamingCode\r
- // vendorName nfVendorName\r
- // collectionBeginTime startEpochMicrosec\r
- // timestamp lastEpochMicrosec\r
- map<string, string> eventAddlFlds = 3; // optional per-event data (name/value HashMap)\r
-}\r
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+syntax = "proto3";
+package org.onap.ves;
+import "MeasDataCollection.proto"; // for 3GPP PM format
+
+message Perf3GPPFields
+{
+ string perf3GPPFieldsVersion = 1;
+ MeasDataCollection measDataCollection = 2;
+ // Based on 3GPP TS 28.550
+ // Logical mapping from 3GPP to ONAP header fields:
+ // 3GPP MeasFileHeader ONAP/VES CommonEventHeader
+ // senderName sourceName
+ // senderType nfNamingCode + nfcNamingCode
+ // vendorName nfVendorName
+ // collectionBeginTime startEpochMicrosec
+ // timestamp lastEpochMicrosec
+ map<string, string> eventAddlFlds = 3; // optional per-event data (name/value HashMap)
+}
cut.notifyMessageReceived(128)
cut.notifyMessageReceived(256)
cut.notifyMessageReceived(256)
- cut.notifyMessageSent("hvranmeas")
+ cut.notifyMessageSent("PERF3GPP")
verifyGauge("messages.processing.count") { gauge ->
assertThat(gauge.value()).isCloseTo(2.0, doublePrecision)
}
on("zero difference") {
cut.notifyMessageReceived(128)
- cut.notifyMessageSent("hvranmeas")
+ cut.notifyMessageSent("PERF3GPP")
verifyGauge("messages.processing.count") { gauge ->
assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
}
on("negative difference") {
cut.notifyMessageReceived(128)
- cut.notifyMessageSent("calltrace")
- cut.notifyMessageSent("hvranmeas")
+ cut.notifyMessageSent("FAULT")
+ cut.notifyMessageSent("PERF3GPP")
verifyGauge("messages.processing.count") { gauge ->
assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
}
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
import java.util.UUID.randomUUID
writeByte(0x01) // version minor
}
-fun vesMessageWithTooBigPayload(domain: VesEventDomain = HVMEAS): ByteBuf =
+fun vesMessageWithTooBigPayload(domain: VesEventDomain = PERF3GPP): ByteBuf =
allocator.buffer().run {
writeValidWireFrameHeaders()
val gpb = vesEvent(
domain = domain,
- hvRanMeasFields = ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+ eventFields = ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
).toByteString().asReadOnlyByteBuffer()
writeInt(gpb.limit()) // ves event size in bytes
import com.google.protobuf.MessageLite
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.ves.VesEventOuterClass
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
import java.util.UUID.randomUUID
-fun vesEvent(domain: VesEventDomain = HVMEAS,
+fun vesEvent(domain: VesEventDomain = PERF3GPP,
id: String = randomUUID().toString(),
- hvRanMeasFields: ByteString = ByteString.EMPTY
-): VesEventOuterClass.VesEvent = vesEvent(commonHeader(domain, id), hvRanMeasFields)
+ eventFields: ByteString = ByteString.EMPTY
+): VesEventOuterClass.VesEvent = vesEvent(commonHeader(domain, id), eventFields)
fun vesEvent(commonEventHeader: CommonEventHeader,
- hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent =
+ eventFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent =
VesEventOuterClass.VesEvent.newBuilder()
.setCommonEventHeader(commonEventHeader)
- .setEventFields(hvRanMeasFields)
+ .setEventFields(eventFields)
.build()
-fun commonHeader(domain: VesEventDomain = HVMEAS,
+fun commonHeader(domain: VesEventDomain = PERF3GPP,
id: String = randomUUID().toString(),
priority: Priority = Priority.NORMAL): CommonEventHeader =
CommonEventHeader.newBuilder()
WireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
}
- private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: ByteString): ByteArray {
- return createVesEvent(commonEventHeader, hvRanMeasPayload).toByteArray()
+ private fun vesEvent(commonEventHeader: CommonEventHeader, eventFields: ByteString): ByteArray {
+ return createVesEvent(commonEventHeader, eventFields).toByteArray()
}
private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
val limit = 1000L
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVMEAS),
+ commonHeader(PERF3GPP),
MessageType.VALID
)))
.take(limit)
it("should create message flux of specified size") {
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVMEAS),
+ commonHeader(PERF3GPP),
MessageType.VALID,
5
)))
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVMEAS),
+ commonHeader(PERF3GPP),
MessageType.TOO_BIG_PAYLOAD,
1
)))
.assertNext {
assertThat(it.isValid()).isTrue()
assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.name)
}
.verifyComplete()
}
it("should create flux of messages with invalid payload") {
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVMEAS),
+ commonHeader(PERF3GPP),
MessageType.INVALID_GPB_DATA,
1
)))
it("should create flux of messages with invalid version") {
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVMEAS),
+ commonHeader(PERF3GPP),
MessageType.INVALID_WIRE_FRAME,
1
)))
.assertNext {
assertThat(it.isValid()).isFalse()
assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.name)
assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
}
.verifyComplete()
.assertNext {
assertThat(it.isValid()).isTrue()
assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
+ assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
}
.verifyComplete()
it("should create concatenated flux of messages") {
val singleFluxSize = 5L
val messageParameters = listOf(
- MessageParameters(commonHeader(HVMEAS), MessageType.VALID, singleFluxSize),
+ MessageParameters(commonHeader(PERF3GPP), MessageType.VALID, singleFluxSize),
MessageParameters(commonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
MessageParameters(commonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize)
)
.test()
.assertNext {
assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.name)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
-fun extractHvRanMeasFields(bytes: ByteData): ByteString =
+fun extractEventFields(bytes: ByteData): ByteString =
VesEvent.parseFrom(bytes.unsafeAsArray()).eventFields
{
"commonEventHeader": {
"version": "sample-version",
- "domain": "HVMEAS",
+ "domain": "PERF3GPP",
"sequence": 1,
"priority": 1,
"eventId": "sample-event-id",
{
"commonEventHeader": {
"version": "sample-version",
- "domain": "HVMEAS",
+ "domain": "PERF3GPP",
"sequence": 1,
"priority": 1,
"eventId": "sample-event-id",
{
"commonEventHeader": {
"version": "sample-version",
- "domain": "HVMEAS",
+ "domain": "PERF3GPP",
"sequence": 1,
"priority": 1,
"eventId": "sample-event-id",