2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
22 import arrow.core.Right
23 import com.google.protobuf.ByteString
24 import com.nhaarman.mockitokotlin2.any
25 import com.nhaarman.mockitokotlin2.mock
26 import com.nhaarman.mockitokotlin2.whenever
27 import org.assertj.core.api.Assertions.assertThat
28 import org.jetbrains.spek.api.Spek
29 import org.jetbrains.spek.api.dsl.describe
30 import org.jetbrains.spek.api.dsl.it
31 import org.onap.dcae.collectors.veshv.tests.utils.assertFailedWithError
32 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
33 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
34 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
35 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
36 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID
37 import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
38 import org.onap.ves.VesEventOuterClass.CommonEventHeader
39 import org.onap.ves.VesEventOuterClass.VesEvent
40 import reactor.core.publisher.Flux
41 import javax.json.stream.JsonParsingException
44 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification for MessageStreamValidation.validate, run against a mocked
// MessageParametersParser and a mocked VesEventGenerator.
47 internal class MessageStreamValidationTest : Spek({
// Collaborators and the instance whose validate(...) is called by the tests below.
48 lateinit var messageParametersParser: MessageParametersParser
49 lateinit var messageGenerator: VesEventGenerator
50 lateinit var cut: MessageStreamValidation
// Fresh mocks are created and injected into a new MessageStreamValidation.
// NOTE(review): the construct enclosing these three assignments is not visible in this
// view; presumably a per-test setup hook — confirm.
53 messageParametersParser = mock()
54 messageGenerator = mock()
55 cut = MessageStreamValidation(messageGenerator, messageParametersParser)
// Stubs the parser so that parse(...) with any argument yields Right(list of the given params).
58 fun givenParsedMessageParameters(vararg params: MessageParameters) {
59 whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
62 describe("validate") {
// No stubbing here: malformed JSON input is expected to surface as a JsonParsingException.
64 it("should return error when JSON is invalid") {
66 val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
69 result.assertFailedWithError { it is JsonParsingException }
// The parser is stubbed to return an empty parameter list, which validate is expected to reject.
72 it("should return error when message param list is empty") {
// Called without arguments, so the stubbed parser result is Right(emptyList).
74 givenParsedMessageParameters()
77 val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
80 result.assertFailedWithError { it is IllegalArgumentException }
// Parameters in this group are built with VALID; the expectations below treat events that
// differ only in payload as matching and events that differ in eventId as not matching.
83 describe("when validating headers only") {
84 it("should return true when messages are the same") {
86 val jsonAsStream = sampleJsonAsStream()
87 val event = vesEvent()
// The "received" message is the serialized form of the very event the generator mock emits.
88 val receivedMessageBytes = event.toByteArray()
90 givenParsedMessageParameters(VesEventParameters(event.commonEventHeader, VALID, 1))
91 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(event))
94 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
97 assertThat(result).isTrue()
100 it("should return true when messages differ with payload only") {
102 val jsonAsStream = sampleJsonAsStream()
// Same default eventId on both events; only the payload text differs.
103 val generatedEvent = vesEvent(payload = "payload A")
104 val receivedEvent = vesEvent(payload = "payload B")
106 val receivedMessageBytes = receivedEvent.toByteArray()
108 givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, VALID, 1))
109 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
112 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
115 assertThat(result).isTrue()
118 it("should return false when messages are different") {
120 val jsonAsStream = sampleJsonAsStream()
// The generated event keeps the default eventId; the received one uses "bbb".
121 val generatedEvent = vesEvent()
122 val receivedEvent = vesEvent(eventId = "bbb")
123 val receivedMessageBytes = receivedEvent.toByteArray()
125 givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, VALID, 1))
126 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
129 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
132 assertThat(result).isFalse()
// Parameters in this group are built with FIXED_PAYLOAD; here a payload-only difference is
// expected to make validation return false.
136 describe("when validating whole messages") {
137 it("should return true when messages are the same") {
139 val jsonAsStream = sampleJsonAsStream()
140 val event = vesEvent()
141 val receivedMessageBytes = event.toByteArray()
143 givenParsedMessageParameters(VesEventParameters(event.commonEventHeader, FIXED_PAYLOAD, 1))
144 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(event))
147 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
150 assertThat(result).isTrue()
153 it("should return false when messages differ with payload only") {
155 val jsonAsStream = sampleJsonAsStream()
// Same default eventId on both events; only the payload text differs.
156 val generatedEvent = vesEvent(payload = "payload A")
157 val receivedEvent = vesEvent(payload = "payload B")
158 val receivedMessageBytes = receivedEvent.toByteArray()
160 givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, FIXED_PAYLOAD, 1))
161 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
164 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
167 assertThat(result).isFalse()
170 it("should return false when messages are different") {
172 val jsonAsStream = sampleJsonAsStream()
173 val generatedEvent = vesEvent()
// NOTE(review): eventId is passed positionally here but by name in the headers-only
// counterpart; both forms set the first parameter of vesEvent.
174 val receivedEvent = vesEvent("bbb")
175 val receivedMessageBytes = receivedEvent.toByteArray()
177 givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, FIXED_PAYLOAD, 1))
178 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
181 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
184 assertThat(result).isFalse()
// Default event id used by vesEvent() when the caller does not supply one.
private const val DUMMY_EVENT_ID: String = "aaa"

// Default payload text used by vesEvent() when the caller does not supply one.
private const val DUMMY_PAYLOAD: String = "payload"

// Syntactically valid JSON array text served by sampleJsonAsStream() as the request body
// in tests that stub the parameters parser.
private const val sampleJsonArray: String = """["headersOnly"]"""
// Test fixture: starts building a VesEvent whose CommonEventHeader carries `eventId` and whose
// eventFields hold the bytes of `payload` (String.toByteArray(), i.e. UTF-8 by default).
// Both parameters fall back to the DUMMY_EVENT_ID / DUMMY_PAYLOAD constants when omitted.
// NOTE(review): the end of the builder chain is not visible in this view — confirm that it
// finishes by building the event.
195 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
196 return VesEvent.newBuilder()
197 .setCommonEventHeader(CommonEventHeader.newBuilder()
198 .setEventId(eventId))
199 .setEventFields(ByteString.copyFrom(payload.toByteArray()))
/**
 * Returns a new input stream over the bytes of [sampleJsonArray].
 *
 * A separate stream is created on every call, so each caller reads the text from the start.
 */
private fun sampleJsonAsStream(): java.io.ByteArrayInputStream {
    return sampleJsonArray.byteInputStream()
}