2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
22 import arrow.core.Either
23 import arrow.core.Right
24 import com.google.protobuf.ByteString
25 import com.nhaarman.mockito_kotlin.any
26 import com.nhaarman.mockito_kotlin.mock
27 import com.nhaarman.mockito_kotlin.whenever
28 import org.assertj.core.api.Assertions.assertThat
29 import org.assertj.core.api.Assertions.fail
30 import org.jetbrains.spek.api.Spek
31 import org.jetbrains.spek.api.dsl.describe
32 import org.jetbrains.spek.api.dsl.it
33 import org.mockito.ArgumentMatchers.anyList
34 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
35 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
36 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
37 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
38 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
39 import org.onap.ves.VesEventOuterClass.CommonEventHeader
40 import org.onap.ves.VesEventOuterClass.VesEvent
41 import reactor.core.publisher.Flux
42 import javax.json.stream.JsonParsingException
45 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
48 internal class MessageStreamValidationTest : Spek({
49 lateinit var messageParametersParser: MessageParametersParser
50 lateinit var messageGenerator: MessageGenerator
51 lateinit var cut: MessageStreamValidation
54 messageParametersParser = mock()
55 messageGenerator = mock()
56 cut = MessageStreamValidation(messageParametersParser, messageGenerator)
59 fun givenParsedMessageParameters(vararg params: MessageParameters) {
60 whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
63 describe("validate") {
65 it("should return error when JSON is invalid") {
67 val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
71 is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
72 else -> fail("validation should fail")
76 it("should return error when message param list is empty") {
78 givenParsedMessageParameters()
81 val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
84 assertThat(result.isLeft()).isTrue()
87 describe("when validating headers only") {
88 it("should return true when messages are the same") {
90 val jsonAsStream = sampleJsonAsStream()
91 val event = vesEvent()
92 val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
93 val receivedMessageBytes = event.toByteArray()
95 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
96 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
99 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
102 assertThat(result).isTrue()
105 it("should return true when messages differ with payload only") {
107 val jsonAsStream = sampleJsonAsStream()
108 val generatedEvent = vesEvent(payload = "payload A")
109 val receivedEvent = vesEvent(payload = "payload B")
110 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
111 val receivedMessageBytes = receivedEvent.toByteArray()
113 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
114 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
117 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
120 assertThat(result).isTrue()
123 it("should return false when messages are different") {
125 val jsonAsStream = sampleJsonAsStream()
126 val generatedEvent = vesEvent()
127 val receivedEvent = vesEvent(eventId = "bbb")
128 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
129 val receivedMessageBytes = receivedEvent.toByteArray()
131 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
132 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
135 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
138 assertThat(result).isFalse()
142 describe("when validating whole messages") {
143 it("should return true when messages are the same") {
145 val jsonAsStream = sampleJsonAsStream()
146 val event = vesEvent()
147 val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
148 val receivedMessageBytes = event.toByteArray()
150 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
151 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
154 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
157 assertThat(result).isTrue()
160 it("should return false when messages differ with payload only") {
162 val jsonAsStream = sampleJsonAsStream()
163 val generatedEvent = vesEvent(payload = "payload A")
164 val receivedEvent = vesEvent(payload = "payload B")
165 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
166 val receivedMessageBytes = receivedEvent.toByteArray()
168 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
169 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
172 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
175 assertThat(result).isFalse()
178 it("should return false when messages are different") {
180 val jsonAsStream = sampleJsonAsStream()
181 val generatedEvent = vesEvent()
182 val receivedEvent = vesEvent("bbb")
183 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
184 val receivedMessageBytes = receivedEvent.toByteArray()
186 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
187 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
190 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
193 assertThat(result).isFalse()
201 private const val DUMMY_EVENT_ID = "aaa"
202 private const val DUMMY_PAYLOAD = "payload"
204 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
205 return VesEvent.newBuilder()
206 .setCommonEventHeader(CommonEventHeader.newBuilder()
207 .setEventId(eventId))
208 .setHvMeasFields(ByteString.copyFrom(payload.toByteArray()))
212 private const val sampleJsonArray = """["headersOnly"]"""
214 private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream()