2932367b75785052aa68f0a92344ae34167a230c
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
21
22 import arrow.core.Either
23 import arrow.core.Left
24 import arrow.core.None
25 import arrow.core.Right
26 import arrow.core.Some
27 import arrow.effects.IO
28 import javax.json.stream.JsonParsingException
29 import com.google.protobuf.ByteString
30 import com.nhaarman.mockito_kotlin.any
31 import com.nhaarman.mockito_kotlin.mock
32 import com.nhaarman.mockito_kotlin.never
33 import com.nhaarman.mockito_kotlin.verify
34 import com.nhaarman.mockito_kotlin.whenever
35 import org.assertj.core.api.Assertions.assertThat
36 import org.assertj.core.api.Assertions.fail
37 import org.jetbrains.spek.api.Spek
38 import org.jetbrains.spek.api.dsl.describe
39 import org.jetbrains.spek.api.dsl.it
40 import org.mockito.ArgumentMatchers.anyList
41 import org.mockito.ArgumentMatchers.anySet
42 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
43 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
44 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
45 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
46 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
47 import org.onap.ves.VesEventV5.VesEvent
48 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
49 import reactor.core.publisher.Flux
50 import java.util.concurrent.ConcurrentLinkedQueue
51 import javax.json.Json
52 import javax.json.JsonArray
53 import javax.json.JsonValue
54
/**
 * Spek specification of [MessageStreamValidation].
 *
 * The parameters parser and the message generator are Mockito mocks that are recreated
 * before each test. Every scenario stubs what they return and then checks the outcome of
 * `validate` for a given list of received messages.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since August 2018
 */
internal class MessageStreamValidationTest : Spek({
    lateinit var parserMock: MessageParametersParser
    lateinit var generatorMock: MessageGenerator
    lateinit var cut: MessageStreamValidation

    beforeEachTest {
        parserMock = mock()
        generatorMock = mock()
        cut = MessageStreamValidation(parserMock, generatorMock)
    }

    // Makes the parser mock answer any input with the supplied parameters.
    fun givenParsedMessageParameters(vararg params: MessageParameters) {
        whenever(parserMock.parse(any())).thenReturn(Right(params.toList()))
    }

    // Stubs both mocks for a single generated event of the given message type:
    // the parser yields one parameter set built from the event header, and the
    // generator emits one wire frame wrapping the serialized event.
    fun givenGeneratedMessage(generated: VesEvent, type: MessageType) {
        givenParsedMessageParameters(MessageParameters(generated.commonEventHeader, type, 1))

        val frame = PayloadWireFrameMessage(generated.toByteArray())
        whenever(generatorMock.createMessageFlux(anyList())).thenReturn(Flux.just(frame))
    }

    // Validates the sample JSON request against exactly one received event.
    fun validationResultFor(received: VesEvent) =
            cut.validate(sampleJsonAsStream(), listOf(received.toByteArray())).unsafeRunSync()

    describe("validate") {

        it("should return error when JSON is invalid") {
            val malformedJson = "[{invalid json}]".byteInputStream()

            val outcome = cut.validate(malformedJson, listOf()).attempt().unsafeRunSync()

            if (outcome is Either.Left) {
                assertThat(outcome.a).isInstanceOf(JsonParsingException::class.java)
            } else {
                fail("validation should fail")
            }
        }

        it("should return error when message param list is empty") {
            givenParsedMessageParameters()

            val outcome = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()

            assertThat(outcome.isLeft()).isTrue()
        }

        describe("when validating headers only") {
            it("should return true when messages are the same") {
                val event = vesEvent()
                givenGeneratedMessage(event, MessageType.VALID)

                assertThat(validationResultFor(event)).isTrue()
            }

            it("should return true when messages differ with payload only") {
                val generated = vesEvent(payload = "payload A")
                val received = vesEvent(payload = "payload B")
                givenGeneratedMessage(generated, MessageType.VALID)

                assertThat(validationResultFor(received)).isTrue()
            }

            it("should return false when messages are different") {
                val generated = vesEvent()
                val received = vesEvent(eventId = "bbb")
                givenGeneratedMessage(generated, MessageType.VALID)

                assertThat(validationResultFor(received)).isFalse()
            }
        }

        describe("when validating whole messages") {
            it("should return true when messages are the same") {
                val event = vesEvent()
                givenGeneratedMessage(event, MessageType.FIXED_PAYLOAD)

                assertThat(validationResultFor(event)).isTrue()
            }

            it("should return false when messages differ with payload only") {
                val generated = vesEvent(payload = "payload A")
                val received = vesEvent(payload = "payload B")
                givenGeneratedMessage(generated, MessageType.FIXED_PAYLOAD)

                assertThat(validationResultFor(received)).isFalse()
            }

            it("should return false when messages are different") {
                val generated = vesEvent()
                val received = vesEvent(eventId = "bbb")
                givenGeneratedMessage(generated, MessageType.FIXED_PAYLOAD)

                assertThat(validationResultFor(received)).isFalse()
            }
        }
    }
})
209
210
211
/** Event id used by [vesEvent] when the caller does not pass one. */
private const val DUMMY_EVENT_ID = "aaa"

/** Payload text used by [vesEvent] when the caller does not pass one. */
private const val DUMMY_PAYLOAD = "payload"

/**
 * Builds a [VesEvent] for the tests.
 *
 * @param eventId value stored as the event id of the common event header
 * @param payload text whose bytes are stored in the `hvRanMeasFields` field
 * @return the built event
 */
private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
    val header = CommonEventHeader.newBuilder().setEventId(eventId)
    val measurementFields = ByteString.copyFrom(payload.toByteArray())

    return VesEvent.newBuilder()
            .setCommonEventHeader(header)
            .setHvRanMeasFields(measurementFields)
            .build()
}
222
/**
 * Well-formed JSON array used as the request body passed to `validate` in the tests.
 * Named in upper snake case like the other compile-time constants in this file.
 */
private const val SAMPLE_JSON_ARRAY = """["headersOnly"]"""

/**
 * Opens a new input stream over [SAMPLE_JSON_ARRAY].
 *
 * A fresh stream is created on every call, so each test reads the document from the start.
 */
private fun sampleJsonAsStream() = SAMPLE_JSON_ARRAY.byteInputStream()