2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
22 import com.google.protobuf.InvalidProtocolBufferException
23 import org.assertj.core.api.Assertions.assertThat
24 import org.assertj.core.api.Assertions.assertThatExceptionOfType
25 import org.jetbrains.spek.api.Spek
26 import org.jetbrains.spek.api.dsl.describe
27 import org.jetbrains.spek.api.dsl.given
28 import org.jetbrains.spek.api.dsl.it
29 import org.jetbrains.spek.api.dsl.on
30 import org.onap.dcae.collectors.veshv.domain.ByteData
31 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
32 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
33 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
34 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
35 import org.onap.ves.VesEventV5.VesEvent
36 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
37 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.*
38 import reactor.test.test
41 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
// Spek specification for the MessageGenerator obtained from MessageGenerator.INSTANCE.
// It drives createMessageFlux with a single MessageParameters entry and with a list of entries.
44 object MessageGeneratorImplTest : Spek({
45 describe("message factory") {
46 val generator = MessageGenerator.INSTANCE
// Scenarios that pass a one-element parameter list to createMessageFlux.
47 given("single message parameters") {
// MessageParameters is built without the third (amount) argument here.
48 on("messages amount not specified in parameters") {
49 it("should create infinite flux") {
52 .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID)))
// NOTE(review): `limit` is declared outside the visible lines; presumably the number of
// items consumed before the subscription is cancelled. Confirm.
55 .expectNextCount(limit)
// Amount passed as the third MessageParameters argument (5 here).
59 on("messages amount specified in parameters") {
60 it("should create message flux of specified size") {
62 .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 5)))
// VALID: frame reports valid, payload is below MAX_PAYLOAD_SIZE and the parsed header
// carries the requested domain.
68 on("message type requesting valid message") {
69 it("should create flux of valid messages with given domain") {
71 .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 1)))
74 assertThat(it.isValid()).isTrue()
75 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
76 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
// TOO_BIG_PAYLOAD: isValid() is still expected to be true; only payloadSize exceeds
// MAX_PAYLOAD_SIZE.
81 on("message type requesting too big payload") {
82 it("should create flux of messages with given domain and payload exceeding threshold") {
85 .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.TOO_BIG_PAYLOAD, 1)))
88 assertThat(it.isValid()).isTrue()
89 assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
90 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
// UNSUPPORTED_DOMAIN: HVRANMEAS is requested, but the parsed header must carry a different domain.
95 on("message type requesting unsupported domain") {
96 it("should create flux of messages with domain other than HVRANMEAS") {
99 .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.UNSUPPORTED_DOMAIN, 1)))
102 assertThat(it.isValid()).isTrue()
103 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
104 assertThat(extractCommonEventHeader(it.payload).domain).isNotEqualTo(HVRANMEAS)
// INVALID_GPB_DATA: the wire frame itself is expected to be valid; parsing the payload
// as a VesEvent must fail with InvalidProtocolBufferException.
109 on("message type requesting invalid GPB data ") {
110 it("should create flux of messages with invalid payload") {
112 .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_GPB_DATA, 1)))
115 assertThat(it.isValid()).isTrue()
116 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
117 assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
118 .isThrownBy { extractCommonEventHeader(it.payload) }
// INVALID_WIRE_FRAME: isValid() is false and the version differs from SUPPORTED_VERSION,
// while the payload still parses and carries HVRANMEAS.
123 on("message type requesting invalid wire frame ") {
124 it("should create flux of messages with invalid version") {
126 .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_WIRE_FRAME, 1)))
129 assertThat(it.isValid()).isFalse()
130 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
131 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
132 assertThat(it.version).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION)
// Three parameter entries of singleFluxSize messages each, passed in one list.
138 given("list of message parameters") {
139 it("should create concatenated flux of messages") {
140 val singleFluxSize = 5L
141 val messageParameters = listOf(
142 MessageParameters(HVRANMEAS, MessageType.VALID, singleFluxSize),
143 MessageParameters(FAULT, MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
144 MessageParameters(HEARTBEAT, MessageType.VALID, singleFluxSize)
146 generator.createMessageFlux(messageParameters)
// NOTE(review): the verification-step calls wrapping the assertions below are outside the
// visible lines. Each assertion pair followed by expectNextCount(singleFluxSize - 1) appears
// to check one message of a segment and then skip the rest of it. Confirm.
149 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
150 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
152 .expectNextCount(singleFluxSize - 1)
154 assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
155 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
157 .expectNextCount(singleFluxSize - 1)
159 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
160 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT)
162 .expectNextCount(singleFluxSize - 1)
/**
 * Parses [bytes] as a [VesEvent] and returns its common event header.
 *
 * The INVALID_GPB_DATA scenario in this file expects an
 * `InvalidProtocolBufferException` from this call when the payload is not a
 * parsable `VesEvent`.
 *
 * NOTE(review): `unsafeAsArray()` presumably exposes the backing array without
 * copying; confirm against `ByteData`.
 *
 * @param bytes payload taken from a generated wire frame message
 * @return the `commonEventHeader` of the parsed event
 */
169 fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
170 return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader