beef26b661222dae846bea00e45459553e08d04e
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
21
22 import arrow.core.Either
23 import arrow.core.Right
24 import com.google.protobuf.ByteString
25 import com.nhaarman.mockito_kotlin.any
26 import com.nhaarman.mockito_kotlin.mock
27 import com.nhaarman.mockito_kotlin.whenever
28 import org.assertj.core.api.Assertions.assertThat
29 import org.assertj.core.api.Assertions.fail
30 import org.jetbrains.spek.api.Spek
31 import org.jetbrains.spek.api.dsl.describe
32 import org.jetbrains.spek.api.dsl.it
33 import org.mockito.ArgumentMatchers.anyList
34 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
35 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
36 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
37 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
38 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
39 import org.onap.ves.VesEventOuterClass.CommonEventHeader
40 import org.onap.ves.VesEventOuterClass.VesEvent
41 import reactor.core.publisher.Flux
42 import javax.json.stream.JsonParsingException
43
/**
 * Specification of [MessageStreamValidation.validate].
 *
 * Both collaborators ([MessageParametersParser] and [MessageGenerator]) are mocked, so each case
 * controls which message parameters are "parsed" and which wire frame is "generated", and then
 * checks how the received bytes compare against that generated frame:
 *
 *  - [MessageType.VALID] cases are grouped under "when validating headers only",
 *  - [MessageType.FIXED_PAYLOAD] cases are grouped under "when validating whole messages".
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since August 2018
 */
internal class MessageStreamValidationTest : Spek({
    lateinit var messageParametersParser: MessageParametersParser
    lateinit var messageGenerator: MessageGenerator
    lateinit var cut: MessageStreamValidation

    beforeEachTest {
        // Fresh mocks for every case so that stubbing never leaks between tests.
        messageParametersParser = mock()
        messageGenerator = mock()
        cut = MessageStreamValidation(messageParametersParser, messageGenerator)
    }

    // Makes the parser succeed with exactly the given parameters, whatever input it is handed.
    fun givenParsedMessageParameters(vararg params: MessageParameters) {
        whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
    }

    // Arranges a single expected message: the parser yields one parameter set built from the
    // event's header and the requested type, and the generator emits one frame with the event bytes.
    fun givenGeneratedMessage(event: VesEvent, messageType: MessageType) {
        givenParsedMessageParameters(MessageParameters(event.commonEventHeader, messageType, 1))
        whenever(messageGenerator.createMessageFlux(anyList()))
                .thenReturn(Flux.just(PayloadWireFrameMessage(event.toByteArray())))
    }

    describe("validate") {

        it("should return error when JSON is invalid") {
            // when
            val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()

            // then
            when (result) {
                is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
                else -> fail("validation should fail")
            }
        }

        it("should return error when message param list is empty") {
            // given
            givenParsedMessageParameters()

            // when
            val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()

            // then
            assertThat(result.isLeft()).isTrue()
        }

        describe("when validating headers only") {
            it("should return true when messages are the same") {
                // given
                val event = vesEvent()
                givenGeneratedMessage(event, MessageType.VALID)

                // when
                val result = cut.validate(sampleJsonAsStream(), listOf(event.toByteArray())).unsafeRunSync()

                // then
                assertThat(result).isTrue()
            }

            it("should return true when messages differ with payload only") {
                // given
                val generatedEvent = vesEvent(payload = "payload A")
                val receivedEvent = vesEvent(payload = "payload B")
                givenGeneratedMessage(generatedEvent, MessageType.VALID)

                // when
                val result = cut.validate(sampleJsonAsStream(), listOf(receivedEvent.toByteArray())).unsafeRunSync()

                // then
                assertThat(result).isTrue()
            }

            it("should return false when messages are different") {
                // given
                val generatedEvent = vesEvent()
                val receivedEvent = vesEvent(eventId = "bbb")
                givenGeneratedMessage(generatedEvent, MessageType.VALID)

                // when
                val result = cut.validate(sampleJsonAsStream(), listOf(receivedEvent.toByteArray())).unsafeRunSync()

                // then
                assertThat(result).isFalse()
            }
        }

        describe("when validating whole messages") {
            it("should return true when messages are the same") {
                // given
                val event = vesEvent()
                givenGeneratedMessage(event, MessageType.FIXED_PAYLOAD)

                // when
                val result = cut.validate(sampleJsonAsStream(), listOf(event.toByteArray())).unsafeRunSync()

                // then
                assertThat(result).isTrue()
            }

            it("should return false when messages differ with payload only") {
                // given
                val generatedEvent = vesEvent(payload = "payload A")
                val receivedEvent = vesEvent(payload = "payload B")
                givenGeneratedMessage(generatedEvent, MessageType.FIXED_PAYLOAD)

                // when
                val result = cut.validate(sampleJsonAsStream(), listOf(receivedEvent.toByteArray())).unsafeRunSync()

                // then
                assertThat(result).isFalse()
            }

            it("should return false when messages are different") {
                // given
                val generatedEvent = vesEvent()
                val receivedEvent = vesEvent(eventId = "bbb")
                givenGeneratedMessage(generatedEvent, MessageType.FIXED_PAYLOAD)

                // when
                val result = cut.validate(sampleJsonAsStream(), listOf(receivedEvent.toByteArray())).unsafeRunSync()

                // then
                assertThat(result).isFalse()
            }
        }
    }
})
198
199
200
private const val DUMMY_EVENT_ID = "aaa"
private const val DUMMY_PAYLOAD = "payload"

/**
 * Builds a [VesEvent] fixture.
 *
 * @param eventId value stored in the event's common header
 * @param payload text whose encoded bytes become the event's `hvMeasFields`
 * @return the built event
 */
private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
    val header = CommonEventHeader.newBuilder().setEventId(eventId)
    val payloadBytes = ByteString.copyFrom(payload.toByteArray())

    return VesEvent.newBuilder()
            .setCommonEventHeader(header)
            .setHvMeasFields(payloadBytes)
            .build()
}
211
// Minimal well-formed JSON array used as the request body by the specs in this file.
private const val SAMPLE_JSON_ARRAY = """["headersOnly"]"""

/**
 * Returns a new stream over [SAMPLE_JSON_ARRAY] on every call, so each spec reads it from the start.
 */
private fun sampleJsonAsStream(): java.io.ByteArrayInputStream = SAMPLE_JSON_ARRAY.byteInputStream()