0bdd1159b234f6b2849e5c7c34885854f17e6fdb
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
21
22 import arrow.core.Either
23 import arrow.core.Left
24 import arrow.core.None
25 import arrow.core.Some
26 import arrow.effects.IO
27 import javax.json.stream.JsonParsingException
28 import com.google.protobuf.ByteString
29 import com.nhaarman.mockito_kotlin.any
30 import com.nhaarman.mockito_kotlin.mock
31 import com.nhaarman.mockito_kotlin.never
32 import com.nhaarman.mockito_kotlin.verify
33 import com.nhaarman.mockito_kotlin.whenever
34 import org.assertj.core.api.Assertions.assertThat
35 import org.assertj.core.api.Assertions.fail
36 import org.jetbrains.spek.api.Spek
37 import org.jetbrains.spek.api.dsl.describe
38 import org.jetbrains.spek.api.dsl.it
39 import org.mockito.ArgumentMatchers.anyList
40 import org.mockito.ArgumentMatchers.anySet
41 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
42 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
43 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
44 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
45 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
46 import org.onap.ves.VesEventV5.VesEvent
47 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
48 import reactor.core.publisher.Flux
49 import java.util.concurrent.ConcurrentLinkedQueue
50 import javax.json.Json
51 import javax.json.JsonArray
52 import javax.json.JsonValue
53
54 /**
55  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
56  * @since August 2018
57  */
58 internal class MessageStreamValidationTest : Spek({
59     lateinit var messageParametersParser: MessageParametersParser
60     lateinit var messageGenerator: MessageGenerator
61     lateinit var cut: MessageStreamValidation
62
63     beforeEachTest {
64         messageParametersParser = mock()
65         messageGenerator = mock()
66         cut = MessageStreamValidation(messageParametersParser, messageGenerator)
67     }
68
69     fun givenParsedMessageParameters(vararg params: MessageParameters) {
70         whenever(messageParametersParser.parse(any())).thenReturn(params.toList())
71     }
72
73     describe("validate") {
74
75         it("should return error when JSON is invalid") {
76             // when
77             val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
78
79             // then
80             when(result) {
81                 is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
82                 else -> fail("validation should fail")
83             }
84         }
85
86         it("should return error when message param list is empty") {
87             // given
88             givenParsedMessageParameters()
89
90             // when
91             val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
92
93             // then
94             assertThat(result.isLeft()).isTrue()
95         }
96
97         describe("when validating headers only") {
98             it("should return true when messages are the same") {
99                 // given
100                 val jsonAsStream = sampleJsonAsStream()
101                 val event = vesEvent()
102                 val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
103                 val receivedMessageBytes = event.toByteArray()
104
105                 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
106                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
107
108                 // when
109                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
110
111                 // then
112                 assertThat(result).isTrue()
113             }
114
115             it("should return true when messages differ with payload only") {
116                 // given
117                 val jsonAsStream = sampleJsonAsStream()
118                 val generatedEvent = vesEvent(payload = "payload A")
119                 val receivedEvent = vesEvent(payload = "payload B")
120                 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
121                 val receivedMessageBytes = receivedEvent.toByteArray()
122
123                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
124                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
125
126                 // when
127                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
128
129                 // then
130                 assertThat(result).isTrue()
131             }
132
133             it("should return false when messages are different") {
134                 // given
135                 val jsonAsStream = sampleJsonAsStream()
136                 val generatedEvent = vesEvent()
137                 val receivedEvent = vesEvent(eventId = "bbb")
138                 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
139                 val receivedMessageBytes = receivedEvent.toByteArray()
140
141                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
142                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
143
144                 // when
145                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
146
147                 // then
148                 assertThat(result).isFalse()
149             }
150         }
151
152         describe("when validating whole messages") {
153             it("should return true when messages are the same") {
154                 // given
155                 val jsonAsStream = sampleJsonAsStream()
156                 val event = vesEvent()
157                 val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
158                 val receivedMessageBytes = event.toByteArray()
159
160                 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
161                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
162
163                 // when
164                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
165
166                 // then
167                 assertThat(result).isTrue()
168             }
169
170             it("should return false when messages differ with payload only") {
171                 // given
172                 val jsonAsStream = sampleJsonAsStream()
173                 val generatedEvent = vesEvent(payload = "payload A")
174                 val receivedEvent = vesEvent(payload = "payload B")
175                 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
176                 val receivedMessageBytes = receivedEvent.toByteArray()
177
178                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
179                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
180
181                 // when
182                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
183
184                 // then
185                 assertThat(result).isFalse()
186             }
187
188             it("should return false when messages are different") {
189                 // given
190                 val jsonAsStream = sampleJsonAsStream()
191                 val generatedEvent = vesEvent()
192                 val receivedEvent = vesEvent("bbb")
193                 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
194                 val receivedMessageBytes = receivedEvent.toByteArray()
195
196                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
197                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
198
199                 // when
200                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
201
202                 // then
203                 assertThat(result).isFalse()
204             }
205         }
206     }
207 })
208
209
210
211 private const val DUMMY_EVENT_ID = "aaa"
212 private const val DUMMY_PAYLOAD = "payload"
213
214 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
215     return VesEvent.newBuilder()
216             .setCommonEventHeader(CommonEventHeader.newBuilder()
217                     .setEventId(eventId))
218             .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
219             .build()
220 }
221
222 private const val sampleJsonArray = """["headersOnly"]"""
223
224 private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream()