2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
22 import arrow.core.Either
23 import arrow.core.Left
24 import arrow.core.None
25 import arrow.core.Some
26 import arrow.effects.IO
27 import javax.json.stream.JsonParsingException
28 import com.google.protobuf.ByteString
29 import com.nhaarman.mockito_kotlin.any
30 import com.nhaarman.mockito_kotlin.mock
31 import com.nhaarman.mockito_kotlin.never
32 import com.nhaarman.mockito_kotlin.verify
33 import com.nhaarman.mockito_kotlin.whenever
34 import org.assertj.core.api.Assertions.assertThat
35 import org.assertj.core.api.Assertions.fail
36 import org.jetbrains.spek.api.Spek
37 import org.jetbrains.spek.api.dsl.describe
38 import org.jetbrains.spek.api.dsl.it
39 import org.mockito.ArgumentMatchers.anyList
40 import org.mockito.ArgumentMatchers.anySet
41 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
42 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
43 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
44 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
45 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
46 import org.onap.ves.VesEventV5.VesEvent
47 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
48 import reactor.core.publisher.Flux
49 import java.util.concurrent.ConcurrentLinkedQueue
50 import javax.json.Json
51 import javax.json.JsonArray
52 import javax.json.JsonValue
55 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
58 internal class MessageStreamValidationTest : Spek({
59 lateinit var messageParametersParser: MessageParametersParser
60 lateinit var messageGenerator: MessageGenerator
61 lateinit var cut: MessageStreamValidation
64 messageParametersParser = mock()
65 messageGenerator = mock()
66 cut = MessageStreamValidation(messageParametersParser, messageGenerator)
69 fun givenParsedMessageParameters(vararg params: MessageParameters) {
70 whenever(messageParametersParser.parse(any())).thenReturn(params.toList())
73 describe("validate") {
75 it("should return error when JSON is invalid") {
77 val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
81 is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
82 else -> fail("validation should fail")
86 it("should return error when message param list is empty") {
88 givenParsedMessageParameters()
91 val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
94 assertThat(result.isLeft()).isTrue()
97 describe("when validating headers only") {
98 it("should return true when messages are the same") {
100 val jsonAsStream = sampleJsonAsStream()
101 val event = vesEvent()
102 val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
103 val receivedMessageBytes = event.toByteArray()
105 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
106 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
109 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
112 assertThat(result).isTrue()
115 it("should return true when messages differ with payload only") {
117 val jsonAsStream = sampleJsonAsStream()
118 val generatedEvent = vesEvent(payload = "payload A")
119 val receivedEvent = vesEvent(payload = "payload B")
120 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
121 val receivedMessageBytes = receivedEvent.toByteArray()
123 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
124 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
127 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
130 assertThat(result).isTrue()
133 it("should return false when messages are different") {
135 val jsonAsStream = sampleJsonAsStream()
136 val generatedEvent = vesEvent()
137 val receivedEvent = vesEvent(eventId = "bbb")
138 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
139 val receivedMessageBytes = receivedEvent.toByteArray()
141 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
142 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
145 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
148 assertThat(result).isFalse()
152 describe("when validating whole messages") {
153 it("should return true when messages are the same") {
155 val jsonAsStream = sampleJsonAsStream()
156 val event = vesEvent()
157 val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
158 val receivedMessageBytes = event.toByteArray()
160 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
161 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
164 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
167 assertThat(result).isTrue()
170 it("should return false when messages differ with payload only") {
172 val jsonAsStream = sampleJsonAsStream()
173 val generatedEvent = vesEvent(payload = "payload A")
174 val receivedEvent = vesEvent(payload = "payload B")
175 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
176 val receivedMessageBytes = receivedEvent.toByteArray()
178 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
179 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
182 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
185 assertThat(result).isFalse()
188 it("should return false when messages are different") {
190 val jsonAsStream = sampleJsonAsStream()
191 val generatedEvent = vesEvent()
192 val receivedEvent = vesEvent("bbb")
193 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
194 val receivedMessageBytes = receivedEvent.toByteArray()
196 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
197 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
200 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
203 assertThat(result).isFalse()
211 private const val DUMMY_EVENT_ID = "aaa"
212 private const val DUMMY_PAYLOAD = "payload"
214 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
215 return VesEvent.newBuilder()
216 .setCommonEventHeader(CommonEventHeader.newBuilder()
217 .setEventId(eventId))
218 .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
222 private const val sampleJsonArray = """["headersOnly"]"""
224 private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream()