b2490dd15e0592eaa52de3dd564c95a8e08e8efc
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
21
22 import com.google.protobuf.ByteString
23 import com.google.protobuf.InvalidProtocolBufferException
24 import org.assertj.core.api.Assertions.assertThat
25 import org.assertj.core.api.Assertions.assertThatExceptionOfType
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.jetbrains.spek.api.dsl.on
31 import org.onap.dcae.collectors.veshv.domain.ByteData
32 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
33 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
34 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
35 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
36 import org.onap.ves.VesEventV5.VesEvent
37 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
38 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.FAULT
39 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HEARTBEAT
40 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
41 import reactor.test.test
42
/**
 * Spek specification for the [MessageGenerator] exposed as `MessageGenerator.INSTANCE`.
 *
 * Exercises fluxes created from a single [MessageParameters] entry (with and without
 * an explicit amount, and for each [MessageType] used below) as well as from a list
 * of entries.
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since June 2018
 */
object MessageGeneratorImplTest : Spek({
    describe("message factory") {
        val generator = MessageGenerator.INSTANCE
        val maxPayloadSize = PayloadWireFrameMessage.MAX_PAYLOAD_SIZE

        // Flux generated from exactly one parameters entry with an explicit message amount.
        fun singleEntryFlux(domain: CommonEventHeader.Domain, type: MessageType, amount: Long) =
                generator.createMessageFlux(
                        listOf(MessageParameters(createSampleCommonHeader(domain), type, amount)))

        given("single message parameters") {
            on("messages amount not specified in parameters") {
                it("should create infinite flux") {
                    // No amount is passed here, so the flux is capped with take() before verifying.
                    val limit = 1000L
                    val parameters = MessageParameters(createSampleCommonHeader(HVRANMEAS), MessageType.VALID)
                    generator.createMessageFlux(listOf(parameters))
                            .take(limit)
                            .test()
                            .expectNextCount(limit)
                            .verifyComplete()
                }
            }
            on("messages amount specified in parameters") {
                it("should create message flux of specified size") {
                    val requestedAmount = 5L
                    singleEntryFlux(HVRANMEAS, MessageType.VALID, requestedAmount)
                            .test()
                            .expectNextCount(requestedAmount)
                            .verifyComplete()
                }
            }
            on("message type requesting valid message") {
                it("should create flux of valid messages with given domain") {
                    singleEntryFlux(FAULT, MessageType.VALID, 1)
                            .test()
                            .assertNext { frame ->
                                assertThat(frame.isValid()).isTrue()
                                assertThat(frame.payloadSize).isLessThan(maxPayloadSize)
                                assertThat(extractCommonEventHeader(frame.payload).domain).isEqualTo(FAULT)
                            }
                            .verifyComplete()
                }
            }
            on("message type requesting too big payload") {
                it("should create flux of messages with given domain and payload exceeding threshold") {
                    singleEntryFlux(HVRANMEAS, MessageType.TOO_BIG_PAYLOAD, 1)
                            .test()
                            .assertNext { frame ->
                                assertThat(frame.isValid()).isTrue()
                                assertThat(frame.payloadSize).isGreaterThan(maxPayloadSize)
                                assertThat(extractCommonEventHeader(frame.payload).domain).isEqualTo(HVRANMEAS)
                            }
                            .verifyComplete()
                }
            }
            on("message type requesting invalid GPB data ") {
                it("should create flux of messages with invalid payload") {
                    singleEntryFlux(HVRANMEAS, MessageType.INVALID_GPB_DATA, 1)
                            .test()
                            .assertNext { frame ->
                                assertThat(frame.isValid()).isTrue()
                                assertThat(frame.payloadSize).isLessThan(maxPayloadSize)
                                // The payload itself must not be parseable as a VesEvent.
                                assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
                                        .isThrownBy { extractCommonEventHeader(frame.payload) }
                            }
                            .verifyComplete()
                }
            }
            on("message type requesting invalid wire frame ") {
                it("should create flux of messages with invalid version") {
                    singleEntryFlux(HVRANMEAS, MessageType.INVALID_WIRE_FRAME, 1)
                            .test()
                            .assertNext { frame ->
                                assertThat(frame.isValid()).isFalse()
                                assertThat(frame.payloadSize).isLessThan(maxPayloadSize)
                                assertThat(extractCommonEventHeader(frame.payload).domain).isEqualTo(HVRANMEAS)
                                assertThat(frame.version).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION)
                            }
                            .verifyComplete()
                }
            }
        }
        given("list of message parameters") {
            it("should create concatenated flux of messages") {
                val singleFluxSize = 5L
                // Each group's first message is inspected; the rest of the group is only counted.
                val restOfGroup = singleFluxSize - 1
                val messageParameters = listOf(
                        HVRANMEAS to MessageType.VALID,
                        FAULT to MessageType.TOO_BIG_PAYLOAD,
                        HEARTBEAT to MessageType.VALID
                ).map { (domain, type) ->
                    MessageParameters(createSampleCommonHeader(domain), type, singleFluxSize)
                }
                generator.createMessageFlux(messageParameters)
                        .test()
                        .assertNext { first ->
                            assertThat(first.payloadSize).isLessThan(maxPayloadSize)
                            assertThat(extractCommonEventHeader(first.payload).domain).isEqualTo(HVRANMEAS)
                        }
                        .expectNextCount(restOfGroup)
                        .assertNext { first ->
                            assertThat(first.payloadSize).isGreaterThan(maxPayloadSize)
                            assertThat(extractCommonEventHeader(first.payload).domain).isEqualTo(FAULT)
                        }
                        .expectNextCount(restOfGroup)
                        .assertNext { first ->
                            assertThat(first.payloadSize).isLessThan(maxPayloadSize)
                            assertThat(extractCommonEventHeader(first.payload).domain).isEqualTo(HEARTBEAT)
                        }
                        .expectNextCount(restOfGroup)
                        .verifyComplete()
            }
        }
    }
})
180
/**
 * Parses [bytes] as a serialized [VesEvent] and returns its common event header.
 *
 * Parsing failures are not handled here; whatever `VesEvent.parseFrom` throws
 * propagates to the caller.
 *
 * @param bytes raw payload to decode
 * @return the `commonEventHeader` of the decoded event
 */
fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader =
        VesEvent.parseFrom(bytes.unsafeAsArray()).let { event -> event.commonEventHeader }
184
/**
 * Builds a [CommonEventHeader] fixture populated with fixed sample values.
 *
 * @param domain the only varying field; it is stored as the header's domain
 * @return the built header
 */
private fun createSampleCommonHeader(domain: CommonEventHeader.Domain): CommonEventHeader {
    val header = CommonEventHeader.newBuilder().apply {
        setVersion("sample-version")
        setDomain(domain)
        setSequence(1)
        setPriority(CommonEventHeader.Priority.NORMAL)
        setEventId("sample-event-id")
        setEventName("sample-event-name")
        setEventType("sample-event-type")
        // Start and last timestamps are deliberately the same sample value.
        setStartEpochMicrosec(120034455)
        setLastEpochMicrosec(120034455)
        setNfNamingCode("sample-nf-naming-code")
        setNfcNamingCode("sample-nfc-naming-code")
        setReportingEntityId("sample-reporting-entity-id")
        setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
        setSourceId(ByteString.copyFromUtf8("sample-source-id"))
        setSourceName("sample-source-name")
    }
    return header.build()
}
202
203