930f020b252e4aed45ca0a3e4dcf792d02e0e2e9
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.ves.message.generator.impl
21
22 import com.google.protobuf.ByteString
23 import com.google.protobuf.InvalidProtocolBufferException
24 import org.assertj.core.api.Assertions.assertThat
25 import org.assertj.core.api.Assertions.assertThatExceptionOfType
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.jetbrains.spek.api.dsl.on
31 import org.onap.dcae.collectors.veshv.domain.ByteData
32 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
34 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
35 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
36 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
37 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
38 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
39 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
40 import org.onap.ves.VesEventOuterClass.CommonEventHeader
41 import org.onap.ves.VesEventOuterClass.VesEvent
42 import reactor.test.test
43 import kotlin.test.assertTrue
44
/**
 * Specification of [MessageGeneratorImpl.createMessageFlux].
 *
 * Each scenario builds a list of [MessageParameters], turns it into a flux of wire frame
 * messages and verifies the emitted frames (count, frame validity, payload size relative to
 * the configured maximum, and the domain stored in the protobuf common event header).
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since June 2018
 */
object MessageGeneratorImplTest : Spek({
    describe("message factory") {
        // Threshold handed to the generator; payload-size assertions below are relative to it.
        val maxPayloadSizeBytes = 1024
        val generator = MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
        given("single message parameters") {

            on("messages amount not specified in parameters") {
                it("should create infinite flux") {
                    // The flux is expected to be unbounded, so it is cut with take(limit)
                    // to let the verification terminate.
                    val limit = 1000L
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(PERF3GPP),
                                    MessageType.VALID
                            )))
                            .take(limit)
                            .test()
                            .expectNextCount(limit)
                            .verifyComplete()
                }
            }

            on("messages amount = 0 specified in parameters") {
                it("should create empty message flux") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(PERF3GPP),
                                    MessageType.VALID,
                                    0
                            )))
                            .test()
                            .verifyComplete()
                }
            }

            on("messages amount specified in parameters") {
                it("should create message flux of specified size") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(PERF3GPP),
                                    MessageType.VALID,
                                    5
                            )))
                            .test()
                            .expectNextCount(5)
                            .verifyComplete()
                }
            }

            on("message type requesting valid message") {
                it("should create flux of valid messages with given domain") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(FAULT),
                                    MessageType.VALID,
                                    1
                            )))
                            .test()
                            .assertNext {
                                assertTrue(it.validate().isRight())
                                assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
                            }
                            .verifyComplete()
                }
            }

            on("message type requesting too big payload") {
                it("should create flux of messages with given domain and payload exceeding threshold") {

                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(PERF3GPP),
                                    MessageType.TOO_BIG_PAYLOAD,
                                    1
                            )))
                            .test()
                            .assertNext {
                                // The frame itself still validates; only the payload size is over the limit.
                                assertTrue(it.validate().isRight())
                                assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
                            }
                            .verifyComplete()
                }
            }

            on("message type requesting invalid GPB data ") {
                it("should create flux of messages with invalid payload") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(PERF3GPP),
                                    MessageType.INVALID_GPB_DATA,
                                    1
                            )))
                            .test()
                            .assertNext {
                                // The wire frame is well-formed, but its payload must not parse as a VesEvent.
                                assertTrue(it.validate().isRight())
                                assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
                                        .isThrownBy { extractCommonEventHeader(it.payload) }
                            }
                            .verifyComplete()
                }
            }

            on("message type requesting invalid wire frame ") {
                it("should create flux of messages with invalid version") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(PERF3GPP),
                                    MessageType.INVALID_WIRE_FRAME,
                                    1
                            )))
                            .test()
                            .assertNext {
                                // Frame validation fails while the payload is still a parseable event.
                                assertTrue(it.validate().isLeft())
                                assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
                                // The major version must be compared with the supported MAJOR version;
                                // comparing it with SUPPORTED_VERSION_MINOR checked unrelated values.
                                // NOTE(review): assumes WireFrameMessage declares SUPPORTED_VERSION_MAJOR
                                // next to SUPPORTED_VERSION_MINOR - confirm against the domain module.
                                assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MAJOR)
                            }
                            .verifyComplete()
                }
            }

            on("message type requesting fixed payload") {
                it("should create flux of valid messages with fixed payload") {
                    generator
                            .createMessageFlux(listOf(MessageParameters(
                                    commonHeader(FAULT),
                                    MessageType.FIXED_PAYLOAD,
                                    1
                            )))
                            .test()
                            .assertNext {
                                assertTrue(it.validate().isRight())
                                assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
                                assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
                            }
                            .verifyComplete()
                }
            }
        }
        given("list of message parameters") {
            it("should create concatenated flux of messages") {
                val singleFluxSize = 5L
                val messageParameters = listOf(
                        MessageParameters(commonHeader(PERF3GPP), MessageType.VALID, singleFluxSize),
                        MessageParameters(commonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
                        MessageParameters(commonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize)
                )
                // For every parameter set: inspect the first emitted message in detail,
                // then only count the remaining (singleFluxSize - 1) messages of that group.
                generator.createMessageFlux(messageParameters)
                        .test()
                        .assertNext {
                            assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
                        }
                        .expectNextCount(singleFluxSize - 1)
                        .assertNext {
                            assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
                        }
                        .expectNextCount(singleFluxSize - 1)
                        .assertNext {
                            assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                            assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.domainName)
                        }
                        .expectNextCount(singleFluxSize - 1)
                        .verifyComplete()
            }
        }
    }
})
221
/**
 * Parses [bytes] as a serialized [VesEvent] and returns its common event header.
 *
 * Parsing failures are not handled here; whatever `VesEvent.parseFrom` throws
 * propagates to the caller.
 *
 * @param bytes raw payload to be parsed as a protobuf-encoded event
 * @return the [CommonEventHeader] of the parsed event
 */
fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
    val parsedEvent = VesEvent.parseFrom(bytes.unsafeAsArray())
    return parsedEvent.commonEventHeader
}
224
225
/**
 * Parses [bytes] as a serialized [VesEvent] and returns its raw event fields.
 *
 * Parsing failures are not handled here; whatever `VesEvent.parseFrom` throws
 * propagates to the caller.
 *
 * @param bytes raw payload to be parsed as a protobuf-encoded event
 * @return the `eventFields` of the parsed event as a [ByteString]
 */
fun extractEventFields(bytes: ByteData): ByteString {
    val parsedEvent = VesEvent.parseFrom(bytes.unsafeAsArray())
    return parsedEvent.eventFields
}
228