/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2018 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
22 import arrow.core.Either
23 import arrow.core.Right
24 import com.google.protobuf.ByteString
25 import com.nhaarman.mockitokotlin2.any
26 import com.nhaarman.mockitokotlin2.mock
27 import com.nhaarman.mockitokotlin2.whenever
28 import org.assertj.core.api.Assertions.assertThat
29 import org.assertj.core.api.Assertions.fail
30 import org.jetbrains.spek.api.Spek
31 import org.jetbrains.spek.api.dsl.describe
32 import org.jetbrains.spek.api.dsl.it
33 import org.mockito.ArgumentMatchers.anyList
34 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
35 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
36 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
37 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
38 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
39 import org.onap.ves.VesEventOuterClass.CommonEventHeader
40 import org.onap.ves.VesEventOuterClass.VesEvent
41 import reactor.core.publisher.Flux
42 import javax.json.stream.JsonParsingException
/**
 * Spek specification for [MessageStreamValidation.validate].
 *
 * The two collaborators of the class under test, [MessageParametersParser] and
 * [MessageGenerator], are replaced with fresh mocks before each test, so every case
 * only sees the stubbing it sets up itself.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 */
internal class MessageStreamValidationTest : Spek({
    lateinit var messageParametersParser: MessageParametersParser
    lateinit var messageGenerator: MessageGenerator
    lateinit var cut: MessageStreamValidation

    beforeEachTest {
        messageParametersParser = mock()
        messageGenerator = mock()
        cut = MessageStreamValidation(messageGenerator, messageParametersParser)
    }

    /** Stubs the parser mock so that any input is reported as successfully parsed into [params]. */
    fun givenParsedMessageParameters(vararg params: MessageParameters) {
        whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
    }

    /**
     * Arranges the single-message scenario shared by the comparison tests: the parser mock
     * yields one [MessageParameters] entry built from the header of [generatedEvent] with the
     * given [messageType] and an amount of 1, and the generator mock emits exactly one
     * [WireFrameMessage] wrapping the serialized [generatedEvent].
     */
    fun givenGeneratedMessage(generatedEvent: VesEvent, messageType: MessageType) {
        givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, messageType, 1))
        whenever(messageGenerator.createMessageFlux(anyList()))
            .thenReturn(Flux.just(WireFrameMessage(generatedEvent.toByteArray())))
    }

    /**
     * Runs the validation against the sample JSON with [receivedEvent] as the only received
     * message and returns the unwrapped result; a failed validation run surfaces as an exception.
     */
    fun validateReceived(receivedEvent: VesEvent) =
        cut.validate(sampleJsonAsStream(), listOf(receivedEvent.toByteArray())).unsafeRunSync()

    describe("validate") {

        it("should return error when JSON is invalid") {
            // when
            val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()

            // then
            when (result) {
                is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
                else -> fail("validation should fail")
            }
        }

        it("should return error when message param list is empty") {
            // given
            givenParsedMessageParameters()

            // when
            val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()

            // then
            assertThat(result.isLeft()).isTrue()
        }

        // The cases in this group describe the expected message with MessageType.VALID.
        describe("when validating headers only") {
            it("should return true when messages are the same") {
                // given
                val event = vesEvent()
                givenGeneratedMessage(event, MessageType.VALID)

                // when
                val result = validateReceived(event)

                // then
                assertThat(result).isTrue()
            }

            it("should return true when messages differ with payload only") {
                // given
                val generatedEvent = vesEvent(payload = "payload A")
                val receivedEvent = vesEvent(payload = "payload B")
                givenGeneratedMessage(generatedEvent, MessageType.VALID)

                // when
                val result = validateReceived(receivedEvent)

                // then
                assertThat(result).isTrue()
            }

            it("should return false when messages are different") {
                // given
                val generatedEvent = vesEvent()
                val receivedEvent = vesEvent(eventId = "bbb")
                givenGeneratedMessage(generatedEvent, MessageType.VALID)

                // when
                val result = validateReceived(receivedEvent)

                // then
                assertThat(result).isFalse()
            }
        }

        // The cases in this group describe the expected message with MessageType.FIXED_PAYLOAD.
        describe("when validating whole messages") {
            it("should return true when messages are the same") {
                // given
                val event = vesEvent()
                givenGeneratedMessage(event, MessageType.FIXED_PAYLOAD)

                // when
                val result = validateReceived(event)

                // then
                assertThat(result).isTrue()
            }

            it("should return false when messages differ with payload only") {
                // given
                val generatedEvent = vesEvent(payload = "payload A")
                val receivedEvent = vesEvent(payload = "payload B")
                givenGeneratedMessage(generatedEvent, MessageType.FIXED_PAYLOAD)

                // when
                val result = validateReceived(receivedEvent)

                // then
                assertThat(result).isFalse()
            }

            it("should return false when messages are different") {
                // given
                val generatedEvent = vesEvent()
                val receivedEvent = vesEvent(eventId = "bbb")
                givenGeneratedMessage(generatedEvent, MessageType.FIXED_PAYLOAD)

                // when
                val result = validateReceived(receivedEvent)

                // then
                assertThat(result).isFalse()
            }
        }
    }
})
/** Event id used by [vesEvent] when the caller does not supply one. */
private const val DUMMY_EVENT_ID = "aaa"

/** Payload text used by [vesEvent] when the caller does not supply one. */
private const val DUMMY_PAYLOAD = "payload"

/**
 * Builds a [VesEvent] for the tests.
 *
 * @param eventId value stored as the event id of the common event header; no other header field is set
 * @param payload text whose UTF-8 bytes become the event fields
 * @return the assembled event
 */
private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
    val header = CommonEventHeader.newBuilder().setEventId(eventId)
    val fields = ByteString.copyFromUtf8(payload)

    return VesEvent.newBuilder()
        .setCommonEventHeader(header)
        .setEventFields(fields)
        .build()
}
/** JSON document that the tests pass as the first argument of `validate`. */
private const val SAMPLE_JSON_ARRAY = """["headersOnly"]"""

/** Returns a new input stream over [SAMPLE_JSON_ARRAY]; every call creates a separate stream. */
private fun sampleJsonAsStream() = SAMPLE_JSON_ARRAY.byteInputStream()