fb144616a05e40abd1ec89fe3b619a8c8c954a19
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
21
22 import com.google.protobuf.InvalidProtocolBufferException
23 import org.assertj.core.api.Assertions.assertThat
24 import org.assertj.core.api.Assertions.assertThatExceptionOfType
25 import org.jetbrains.spek.api.Spek
26 import org.jetbrains.spek.api.dsl.describe
27 import org.jetbrains.spek.api.dsl.given
28 import org.jetbrains.spek.api.dsl.it
29 import org.jetbrains.spek.api.dsl.on
30 import org.onap.dcae.collectors.veshv.domain.ByteData
31 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
32 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
33 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
34 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
35 import org.onap.ves.VesEventV5.VesEvent
36 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
37 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.*
38 import reactor.test.test
39
40 /**
41  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
42  * @since June 2018
43  */
44 object MessageGeneratorImplTest : Spek({
45     describe("message factory") {
46         val generator = MessageGenerator.INSTANCE
47         given("single message parameters") {
48             on("messages amount not specified in parameters") {
49                 it("should create infinite flux") {
50                     val limit = 1000L
51                     generator
52                             .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID)))
53                             .take(limit)
54                             .test()
55                             .expectNextCount(limit)
56                             .verifyComplete()
57                 }
58             }
59             on("messages amount specified in parameters") {
60                 it("should create message flux of specified size") {
61                     generator
62                             .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 5)))
63                             .test()
64                             .expectNextCount(5)
65                             .verifyComplete()
66                 }
67             }
68             on("message type requesting valid message") {
69                 it("should create flux of valid messages with given domain") {
70                     generator
71                             .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.VALID, 1)))
72                             .test()
73                             .assertNext {
74                                 assertThat(it.isValid()).isTrue()
75                                 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
76                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
77                             }
78                             .verifyComplete()
79                 }
80             }
81             on("message type requesting too big payload") {
82                 it("should create flux of messages with given domain and payload exceeding threshold") {
83
84                     generator
85                             .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.TOO_BIG_PAYLOAD, 1)))
86                             .test()
87                             .assertNext {
88                                 assertThat(it.isValid()).isTrue()
89                                 assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
90                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
91                             }
92                             .verifyComplete()
93                 }
94             }
95             on("message type requesting unsupported domain") {
96                 it("should create flux of messages with domain other than HVRANMEAS") {
97
98                     generator
99                             .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.UNSUPPORTED_DOMAIN, 1)))
100                             .test()
101                             .assertNext {
102                                 assertThat(it.isValid()).isTrue()
103                                 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
104                                 assertThat(extractCommonEventHeader(it.payload).domain).isNotEqualTo(HVRANMEAS)
105                             }
106                             .verifyComplete()
107                 }
108             }
109             on("message type requesting invalid GPB data ") {
110                 it("should create flux of messages with invalid payload") {
111                     generator
112                             .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_GPB_DATA, 1)))
113                             .test()
114                             .assertNext {
115                                 assertThat(it.isValid()).isTrue()
116                                 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
117                                 assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
118                                         .isThrownBy { extractCommonEventHeader(it.payload) }
119                             }
120                             .verifyComplete()
121                 }
122             }
123             on("message type requesting invalid wire frame ") {
124                 it("should create flux of messages with invalid version") {
125                     generator
126                             .createMessageFlux(listOf(MessageParameters(HVRANMEAS, MessageType.INVALID_WIRE_FRAME, 1)))
127                             .test()
128                             .assertNext {
129                                 assertThat(it.isValid()).isFalse()
130                                 assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
131                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
132                                 assertThat(it.version).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION)
133                             }
134                             .verifyComplete()
135                 }
136             }
137         }
138         given("list of message parameters") {
139             it("should create concatenated flux of messages") {
140                 val singleFluxSize = 5L
141                 val messageParameters = listOf(
142                         MessageParameters(HVRANMEAS, MessageType.VALID, singleFluxSize),
143                         MessageParameters(FAULT, MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
144                         MessageParameters(HEARTBEAT, MessageType.VALID, singleFluxSize)
145                 )
146                 generator.createMessageFlux(messageParameters)
147                         .test()
148                         .assertNext {
149                             assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
150                             assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
151                         }
152                         .expectNextCount(singleFluxSize - 1)
153                         .assertNext {
154                             assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
155                             assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
156                         }
157                         .expectNextCount(singleFluxSize - 1)
158                         .assertNext {
159                             assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
160                             assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT)
161                         }
162                         .expectNextCount(singleFluxSize - 1)
163                         .verifyComplete()
164             }
165         }
166     }
167 })
168
169 fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
170     return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
171 }