Align with latest HV-VES proto definition 75/66575/6
authorFilip Krzywka <filip.krzywka@nokia.com>
Tue, 11 Sep 2018 12:45:53 +0000 (14:45 +0200)
committerFilip Krzywka <filip.krzywka@nokia.com>
Mon, 17 Sep 2018 11:53:34 +0000 (11:53 +0000)
- excluded measurements proto files from build to keep them
reference-only

Change-Id: I8c6de20eeeb1b9f8cd2ae4e865de368afe55cc91
Issue-ID: DCAEGEN2-775
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
36 files changed:
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
hv-collector-domain/pom.xml
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt [new file with mode: 0644]
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt
hv-collector-domain/src/main/proto/HVRanMeasFields-v5.proto [deleted file]
hv-collector-domain/src/main/proto/VesEvent-v5.proto [deleted file]
hv-collector-domain/src/main/proto/event/VesEvent.proto [new file with mode: 0644]
hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto [new file with mode: 0644]
hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto [new file with mode: 0644]
hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt
hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt

index 8affa0b..a4a4374 100644 (file)
@@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
 
 import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
 import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
 internal object MessageValidator {
 
index a778010..1d43588 100644 (file)
@@ -23,7 +23,7 @@ import arrow.core.Try
 import arrow.core.Option
 import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventOuterClass.VesEvent
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
index d8ea45d..d08ad9e 100644 (file)
@@ -25,7 +25,6 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.retry.Jitter
@@ -116,7 +115,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
                     for (route in routing) {
                         val routeObj = route.asJsonObject()
                         defineRoute {
-                            fromDomain(forNumber(routeObj.getInt("fromDomain")))
+                            fromDomain(routeObj.getString("fromDomain"))
                             toTopic(routeObj.getString("toTopic"))
                             withFixedPartitioning()
                         }
index b611e9a..a0c2241 100644 (file)
@@ -23,7 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.core.publisher.Flux
 import reactor.kafka.sender.KafkaSender
 import reactor.kafka.sender.SenderRecord
index a00a02d..1819195 100644 (file)
@@ -24,7 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.kafka.sender.KafkaSender
 import reactor.kafka.sender.SenderOptions
 
index 03996fd..f5bfcce 100644 (file)
@@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.model
 
 import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.impl.MessageValidator
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
index e9cd5f3..a42b982 100644 (file)
@@ -20,8 +20,7 @@
 package org.onap.dcae.collectors.veshv.model
 
 import arrow.core.Option
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
 data class Routing(val routes: List<Route>) {
 
@@ -29,7 +28,7 @@ data class Routing(val routes: List<Route>) {
             Option.fromNullable(routes.find { it.applies(commonHeader) })
 }
 
-data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
+data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
 
     fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
 
@@ -63,11 +62,11 @@ class RoutingBuilder {
 
 class RouteBuilder {
 
-    private lateinit var domain: Domain
+    private lateinit var domain: String
     private lateinit var targetTopic: String
     private lateinit var partitioning: (CommonEventHeader) -> Int
 
-    fun fromDomain(domain: Domain) {
+    fun fromDomain(domain: String) {
         this.domain = domain
     }
 
index 213f454..443dfa2 100644 (file)
@@ -25,13 +25,14 @@ import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder
+
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.getDefaultInstance
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.newBuilder
 
 internal object MessageValidatorTest : Spek({
 
@@ -46,8 +47,7 @@ internal object MessageValidatorTest : Spek({
                 assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
             }
 
-            Domain.values()
-                    .filter { (it != Domain.UNRECOGNIZED && it != Domain.DOMAIN_UNDEFINED) }
+            VesEventDomain.values()
                     .forEach { domain ->
                         it("should accept message with $domain domain") {
                             val header = commonHeader(domain)
@@ -65,26 +65,8 @@ internal object MessageValidatorTest : Spek({
             }
         }
 
-
-        val domainTestCases = mapOf(
-                Domain.DOMAIN_UNDEFINED to false,
-                Domain.FAULT to true
-        )
-
-        domainTestCases.forEach { value, expectedResult ->
-            on("ves hv message including header with domain $value") {
-                val commonEventHeader = commonHeader(value)
-                val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
-
-                it("should resolve validation result") {
-                    assertThat(cut.isValid(vesMessage)).describedAs("message validation results")
-                            .isEqualTo(expectedResult)
-                }
-            }
-        }
-
         val priorityTestCases = mapOf(
-                Priority.PRIORITY_UNDEFINED to false,
+                Priority.PRIORITY_NOT_PROVIDED to false,
                 Priority.HIGH to true
         )
 
index 91fa7c1..c2fe1cc 100644 (file)
@@ -27,11 +27,14 @@ import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -42,13 +45,13 @@ object RouterTest : Spek({
         val config = routing {
 
             defineRoute {
-                fromDomain(Domain.HVRANMEAS)
+                fromDomain(HVMEAS.name)
                 toTopic("ves_rtpm")
                 withFixedPartitioning(2)
             }
 
             defineRoute {
-                fromDomain(Domain.SYSLOG)
+                fromDomain(SYSLOG.name)
                 toTopic("ves_trace")
                 withFixedPartitioning()
             }
@@ -56,7 +59,7 @@ object RouterTest : Spek({
         val cut = Router(config)
 
         on("message with existing route (rtpm)") {
-            val message = VesMessage(commonHeader(Domain.HVRANMEAS), ByteData.EMPTY)
+            val message = VesMessage(commonHeader(HVMEAS), ByteData.EMPTY)
             val result = cut.findDestination(message)
 
             it("should have route available") {
@@ -77,7 +80,7 @@ object RouterTest : Spek({
         }
 
         on("message with existing route (trace)") {
-            val message = VesMessage(commonHeader(Domain.SYSLOG), ByteData.EMPTY)
+            val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY)
             val result = cut.findDestination(message)
 
             it("should have route available") {
@@ -98,7 +101,7 @@ object RouterTest : Spek({
         }
 
         on("message with unknown route") {
-            val message = VesMessage(commonHeader(Domain.HEARTBEAT), ByteData.EMPTY)
+            val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY)
             val result = cut.findDestination(message)
 
             it("should not have route available") {
index a7d3971..8950a55 100644 (file)
@@ -26,10 +26,10 @@ import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 import java.nio.charset.Charset
 import kotlin.test.assertTrue
 import kotlin.test.fail
@@ -41,7 +41,7 @@ internal object VesDecoderTest : Spek({
         val cut = VesDecoder()
 
         on("ves hv message bytes") {
-            val commonHeader = commonHeader(Domain.HEARTBEAT)
+            val commonHeader = commonHeader(HEARTBEAT)
             val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
 
             it("should decode only header and pass it on along with raw message") {
index 59224cc..f21a2ec 100644 (file)
@@ -28,9 +28,11 @@ import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
 import reactor.core.publisher.Mono
 import reactor.retry.Retry
 import reactor.test.StepVerifier
@@ -62,14 +64,14 @@ internal object ConsulConfigurationProviderTest : Spek({
                     StepVerifier.create(consulConfigProvider().take(1))
                             .consumeNextWith {
 
-                                assertEquals("kafka:9093", it.kafkaBootstrapServers)
+                                assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
 
                                 val route1 = it.routing.routes[0]
-                                assertEquals(Domain.FAULT, route1.domain)
+                                assertEquals(FAULT.name, route1.domain)
                                 assertEquals("test-topic-1", route1.targetTopic)
 
                                 val route2 = it.routing.routes[1]
-                                assertEquals(Domain.HEARTBEAT, route2.domain)
+                                assertEquals(HEARTBEAT.name, route2.domain)
                                 assertEquals("test-topic-2", route2.targetTopic)
 
                             }.verifyComplete()
@@ -95,7 +97,7 @@ internal object ConsulConfigurationProviderTest : Spek({
                             .verifyErrorMessage("Test exception")
                 }
 
-                it("should update the health state"){
+                it("should update the health state") {
                     StepVerifier.create(healthStateProvider().take(iterationCount))
                             .expectNextCount(iterationCount - 1)
                             .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
@@ -128,21 +130,23 @@ private fun constructConsulConfigProvider(url: String,
 }
 
 
+const val kafkaAddress = "message-router-kafka"
+
 fun constructConsulResponse(): String {
 
     val config = """{
-    "dmaap.kafkaBootstrapServers": "kafka:9093",
+    "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
     "collector.routing": [
             {
-                "fromDomain": 1,
+                "fromDomain": "FAULT",
                 "toTopic": "test-topic-1"
             },
             {
-                "fromDomain": 2,
+                "fromDomain": "HEARTBEAT",
                 "toTopic": "test-topic-2"
             }
     ]
-}"""
+    }"""
 
     val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))
 
index ba29844..8e6103c 100644 (file)
@@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
@@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
 import reactor.core.publisher.Flux
 import reactor.math.sum
 import java.security.MessageDigest
@@ -62,7 +62,7 @@ object PerformanceSpecification : Spek({
             val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
 
             val params = MessageParameters(
-                    commonEventHeader = commonHeader(HVRANMEAS),
+                    commonEventHeader = commonHeader(HVMEAS),
                     messageType = VALID,
                     amount = numMessages
             )
@@ -92,7 +92,7 @@ object PerformanceSpecification : Spek({
             val timeout = Duration.ofSeconds(30)
 
             val params = MessageParameters(
-                    commonEventHeader = commonHeader(HVRANMEAS),
+                    commonEventHeader = commonHeader(HVMEAS),
                     messageType = VALID,
                     amount = numMessages
             )
index a9f3e9a..60e10ee 100644 (file)
@@ -24,9 +24,13 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.HVMEAS_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
@@ -39,7 +43,7 @@ import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
 import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
 import reactor.core.publisher.Flux
 import java.time.Duration
 
@@ -54,8 +58,8 @@ object VesHvSpecification : Spek({
         it("should handle multiple HV RAN events") {
             val (sut, sink) = vesHvWithStoringSink()
             val messages = sut.handleConnection(sink,
-                    vesWireFrameMessage(Domain.HVRANMEAS),
-                    vesWireFrameMessage(Domain.HVRANMEAS)
+                    vesWireFrameMessage(HVMEAS),
+                    vesWireFrameMessage(HVMEAS)
             )
 
             assertThat(messages)
@@ -65,8 +69,8 @@ object VesHvSpecification : Spek({
 
         it("should not handle messages received from client after end-of-transmission message") {
             val (sut, sink) = vesHvWithStoringSink()
-            val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
-            val anotherValidMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+            val validMessage = vesWireFrameMessage(HVMEAS)
+            val anotherValidMessage = vesWireFrameMessage(HVMEAS)
             val endOfTransmissionMessage = endOfTransmissionWireMessage()
 
             val handledEvents = sut.handleConnection(sink,
@@ -91,23 +95,19 @@ object VesHvSpecification : Spek({
     describe("Memory management") {
         it("should release memory for each handled and dropped message") {
             val (sut, sink) = vesHvWithStoringSink()
-            val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
-            val msgWithInvalidDomain = vesWireFrameMessage(Domain.OTHER)
+            val validMessage = vesWireFrameMessage(HVMEAS)
             val msgWithInvalidFrame = invalidWireFrame()
-            val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
+            val msgWithTooBigPayload = vesMessageWithTooBigPayload(HVMEAS)
             val expectedRefCnt = 0
 
             val handledEvents = sut.handleConnection(
-                    sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
+                    sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
 
             assertThat(handledEvents).hasSize(1)
 
             assertThat(validMessage.refCnt())
                     .describedAs("handled message should be released")
                     .isEqualTo(expectedRefCnt)
-            assertThat(msgWithInvalidDomain.refCnt())
-                    .describedAs("message with invalid domain should be released")
-                    .isEqualTo(expectedRefCnt)
             assertThat(msgWithInvalidFrame.refCnt())
                     .describedAs("message with invalid frame should be released")
                     .isEqualTo(expectedRefCnt)
@@ -118,7 +118,7 @@ object VesHvSpecification : Spek({
 
         it("should release memory for end-of-transmission message") {
             val (sut, sink) = vesHvWithStoringSink()
-            val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+            val validMessage = vesWireFrameMessage(HVMEAS)
             val endOfTransmissionMessage = endOfTransmissionWireMessage()
             val expectedRefCnt = 0
 
@@ -138,7 +138,7 @@ object VesHvSpecification : Spek({
 
         it("should release memory for each message with invalid payload") {
             val (sut, sink) = vesHvWithStoringSink()
-            val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+            val validMessage = vesWireFrameMessage(HVMEAS)
             val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
             val expectedRefCnt = 0
 
@@ -157,7 +157,7 @@ object VesHvSpecification : Spek({
 
         it("should release memory for each message with garbage frame") {
             val (sut, sink) = vesHvWithStoringSink()
-            val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+            val validMessage = vesWireFrameMessage(HVMEAS)
             val msgWithGarbageFrame = garbageFrame()
             val expectedRefCnt = 0
 
@@ -179,11 +179,11 @@ object VesHvSpecification : Spek({
         it("should direct message to a topic by means of routing configuration") {
             val (sut, sink) = vesHvWithStoringSink()
 
-            val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+            val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
             val msg = messages[0]
-            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
             assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
         }
 
@@ -193,17 +193,17 @@ object VesHvSpecification : Spek({
             sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
 
             val messages = sut.handleConnection(sink,
-                    vesWireFrameMessage(Domain.HVRANMEAS),
-                    vesWireFrameMessage(Domain.HEARTBEAT),
-                    vesWireFrameMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
+                    vesWireFrameMessage(HVMEAS),
+                    vesWireFrameMessage(HEARTBEAT),
+                    vesWireFrameMessage(MEASUREMENT))
 
             assertThat(messages).describedAs("number of routed messages").hasSize(3)
 
             assertThat(messages[0].topic).describedAs("first message topic")
-                    .isEqualTo(HVRANMEAS_TOPIC)
+                    .isEqualTo(HVMEAS_TOPIC)
 
             assertThat(messages[1].topic).describedAs("second message topic")
-                    .isEqualTo(HVRANMEAS_TOPIC)
+                    .isEqualTo(HVMEAS_TOPIC)
 
             assertThat(messages[2].topic).describedAs("last message topic")
                     .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
@@ -212,14 +212,14 @@ object VesHvSpecification : Spek({
         it("should drop message if route was not found") {
             val (sut, sink) = vesHvWithStoringSink()
             val messages = sut.handleConnection(sink,
-                    vesWireFrameMessage(Domain.OTHER, "first"),
-                    vesWireFrameMessage(Domain.HVRANMEAS, "second"),
-                    vesWireFrameMessage(Domain.HEARTBEAT, "third"))
+                    vesWireFrameMessage(OTHER, "first"),
+                    vesWireFrameMessage(HVMEAS, "second"),
+                    vesWireFrameMessage(HEARTBEAT, "third"))
 
             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
             val msg = messages[0]
-            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
         }
     }
@@ -253,41 +253,41 @@ object VesHvSpecification : Spek({
 
                 sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
 
-                val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+                val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
                 assertThat(messages).isEmpty()
 
                 sut.configurationProvider.updateConfiguration(basicConfiguration)
 
-                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
                 assertThat(messagesAfterUpdate).hasSize(1)
                 val message = messagesAfterUpdate[0]
 
                 assertThat(message.topic).describedAs("routed message topic after configuration's change")
-                        .isEqualTo(HVRANMEAS_TOPIC)
+                        .isEqualTo(HVMEAS_TOPIC)
                 assertThat(message.partition).describedAs("routed message partition")
                         .isEqualTo(0)
             }
 
             it("should change domain routing") {
 
-                val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+                val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
                 assertThat(messages).hasSize(1)
                 val firstMessage = messages[0]
 
                 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
-                        .isEqualTo(HVRANMEAS_TOPIC)
+                        .isEqualTo(HVMEAS_TOPIC)
                 assertThat(firstMessage.partition).describedAs("routed message partition")
                         .isEqualTo(0)
 
 
                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
 
-                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
                 assertThat(messagesAfterUpdate).hasSize(2)
                 val secondMessage = messagesAfterUpdate[1]
 
                 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
-                        .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+                        .isEqualTo(ALTERNATE_HVMEAS_TOPIC)
                 assertThat(secondMessage.partition).describedAs("routed message partition")
                         .isEqualTo(0)
             }
@@ -302,13 +302,13 @@ object VesHvSpecification : Spek({
                         sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
                     }
                 }.doOnNext {
-                    sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+                    sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
                 }.then().block(defaultTimeout)
 
 
                 val messages = sink.sentMessages
-                val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
-                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+                val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
+                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
 
                 assertThat(messages.size).isEqualTo(messagesAmount)
                 assertThat(messagesForEachTopic)
@@ -329,14 +329,14 @@ object VesHvSpecification : Spek({
                                 println("config changed")
                             }
                         }
-                        .map { vesWireFrameMessage(Domain.HVRANMEAS) }
+                        .map { vesWireFrameMessage(HVMEAS) }
 
 
                 sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
 
                 val messages = sink.sentMessages
-                val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
-                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+                val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
+                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
 
                 assertThat(messages.size).isEqualTo(messageStreamSize)
                 assertThat(firstTopicMessagesCount)
@@ -373,9 +373,9 @@ object VesHvSpecification : Spek({
             val (sut, sink) = vesHvWithStoringSink()
 
             val handledMessages = sut.handleConnection(sink,
-                    vesWireFrameMessage(Domain.HVRANMEAS, "first"),
-                    vesMessageWithTooBigPayload(Domain.HVRANMEAS),
-                    vesWireFrameMessage(Domain.HVRANMEAS))
+                    vesWireFrameMessage(HVMEAS, "first"),
+                    vesMessageWithTooBigPayload(HVMEAS),
+                    vesWireFrameMessage(HVMEAS))
 
             assertThat(handledMessages).hasSize(1)
             assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
index ebeaa69..688f275 100644 (file)
 package org.onap.dcae.collectors.veshv.tests.fakes
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
 import reactor.core.publisher.FluxProcessor
 import reactor.core.publisher.UnicastProcessor
 import reactor.retry.RetryExhaustedException
 
 
-const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
+const val HVMEAS_TOPIC = "ves_hvRanMeas"
 const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
-const val ALTERNATE_HVRANMEAS_TOPIC = "ves_alternateHvRanMeas"
+const val ALTERNATE_HVMEAS_TOPIC = "ves_alternateHvRanMeas"
 
 val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
         kafkaBootstrapServers = "localhost:9969",
         routing = routing {
             defineRoute {
-                fromDomain(Domain.HVRANMEAS)
-                toTopic(HVRANMEAS_TOPIC)
+                fromDomain(HVMEAS.name)
+                toTopic(HVMEAS_TOPIC)
                 withFixedPartitioning()
             }
         }.build()
@@ -47,17 +50,17 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu
         kafkaBootstrapServers = "localhost:9969",
         routing = routing {
             defineRoute {
-                fromDomain(Domain.HVRANMEAS)
-                toTopic(HVRANMEAS_TOPIC)
+                fromDomain(HVMEAS.name)
+                toTopic(HVMEAS_TOPIC)
                 withFixedPartitioning()
             }
             defineRoute {
-                fromDomain(Domain.HEARTBEAT)
-                toTopic(HVRANMEAS_TOPIC)
+                fromDomain(HEARTBEAT.name)
+                toTopic(HVMEAS_TOPIC)
                 withFixedPartitioning()
             }
             defineRoute {
-                fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING)
+                fromDomain(MEASUREMENT.name)
                 toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
                 withFixedPartitioning()
             }
@@ -69,8 +72,8 @@ val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfigu
         kafkaBootstrapServers = "localhost:9969",
         routing = routing {
             defineRoute {
-                fromDomain(Domain.HVRANMEAS)
-                toTopic(ALTERNATE_HVRANMEAS_TOPIC)
+                fromDomain(HVMEAS.name)
+                toTopic(ALTERNATE_HVMEAS_TOPIC)
                 withFixedPartitioning()
             }
         }.build()
index 354edae..51f94cc 100644 (file)
@@ -30,7 +30,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventOuterClass
 import java.io.InputStream
 import javax.json.Json
 
@@ -68,22 +68,22 @@ class MessageStreamValidation(
             parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
 
 
-    private fun validateHeaders(actual: List<VesEventV5.VesEvent>, expected: List<VesEventV5.VesEvent>): Boolean {
+    private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>, expected: List<VesEventOuterClass.VesEvent>): Boolean {
         val consumedHeaders = actual.map { it.commonEventHeader }
         val generatedHeaders = expected.map { it.commonEventHeader }
         return generatedHeaders == consumedHeaders
     }
 
 
-    private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventV5.VesEvent>> =
+    private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> =
             messageGenerator.createMessageFlux(parameters)
                     .map(PayloadWireFrameMessage::payload)
                     .map(ByteData::unsafeAsArray)
-                    .map(VesEventV5.VesEvent::parseFrom)
+                    .map(VesEventOuterClass.VesEvent::parseFrom)
                     .collectList()
                     .asIo()
 
     private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
-            consumedMessages.map(VesEventV5.VesEvent::parseFrom)
+            consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom)
 
 }
index c0ba581..017360b 100644 (file)
@@ -36,8 +36,8 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 import org.mockito.ArgumentMatchers.anySet
 import org.mockito.Mockito
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import java.util.concurrent.ConcurrentLinkedQueue
 
 /**
@@ -179,6 +179,6 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P
     return VesEvent.newBuilder()
             .setCommonEventHeader(CommonEventHeader.newBuilder()
                     .setEventId(eventId))
-            .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
+            .setHvMeasFields(ByteString.copyFrom(payload.toByteArray()))
             .build()
 }
index 2932367..beef26b 100644 (file)
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
 import arrow.core.Either
-import arrow.core.Left
-import arrow.core.None
 import arrow.core.Right
-import arrow.core.Some
-import arrow.effects.IO
-import javax.json.stream.JsonParsingException
 import com.google.protobuf.ByteString
 import com.nhaarman.mockito_kotlin.any
 import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.never
-import com.nhaarman.mockito_kotlin.verify
 import com.nhaarman.mockito_kotlin.whenever
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assertions.fail
@@ -38,19 +31,15 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 import org.mockito.ArgumentMatchers.anyList
-import org.mockito.ArgumentMatchers.anySet
 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.core.publisher.Flux
-import java.util.concurrent.ConcurrentLinkedQueue
-import javax.json.Json
-import javax.json.JsonArray
-import javax.json.JsonValue
+import javax.json.stream.JsonParsingException
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -216,7 +205,7 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P
     return VesEvent.newBuilder()
             .setCommonEventHeader(CommonEventHeader.newBuilder()
                     .setEventId(eventId))
-            .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
+            .setHvMeasFields(ByteString.copyFrom(payload.toByteArray()))
             .build()
 }
 
index eb2a758..8f6ec87 100644 (file)
@@ -74,7 +74,7 @@
                         <configuration>
                             <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
                             <inputDirectories>
-                                <include>${project.basedir}/src/main/proto</include>
+                                <include>${project.basedir}/src/main/proto/event</include>
                             </inputDirectories>
                             <outputTargets>
                                 <outputTarget>
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt
new file mode 100644 (file)
index 0000000..3e99cdc
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+enum class VesEventDomain {
+    FAULT,
+    HEARTBEAT,
+    MEASUREMENT,
+    MOBILE_FLOW,
+    OTHER,
+    PNFREGISTRATION,
+    SIP_SIGNALING,
+    STATE_CHANGE,
+    SYSLOG,
+    THRESHOLD_CROSSING_ALERT,
+    VOICE_QUALITY,
+    HVMEAS
+}
index 91c7545..339a652 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.domain
 
-import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventOuterClass
 
 val headerRequiredFieldDescriptors = listOf(
         "version",
-        "eventName",
         "domain",
-        "eventId",
-        "sourceName",
-        "reportingEntityName",
+        "sequence",
         "priority",
-        "startEpochMicrosec",
+        "eventId",
+        "eventName",
         "lastEpochMicrosec",
-        "sequence")
-        .map { fieldName -> VesEventV5.VesEvent.CommonEventHeader.getDescriptor().findFieldByName(fieldName) }
\ No newline at end of file
+        "startEpochMicrosec",
+        "reportingEntityName",
+        "sourceName",
+        "vesEventListenerVersion")
+        .map { fieldName -> VesEventOuterClass.CommonEventHeader.getDescriptor().findFieldByName(fieldName) }
diff --git a/hv-collector-domain/src/main/proto/HVRanMeasFields-v5.proto b/hv-collector-domain/src/main/proto/HVRanMeasFields-v5.proto
deleted file mode 100644 (file)
index 5121f0e..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-syntax = "proto3";
-package org.onap.ves;
-
-// Definition for RTPM
-
-message HVRanMeasFields {
-  message HVRanMeasPayload {
-    message PMObject {
-      message HVRanMeas {
-        uint32 measurement_id = 1;
-        repeated uint32 counter_subid = 2;
-        repeated sint64 counter_value = 3;
-        repeated uint32 missing_counter_subid = 4;
-        bool suspectFlagIncomplete = 5; // (some is data missing due to internal error)
-        bool suspectFlagOutOfSync = 6; // (source time not aligned)
-      }
-
-      string uri = 1; // monitored object URI
-      repeated HVRanMeas hvRanMeas = 2; // performance counters grouped by measurement types
-    }
-    repeated PMObject pmObject = 1;
-  }
-
-  message AdditionalField {
-    string name = 1;
-    string value = 2;
-  }
-
-  string hvRanMeasFieldsVersion = 1; // version of HVRanMeasFields message
-  uint32 period_ms = 2; // period configured for reporting the data in milliseconds
-  string timezone = 3; // timezone of Network Function sending the data
-  string pmDictionaryVsn = 4; // vendor name + schema version E.g. NOKIA_LN7.0, uniquely identify the relevant PM dictionary
-  HVRanMeasPayload hvRanMeasPayload = 5; // objects being monitored
-  repeated AdditionalField additionalFields = 6; // array of name-value pairs if needed
-}
\ No newline at end of file
diff --git a/hv-collector-domain/src/main/proto/VesEvent-v5.proto b/hv-collector-domain/src/main/proto/VesEvent-v5.proto
deleted file mode 100644 (file)
index 340133b..0000000
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-syntax = "proto3";
-package org.onap.ves;
-
-message VesEvent {
-
-  // VES CommonEventHeader adapted to GPB (Google Protocol Buffers)
-  // Source: https://git.opnfv.org/ves/tree/tests/docs/ves_data_model.json
-  //         2017-05-13 Align with VES 5.0 schema.
-  //         blob: ca948ff67e8a2de4e2a47cffc4d4d2893170ab76
-
-  message CommonEventHeader {
-    string version = 1; // required, "version of the event header"
-    enum Domain {
-      DOMAIN_UNDEFINED = 0;
-      FAULT = 1;
-      HEARTBEAT = 2;
-      MEASUREMENTS_FOR_VF_SCALING = 3;
-      MOBILE_FLOW = 4;
-      SIP_SIGNALING = 5;
-      STATE_CHANGE = 6;
-      SYSLOG = 7;
-      THRESHOLD_CROSSING_ALERT = 8;
-      VOICE_QUALITY = 9;
-      OTHER = 10;
-      HVRANMEAS = 11;
-    }
-    Domain domain = 2; // required, "the eventing domain associated with the event" [map to string]
-
-    uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed"
-
-    enum Priority {
-      PRIORITY_UNDEFINED = 0;
-      HIGH = 1;
-      MEDIUM = 2;
-      NORMAL = 3;
-      LOW = 4;
-    }
-    Priority priority = 4; // required, "processing priority"
-
-    string eventId = 5; // required, "event key that is unique to the event source"
-    string eventName = 6; // required, "unique event name"
-    string eventType = 7; // "for example - applicationVnf, guestOS, hostOS, platform"
-
-    uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
-    uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
-
-    string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards"
-    string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards"
-
-    string reportingEntityId = 12; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
-    bytes reportingEntityName = 13; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName"
-    bytes sourceId = 14; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
-    string sourceName = 15; // required, "name of the entity experiencing the event issue"
-
-    reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
-    reserved 100;
-  }
-
-  CommonEventHeader commonEventHeader = 1;
-
-  oneof eventFields // required, payload, each high-volume domain has its specific GPB schema
-  {
-    bytes hvRanMeasFields = 2; // if domain==HVRANMEAS, GPB schema: HVRanMeasFields.proto
-  }
-}
-
-message VesEventList {
-  repeated VesEvent vesEvent = 1;
-}
diff --git a/hv-collector-domain/src/main/proto/event/VesEvent.proto b/hv-collector-domain/src/main/proto/event/VesEvent.proto
new file mode 100644 (file)
index 0000000..54a6d14
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+syntax = "proto3";
+package org.onap.ves;
+
+message VesEvent {
+    CommonEventHeader commonEventHeader = 1; // required
+
+    oneof eventFields // required, payload
+    {
+        // each new high-volume domain can add an entry for its own GPB message
+        // the field can be opaque (bytes) to allow decoding the payload in a separate step
+        bytes hvMeasFields = 2; // for domain==HVMEAS, GPB message: HVMeasFields
+    }
+}
+
+// VES CommonEventHeader adapted to GPB (Google Protocol Buffers)
+// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain
+
+message CommonEventHeader {
+    string version = 1;               // required, "version of the gpb common event header"
+    string domain = 2;                // required, "the eventing domain associated with the event", allowed values:
+                                      // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING,
+                                      // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS
+
+    uint32 sequence = 3;              // required, "ordering of events communicated by an event source instance or 0 if not needed"
+
+    enum Priority {
+        PRIORITY_NOT_PROVIDED = 0;
+        HIGH = 1;
+        MEDIUM = 2;
+        NORMAL = 3;
+        LOW = 4;
+    }
+    Priority priority = 4;            // required, "processing priority"
+
+    string eventId = 5;               // required, "event key that is unique to the event source"
+    string eventName = 6;             // required, "unique event name"
+    string eventType = 7;             // "for example - guest05,  platform"
+
+    uint64 lastEpochMicrosec = 8;     // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+    uint64 startEpochMicrosec = 9;    // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+
+    string nfNamingCode = 10;         // "4 character network function type, aligned with vnf naming standards"
+    string nfcNamingCode = 11;        // "3 character network function component type, aligned with vfc naming standards"
+    string nfVendorName = 12;         // " Vendor Name providing the nf "
+
+    bytes reportingEntityId = 13;     // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
+    string reportingEntityName = 14;  // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry"
+    bytes sourceId = 15;              // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
+    string sourceName = 16;           // required, "name of the entity experiencing the event issued use A&AI entry"
+    string timeZoneOffset = 17;       // "Offset to GMT to indicate local time zone for the device"
+    string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener"
+
+    reserved "InternalHeaderFields";  // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
+    reserved 100;
+}
diff --git a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto
new file mode 100644 (file)
index 0000000..9a8582d
--- /dev/null
@@ -0,0 +1,43 @@
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * dcaegen2-collectors-veshv\r
+ * ================================================================================\r
+ * Copyright (C) 2018 NOKIA\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+syntax = "proto3";\r
+package org.onap.ves;\r
+import "MeasDataCollection.proto";          // for 3GPP PM format\r
+\r
+message HVMeasFields\r
+{\r
+    string hvMeasFieldsVersion = 1;\r
+    measDataCollection.MeasDataCollection measDataCollection = 2;\r
+    // From 3GPP TS 28.550\r
+    // Informative: mapping between similar header fields (format may be different)\r
+    // 3GPP MeasStreamHeader   ONAP/VES CommonEventHeader\r
+    // senderName              sourceName\r
+    // senderType              nfNamingCode + nfcNamingCode\r
+    // vendorName              nfVendorName\r
+    // collectionBeginTime     startEpochMicrosec\r
+    // timestamp               lastEpochMicrosec\r
+    repeated HashMap eventAddlFlds = 3;     // optional per-event data\r
+}\r
+\r
+message HashMap\r
+{\r
+    string name = 1;\r
+    string value = 2;\r
+}
\ No newline at end of file
diff --git a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto
new file mode 100644 (file)
index 0000000..472dcc4
--- /dev/null
@@ -0,0 +1,104 @@
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * dcaegen2-collectors-veshv\r
+ * ================================================================================\r
+ * Copyright (C) 2018 NOKIA\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+syntax = "proto3";\r
+package measDataCollection;\r
+\r
+// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V1.2.2 (2018-08).\r
+// Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file.\r
+// Note (2018-08): work is in progress for 3GPP TS 28.550 to specify PM streaming format. Changes will be made, if needed, to align with final version.\r
+// Differences/additions to 3GPP TS 28.550 are marked with "%%".\r
+\r
+message MeasDataCollection                  // top-level message \r
+{\r
+    MeasHeader measHeader = 1;\r
+    repeated MeasData measData = 2;         // %%: use a single instance for RTPM\r
+    MeasFooter measFooter = 3;\r
+}\r
+\r
+message MeasHeader\r
+{\r
+    string streamFormatVersion = 1;\r
+    string senderName = 2;\r
+    string senderType = 3;\r
+    string vendorName = 4;\r
+    string collectionBeginTime = 5;         // in ASN.1 GeneralizedTime format (subset of ISO 8601 basic format)\r
+}\r
+\r
+message MeasData\r
+{\r
+    string measuredEntityId = 1;            // DN as per 3GPP TS 32.300\r
+    string measuredEntityUserName = 2;      // network function User Name\r
+    string measuredEntitySoftwareVersion = 3;\r
+    uint32 granularityPeriod = 4;           // in seconds, %% moved from MeasInfo (single reporting period per event)\r
+    repeated string measObjInstIdList = 5;  // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432\r
+    repeated MeasInfo measInfo = 6; \r
+}\r
+\r
+\r
+message MeasInfo\r
+{\r
+    oneof MeasInfoId {                      // measurement group identifier\r
+        uint32 iMeasInfoId = 1;             // identifier as integer (%%: more compact)\r
+        string measInfoId = 2;              // identifier as string (more generic)\r
+    }\r
+\r
+    oneof MeasTypes {                       // measurement identifiers associated with the measurement results\r
+        IMeasTypes iMeasTypes = 3;          // identifiers as integers (%%: more compact)\r
+        SMeasTypes measTypes = 4;           // identifiers as strings (more generic)\r
+    }\r
+    // Needed only because GPB does not support repeated fields directly inside 'oneof'\r
+    message IMeasTypes { repeated uint32 iMeasType = 1; }\r
+    message SMeasTypes { repeated string measType = 1; }\r
+\r
+    string jobIdList = 5;\r
+    repeated MeasValue measValues = 6;      // performance measurements grouped by measurement groups\r
+}\r
+\r
+message MeasValue\r
+{\r
+    oneof MeasObjInstId {                   // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432\r
+        string measObjInstId = 1;           // LDN itself\r
+        uint32 measObjInstIdListIdx = 2;    // %%: index into measObjInstIdList\r
+    }\r
+    repeated MeasResult measResults = 3;\r
+    bool suspectFlag = 4;\r
+    repeated nameValue measObjAddlFlds = 5; // %%: optional per-object data\r
+}\r
+\r
+message MeasResult\r
+{\r
+    uint32 p = 1;                           // Optional index in the MeasTypes array\r
+    oneof xValue {\r
+        sint64 iValue = 2;\r
+        double rValue = 3;\r
+        bool isNull = 4;\r
+    }\r
+}\r
+\r
+message MeasFooter\r
+{\r
+    string timestamp = 1;                   // in ASN.1 GeneralizedTime format, a better name would be "collectionEndTime"\r
+}\r
+\r
+message nameValue                           // %%: vendor-defined name-value pair\r
+{\r
+    string name = 1;\r
+    string value = 2;\r
+} 
\ No newline at end of file
index fa63c36..b992d53 100644 (file)
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.domain
 import arrow.core.Either
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.Unpooled
-import io.netty.buffer.UnpooledByteBufAllocator
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.ObjectAssert
 import org.jetbrains.spek.api.Spek
index c6aa89b..7804226 100644 (file)
@@ -25,7 +25,10 @@ import io.netty.buffer.ByteBufAllocator
 import io.netty.buffer.PooledByteBufAllocator
 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+
 import java.util.UUID.randomUUID
 
 
@@ -39,7 +42,7 @@ private fun ByteBuf.writeValidWireFrameHeaders() {
     writeByte(0x01)          // content type = GPB
 }
 
-fun vesWireFrameMessage(domain: Domain = Domain.OTHER,
+fun vesWireFrameMessage(domain: VesEventDomain = OTHER,
                         id: String = randomUUID().toString()): ByteBuf =
         allocator.buffer().run {
             writeValidWireFrameHeaders()
@@ -70,7 +73,7 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
     writeByte(0x01)   // content type = GPB
 }
 
-fun vesMessageWithTooBigPayload(domain: Domain = Domain.DOMAIN_UNDEFINED): ByteBuf =
+fun vesMessageWithTooBigPayload(domain: VesEventDomain = HVMEAS): ByteBuf =
         allocator.buffer().run {
             writeValidWireFrameHeaders()
 
index 6aeb620..0341c2f 100644 (file)
@@ -23,27 +23,31 @@ package org.onap.dcae.collectors.veshv.tests.utils
 import com.google.protobuf.ByteString
 import com.google.protobuf.MessageLite
 import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.ves.VesEventV5
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.ves.VesEventOuterClass
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
 import java.util.UUID.randomUUID
 
-fun vesEvent(domain: VesEventV5.VesEvent.CommonEventHeader.Domain = VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS,
+fun vesEvent(domain: VesEventDomain = HVMEAS,
              id: String = randomUUID().toString(),
              hvRanMeasFields: ByteString = ByteString.EMPTY
-): VesEventV5.VesEvent = vesEvent(commonHeader(domain, id), hvRanMeasFields)
+): VesEventOuterClass.VesEvent = vesEvent(commonHeader(domain, id), hvRanMeasFields)
 
-fun vesEvent(commonEventHeader: VesEventV5.VesEvent.CommonEventHeader,
-             hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventV5.VesEvent =
-        VesEventV5.VesEvent.newBuilder()
+fun vesEvent(commonEventHeader: CommonEventHeader,
+             hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent =
+        VesEventOuterClass.VesEvent.newBuilder()
                 .setCommonEventHeader(commonEventHeader)
-                .setHvRanMeasFields(hvRanMeasFields)
+                .setHvMeasFields(hvRanMeasFields)
                 .build()
 
-fun commonHeader(domain: VesEventV5.VesEvent.CommonEventHeader.Domain = VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS,
+fun commonHeader(domain: VesEventDomain = HVMEAS,
                  id: String = randomUUID().toString(),
-                 priority: VesEventV5.VesEvent.CommonEventHeader.Priority = VesEventV5.VesEvent.CommonEventHeader.Priority.NORMAL): VesEventV5.VesEvent.CommonEventHeader =
-        VesEventV5.VesEvent.CommonEventHeader.newBuilder()
+                 priority: Priority = Priority.NORMAL): CommonEventHeader =
+        CommonEventHeader.newBuilder()
                 .setVersion("sample-version")
-                .setDomain(domain)
+                .setDomain(domain.name)
                 .setSequence(1)
                 .setPriority(priority)
                 .setEventId(id)
@@ -53,13 +57,16 @@ fun commonHeader(domain: VesEventV5.VesEvent.CommonEventHeader.Domain = VesEvent
                 .setLastEpochMicrosec(120034455)
                 .setNfNamingCode("sample-nf-naming-code")
                 .setNfcNamingCode("sample-nfc-naming-code")
-                .setReportingEntityId("sample-reporting-entity-id")
-                .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
+                .setNfVendorName("vendor-name")
+                .setReportingEntityId(ByteString.copyFromUtf8("sample-reporting-entity-id"))
+                .setReportingEntityName("sample-reporting-entity-name")
                 .setSourceId(ByteString.copyFromUtf8("sample-source-id"))
                 .setSourceName("sample-source-name")
+                .setTimeZoneOffset("+1")
+                .setVesEventListenerVersion("another-version")
                 .build()
 
-fun vesEventBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader, byteString: ByteString = ByteString.EMPTY): ByteData =
+fun vesEventBytes(commonHeader: CommonEventHeader, byteString: ByteString = ByteString.EMPTY): ByteData =
         vesEvent(commonHeader, byteString).toByteData()
 
 fun MessageLite.toByteData(): ByteData = ByteData(toByteArray())
\ No newline at end of file
index 768685c..909db5e 100644 (file)
@@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl
 import arrow.core.Option
 import com.google.protobuf.util.JsonFormat
 import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
-import org.onap.ves.VesEventV5.VesEvent.*
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import javax.json.JsonObject
 
 /**
index fec2609..5d1f56d 100644 (file)
@@ -31,10 +31,9 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVA
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventOuterClass.VesEvent
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import java.nio.charset.Charset
@@ -79,10 +78,6 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
                     PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
             }
 
-    private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: HVRanMeasPayload): ByteArray {
-        return vesEvent(commonEventHeader, hvRanMeasPayload.toByteString())
-    }
-
     private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: ByteString): ByteArray {
         return createVesEvent(commonEventHeader, hvRanMeasPayload).toByteArray()
     }
@@ -90,7 +85,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
     private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
             VesEvent.newBuilder()
                     .setCommonEventHeader(commonEventHeader)
-                    .setHvRanMeasFields(payload)
+                    .setHvMeasFields(payload)
                     .build()
 
     private fun oversizedPayload() =
index acdaf19..ef7eefa 100644 (file)
 package org.onap.dcae.collectors.veshv.ves.message.generator.impl
 
 import com.google.protobuf.ByteString
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject.HVRanMeas
 import java.util.*
+import kotlin.streams.asSequence
 
 internal class PayloadGenerator {
 
@@ -32,34 +30,12 @@ internal class PayloadGenerator {
     fun generateRawPayload(size: Int): ByteString =
             ByteString.copyFrom(ByteArray(size))
 
-    fun generatePayload(numOfCountPerMeas: Long = 2, numOfMeasPerObject: Int = 2): HVRanMeasPayload {
-        val pmObject = generatePmObject(numOfCountPerMeas, numOfMeasPerObject)
-        return HVRanMeasPayload.newBuilder()
-                .addPmObject(pmObject)
-                .build()
-    }
+    fun generatePayload(numOfCountMeasurements: Long = 2): ByteString =
+            ByteString.copyFrom(
+                    randomGenerator.ints(numOfCountMeasurements, 0, 256)
+                            .asSequence()
+                            .toString()
+                            .toByteArray()
+            )
 
-    private fun generatePmObject(numOfCountPerMeas: Long, numOfMeasPerObject: Int): PMObject {
-        val hvRanMeasList = MutableList(numOfMeasPerObject) { generateHvRanMeas(numOfCountPerMeas) }
-        val finalUriName = URI_BASE_NAME + randomGenerator.nextInt(UPPER_URI_NUMBER_BOUND)
-        return HVRanMeasPayload.PMObject.newBuilder()
-                .setUri(finalUriName)
-                .addAllHvRanMeas(hvRanMeasList.asIterable())
-                .build()
-    }
-
-    private fun generateHvRanMeas(numOfCountPerMeas: Long): HVRanMeas {
-        return HVRanMeasPayload.PMObject.HVRanMeas.newBuilder()
-                .setMeasurementId(randomGenerator.nextInt())
-                .addAllCounterSubid(Iterable { randomGenerator.ints(numOfCountPerMeas).iterator() })
-                .addAllCounterValue(Iterable { randomGenerator.longs(numOfCountPerMeas).iterator() })
-                .setSuspectFlagIncomplete(false)
-                .setSuspectFlagOutOfSync(false)
-                .build()
-    }
-
-    companion object {
-        private const val URI_BASE_NAME = "sample/uri"
-        private const val UPPER_URI_NUMBER_BOUND = 10_000
-    }
 }
index c16459c..ce394cc 100644 (file)
@@ -27,8 +27,9 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.STATE_CHANGE
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import java.io.ByteArrayInputStream
 import javax.json.Json
 import kotlin.test.fail
@@ -40,7 +41,7 @@ class CommonEventHeaderParserTest : Spek({
 
         given("valid header in JSON format") {
             val commonEventHeader = commonHeader(
-                    domain = VesEventV5.VesEvent.CommonEventHeader.Domain.STATE_CHANGE,
+                    domain = STATE_CHANGE,
                     id = "sample-event-id")
             val json = JsonFormat.printer().print(commonEventHeader).byteInputStream()
 
@@ -75,7 +76,7 @@ class CommonEventHeaderParserTest : Spek({
     }
 })
 
-fun assertFailed(result: Option<VesEventV5.VesEvent.CommonEventHeader>) =
+fun assertFailed(result: Option<CommonEventHeader>) =
         result.fold({}, { fail() })
 
 fun jsonObject(json: ByteArrayInputStream) = Json.createReader(json).readObject()
\ No newline at end of file
index f13a33b..ea3d094 100644 (file)
@@ -30,15 +30,15 @@ import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.FAULT
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HEARTBEAT
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.test.test
 
 /**
@@ -54,7 +54,7 @@ object MessageGeneratorImplTest : Spek({
                     val limit = 1000L
                     generator
                             .createMessageFlux(listOf(MessageParameters(
-                                    commonHeader(HVRANMEAS),
+                                    commonHeader(HVMEAS),
                                     MessageType.VALID
                             )))
                             .take(limit)
@@ -67,7 +67,7 @@ object MessageGeneratorImplTest : Spek({
                 it("should create message flux of specified size") {
                     generator
                             .createMessageFlux(listOf(MessageParameters(
-                                    commonHeader(HVRANMEAS),
+                                    commonHeader(HVMEAS),
                                     MessageType.VALID,
                                     5
                             )))
@@ -88,7 +88,7 @@ object MessageGeneratorImplTest : Spek({
                             .assertNext {
                                 assertThat(it.isValid()).isTrue()
                                 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
-                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
                             }
                             .verifyComplete()
                 }
@@ -98,7 +98,7 @@ object MessageGeneratorImplTest : Spek({
 
                     generator
                             .createMessageFlux(listOf(MessageParameters(
-                                    commonHeader(HVRANMEAS),
+                                    commonHeader(HVMEAS),
                                     MessageType.TOO_BIG_PAYLOAD,
                                     1
                             )))
@@ -106,7 +106,7 @@ object MessageGeneratorImplTest : Spek({
                             .assertNext {
                                 assertThat(it.isValid()).isTrue()
                                 assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
-                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
                             }
                             .verifyComplete()
                 }
@@ -115,7 +115,7 @@ object MessageGeneratorImplTest : Spek({
                 it("should create flux of messages with invalid payload") {
                     generator
                             .createMessageFlux(listOf(MessageParameters(
-                                    commonHeader(HVRANMEAS),
+                                    commonHeader(HVMEAS),
                                     MessageType.INVALID_GPB_DATA,
                                     1
                             )))
@@ -133,7 +133,7 @@ object MessageGeneratorImplTest : Spek({
                 it("should create flux of messages with invalid version") {
                     generator
                             .createMessageFlux(listOf(MessageParameters(
-                                    commonHeader(HVRANMEAS),
+                                    commonHeader(HVMEAS),
                                     MessageType.INVALID_WIRE_FRAME,
                                     1
                             )))
@@ -141,7 +141,7 @@ object MessageGeneratorImplTest : Spek({
                             .assertNext {
                                 assertThat(it.isValid()).isFalse()
                                 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
-                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
                                 assertThat(it.versionMajor).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION_MINOR)
                             }
                             .verifyComplete()
@@ -160,7 +160,7 @@ object MessageGeneratorImplTest : Spek({
                                 assertThat(it.isValid()).isTrue()
                                 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
                                 assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
-                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
                             }
                             .verifyComplete()
                 }
@@ -170,7 +170,7 @@ object MessageGeneratorImplTest : Spek({
             it("should create concatenated flux of messages") {
                 val singleFluxSize = 5L
                 val messageParameters = listOf(
-                        MessageParameters(commonHeader(HVRANMEAS), MessageType.VALID, singleFluxSize),
+                        MessageParameters(commonHeader(HVMEAS), MessageType.VALID, singleFluxSize),
                         MessageParameters(commonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
                         MessageParameters(commonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize)
                 )
@@ -178,17 +178,17 @@ object MessageGeneratorImplTest : Spek({
                         .test()
                         .assertNext {
                             assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
-                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
                         }
                         .expectNextCount(singleFluxSize - 1)
                         .assertNext {
                             assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
-                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
                         }
                         .expectNextCount(singleFluxSize - 1)
                         .assertNext {
                             assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
-                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT)
+                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.name)
                         }
                         .expectNextCount(singleFluxSize - 1)
                         .verifyComplete()
@@ -197,10 +197,10 @@ object MessageGeneratorImplTest : Spek({
     }
 })
 
-fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
-    return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
-}
+fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader =
+        VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+
+
+fun extractHvRanMeasFields(bytes: ByteData): ByteString =
+        VesEvent.parseFrom(bytes.unsafeAsArray()).hvMeasFields
 
-fun extractHvRanMeasFields(bytes: ByteData): ByteString {
-    return VesEvent.parseFrom(bytes.unsafeAsArray()).hvRanMeasFields
-}
index 3695ca4..2b41e29 100644 (file)
@@ -26,50 +26,27 @@ import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
 
-private const val DEFAULT_MEASUREMENTS_NUMBER = 2
-private const val DEFAULT_COUNTERS_NUMBER = 2
-
-private val uriRegex = """sample/uri(\d+)""".toRegex()
-
 object PayloadGeneratorTest : Spek({
 
     given("payload factory object") {
         val payloadGenerator = PayloadGenerator()
 
-        on("two generated payloads") {
-            val generatedPayload0 = payloadGenerator.generatePayload()
-            val generatedPayload1 = payloadGenerator.generatePayload()
-            it("URIs should have different names") {
-                val matchResult0 = uriRegex.find(generatedPayload0.getPmObject(0).uri)!!.value
-                val matchResult1 = uriRegex.find(generatedPayload1.getPmObject(0).uri)!!.value
-                assertThat(matchResult0 != matchResult1).isTrue()
-            }
-        }
+        on("raw payload generation") {
+            val size = 100
+            val generatedPayload = payloadGenerator.generateRawPayload(size)
 
-        on("call with default parameters") {
-            val generatedPayload = payloadGenerator.generatePayload()
-            it("should contain default numbers of measurements") {
-                assertThat(generatedPayload.getPmObject(0).hvRanMeasCount).isEqualTo(DEFAULT_MEASUREMENTS_NUMBER)
-            }
-            it("should contain default numbers of counters in measurement") {
-                assertThat(generatedPayload.getPmObject(0).getHvRanMeas(0).counterSubidCount).isEqualTo(DEFAULT_COUNTERS_NUMBER)
+            it("should generate sequence of zeros") {
+                assertThat(generatedPayload.size()).isEqualTo(size)
+                assertThat(generatedPayload.toByteArray()).isEqualTo(ByteArray(size))
             }
         }
 
-        on("call with specified parameters") {
-            val numOfCountPerMeas: Long = 5
-            val numOfMeasPerObject = 10
-            val generatedPayload = payloadGenerator.generatePayload(numOfCountPerMeas, numOfMeasPerObject)
-            it("should contain specified number of measurements") {
-                assertThat(generatedPayload.getPmObject(0).hvRanMeasCount).isEqualTo(numOfMeasPerObject)
-            }
-            it("measurement should contain specified number of counters") {
-                assertThat(generatedPayload.getPmObject(0).hvRanMeasList
-                        .filter { numOfCountPerMeas.toInt() == it.counterSubidCount }
-                        .size)
-                        .isEqualTo(numOfMeasPerObject)
+        on("two generated payloads") {
+            val generatedPayload0 = payloadGenerator.generatePayload()
+            val generatedPayload1 = payloadGenerator.generatePayload()
+            it("should be different") {
+                assertThat(generatedPayload0 != generatedPayload1).isTrue()
             }
-
         }
     }
 })
index 8855060..45e936a 100644 (file)
@@ -21,73 +21,81 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
 
 import javax.json.Json
 
-private const val validMessageParameters = "[\n" +
-        "  {\n" +
-        "    \"commonEventHeader\": {\n" +
-        "      \"version\": \"sample-version\",\n" +
-        "      \"domain\": \"HVRANMEAS\",\n" +
-        "      \"sequence\": 1,\n" +
-        "      \"priority\": 1,\n" +
-        "      \"eventId\": \"sample-event-id\",\n" +
-        "      \"eventName\": \"sample-event-name\",\n" +
-        "      \"eventType\": \"sample-event-type\",\n" +
-        "      \"startEpochMicrosec\": 120034455,\n" +
-        "      \"lastEpochMicrosec\": 120034455,\n" +
-        "      \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
-        "      \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
-        "      \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
-        "      \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
-        "      \"sourceId\": \"sample-source-id\",\n" +
-        "      \"sourceName\": \"sample-source-name\"\n" +
-        "    },\n" +
-        "    \"messageType\": \"VALID\",\n" +
-        "    \"messagesAmount\": 25000\n" +
-        "  },\n" +
-        "  {\n" +
-        "    \"commonEventHeader\": {\n" +
-        "      \"version\": \"sample-version\",\n" +
-        "      \"domain\": \"HVRANMEAS\",\n" +
-        "      \"sequence\": 1,\n" +
-        "      \"priority\": 1,\n" +
-        "      \"eventId\": \"sample-event-id\",\n" +
-        "      \"eventName\": \"sample-event-name\",\n" +
-        "      \"eventType\": \"sample-event-type\",\n" +
-        "      \"startEpochMicrosec\": 120034455,\n" +
-        "      \"lastEpochMicrosec\": 120034455,\n" +
-        "      \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
-        "      \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
-        "      \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
-        "      \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
-        "      \"sourceId\": \"sample-source-id\",\n" +
-        "      \"sourceName\": \"sample-source-name\"\n" +
-        "    },\n" +
-        "    \"messageType\": \"TOO_BIG_PAYLOAD\",\n" +
-        "    \"messagesAmount\": 100\n" +
-        "  }\n" +
-        "]"
+private const val validMessageParameters =
+"""[
+        {
+          "commonEventHeader": {
+            "version": "sample-version",
+            "domain": "HVMEAS",
+            "sequence": 1,
+            "priority": 1,
+            "eventId": "sample-event-id",
+            "eventName": "sample-event-name",
+            "eventType": "sample-event-type",
+            "startEpochMicrosec": 120034455,
+            "lastEpochMicrosec": 120034455,
+            "nfNamingCode": "sample-nf-naming-code",
+            "nfcNamingCode": "sample-nfc-naming-code",
+            "reportingEntityId": "sample-reporting-entity-id",
+            "reportingEntityName": "sample-reporting-entity-name",
+            "sourceId": "sample-source-id",
+            "sourceName": "sample-source-name",
+            "vesEventListenerVersion": "another-version"
+          },
+          "messageType": "VALID",
+          "messagesAmount": 25000
+        },
+        {
+          "commonEventHeader": {
+            "version": "sample-version",
+            "domain": "HVMEAS",
+            "sequence": 1,
+            "priority": 1,
+            "eventId": "sample-event-id",
+            "eventName": "sample-event-name",
+            "eventType": "sample-event-type",
+            "startEpochMicrosec": 120034455,
+            "lastEpochMicrosec": 120034455,
+            "nfNamingCode": "sample-nf-naming-code",
+            "nfcNamingCode": "sample-nfc-naming-code",
+            "reportingEntityId": "sample-reporting-entity-id",
+            "reportingEntityName": "sample-reporting-entity-name",
+            "sourceId": "sample-source-id",
+            "sourceName": "sample-source-name",
+            "vesEventListenerVersion": "another-version"
+          },
+          "messageType": "TOO_BIG_PAYLOAD",
+          "messagesAmount": 100
+        }
+        ]
+"""
 
-private const val invalidMessageParameters = "[\n" +
-        "  {\n" +
-        "    \"commonEventHeader\": {\n" +
-        "      \"version\": \"sample-version\",\n" +
-        "      \"domain\": \"HVRANMEAS\",\n" +
-        "      \"sequence\": 1,\n" +
-        "      \"priority\": 1,\n" +
-        "      \"eventId\": \"sample-event-id\",\n" +
-        "      \"eventName\": \"sample-event-name\",\n" +
-        "      \"eventType\": \"sample-event-type\",\n" +
-        "      \"startEpochMicrosec\": 120034455,\n" +
-        "      \"lastEpochMicrosec\": 120034455,\n" +
-        "      \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
-        "      \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
-        "      \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
-        "      \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
-        "      \"sourceId\": \"sample-source-id\",\n" +
-        "      \"sourceName\": \"sample-source-name\"\n" +
-        "    },\n" +
-        "    \"messagesAmount\": 3\n" +
-        "  }\n" +
-        "]"
+private const val invalidMessageParameters =
+"""
+    [
+        {
+          "commonEventHeader": {
+            "version": "sample-version",
+            "domain": "HVMEAS",
+            "sequence": 1,
+            "priority": 1,
+            "eventId": "sample-event-id",
+            "eventName": "sample-event-name",
+            "eventType": "sample-event-type",
+            "startEpochMicrosec": 120034455,
+            "lastEpochMicrosec": 120034455,
+            "nfNamingCode": "sample-nf-naming-code",
+            "nfcNamingCode": "sample-nfc-naming-code",
+            "reportingEntityId": "sample-reporting-entity-id",
+            "reportingEntityName": "sample-reporting-entity-name",
+            "sourceId": "sample-source-id",
+            "sourceName": "sample-source-name",
+            "vesEventListenerVersion": "another-version"
+          },
+          "messagesAmount": 3
+        }
+        ]
+"""
 
 fun validMessagesParametesJson() = Json
         .createReader(validMessageParameters.reader())