Merge "Update README.md"
[dcaegen2/collectors/hv-ves.git] / hv-collector-ves-message-generator / src / test / kotlin / org / onap / dcae / collectors / veshv / ves / message / generator / impl / impl / MessageGeneratorImplTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
21
22 import com.google.protobuf.ByteString
23 import com.google.protobuf.InvalidProtocolBufferException
24 import org.assertj.core.api.Assertions.assertThat
25 import org.assertj.core.api.Assertions.assertThatExceptionOfType
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.jetbrains.spek.api.dsl.on
31 import org.onap.dcae.collectors.veshv.domain.ByteData
32 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
34 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
35 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
36 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
37 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
38 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
39 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
40 import org.onap.ves.VesEventOuterClass.CommonEventHeader
41 import org.onap.ves.VesEventOuterClass.VesEvent
42 import reactor.test.test
43
/**
 * Spek specification for `createMessageFlux`, run against [MessageGenerator.INSTANCE].
 *
 * Covers a single [MessageParameters] entry for each [MessageType] used here (VALID, TOO_BIG_PAYLOAD,
 * INVALID_GPB_DATA, INVALID_WIRE_FRAME, FIXED_PAYLOAD), with and without an explicit message amount,
 * and a list of several entries whose messages are expected to arrive one group after another.
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since June 2018
 */
object MessageGeneratorImplTest : Spek({
    describe("message factory") {
        val generator = MessageGenerator.INSTANCE
        given("single message parameters") {
            on("messages amount not specified in parameters") {
                it("should create infinite flux") {
                    // The flux is expected to be unbounded, so it is capped with take() before verifying.
                    val limit = 1000L
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(HVMEAS),
                                    MessageType.VALID
                            )))
                            .take(limit)
                            .test()
                            .expectNextCount(limit)
                            .verifyComplete()
                }
            }
            on("messages amount specified in parameters") {
                it("should create message flux of specified size") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(HVMEAS),
                                    MessageType.VALID,
                                    5
                            )))
                            .test()
                            .expectNextCount(5)
                            .verifyComplete()
                }
            }
            on("message type requesting valid message") {
                it("should create flux of valid messages with given domain") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(FAULT),
                                    MessageType.VALID,
                                    1
                            )))
                            .test()
                            .assertNext {
                                assertThat(it.isValid()).isTrue()
                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
                            }
                            .verifyComplete()
                }
            }
            on("message type requesting too big payload") {
                it("should create flux of messages with given domain and payload exceeding threshold") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(HVMEAS),
                                    MessageType.TOO_BIG_PAYLOAD,
                                    1
                            )))
                            .test()
                            .assertNext {
                                // The frame is still expected to report itself valid; only its size is over the limit.
                                assertThat(it.isValid()).isTrue()
                                assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
                            }
                            .verifyComplete()
                }
            }
            on("message type requesting invalid GPB data ") {
                it("should create flux of messages with invalid payload") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(HVMEAS),
                                    MessageType.INVALID_GPB_DATA,
                                    1
                            )))
                            .test()
                            .assertNext {
                                // The wire frame is expected to be valid; it is the protobuf parsing that must fail.
                                assertThat(it.isValid()).isTrue()
                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
                                assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
                                        .isThrownBy { extractCommonEventHeader(it.payload) }
                            }
                            .verifyComplete()
                }
            }
            on("message type requesting invalid wire frame ") {
                it("should create flux of messages with invalid version") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(HVMEAS),
                                    MessageType.INVALID_WIRE_FRAME,
                                    1
                            )))
                            .test()
                            .assertNext {
                                assertThat(it.isValid()).isFalse()
                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
                                // Compare the major version with the major constant; this assertion previously
                                // checked versionMajor against SUPPORTED_VERSION_MINOR, mixing up the two fields.
                                // NOTE(review): assumes PayloadWireFrameMessage declares SUPPORTED_VERSION_MAJOR
                                // next to SUPPORTED_VERSION_MINOR - confirm.
                                assertThat(it.versionMajor).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION_MAJOR)
                            }
                            .verifyComplete()
                }
            }
            on("message type requesting fixed payload") {
                it("should create flux of valid messages with fixed payload") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(FAULT),
                                    MessageType.FIXED_PAYLOAD,
                                    1
                            )))
                            .test()
                            .assertNext {
                                assertThat(it.isValid()).isTrue()
                                assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
                                assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
                            }
                            .verifyComplete()
                }
            }
        }
        given("list of message parameters") {
            it("should create concatenated flux of messages") {
                val singleFluxSize = 5L
                val messageParameters = listOf(
                        MessageParameters(commonHeader(HVMEAS), MessageType.VALID, singleFluxSize),
                        MessageParameters(commonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
                        MessageParameters(commonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize)
                )
                // For each group, the first message is inspected and the remaining
                // singleFluxSize - 1 messages are only counted.
                generator.createMessageFlux(messageParameters)
                        .test()
                        .assertNext {
                            assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
                        }
                        .expectNextCount(singleFluxSize - 1)
                        .assertNext {
                            assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
                        }
                        .expectNextCount(singleFluxSize - 1)
                        .assertNext {
                            assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.name)
                        }
                        .expectNextCount(singleFluxSize - 1)
                        .verifyComplete()
            }
        }
    }
})
199
/**
 * Parses [bytes] as a serialized [VesEvent] and returns its common event header.
 *
 * Any exception raised by [VesEvent.parseFrom] is propagated to the caller.
 */
fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
    val parsedEvent = VesEvent.parseFrom(bytes.unsafeAsArray())
    return parsedEvent.commonEventHeader
}
202
203
/**
 * Parses [bytes] as a serialized [VesEvent] and returns its `hvMeasFields` value.
 *
 * Any exception raised by [VesEvent.parseFrom] is propagated to the caller.
 */
fun extractHvRanMeasFields(bytes: ByteData): ByteString {
    val parsedEvent = VesEvent.parseFrom(bytes.unsafeAsArray())
    return parsedEvent.hvMeasFields
}
206