2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
22 import arrow.core.Either
23 import arrow.core.Left
24 import arrow.core.None
25 import arrow.core.Right
26 import arrow.core.Some
27 import arrow.effects.IO
28 import javax.json.stream.JsonParsingException
29 import com.google.protobuf.ByteString
30 import com.nhaarman.mockito_kotlin.any
31 import com.nhaarman.mockito_kotlin.mock
32 import com.nhaarman.mockito_kotlin.never
33 import com.nhaarman.mockito_kotlin.verify
34 import com.nhaarman.mockito_kotlin.whenever
35 import org.assertj.core.api.Assertions.assertThat
36 import org.assertj.core.api.Assertions.fail
37 import org.jetbrains.spek.api.Spek
38 import org.jetbrains.spek.api.dsl.describe
39 import org.jetbrains.spek.api.dsl.it
40 import org.mockito.ArgumentMatchers.anyList
41 import org.mockito.ArgumentMatchers.anySet
42 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
43 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
44 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
45 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
46 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
47 import org.onap.ves.VesEventV5.VesEvent
48 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
49 import reactor.core.publisher.Flux
50 import java.util.concurrent.ConcurrentLinkedQueue
51 import javax.json.Json
52 import javax.json.JsonArray
53 import javax.json.JsonValue
56 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
59 internal class MessageStreamValidationTest : Spek({
60 lateinit var messageParametersParser: MessageParametersParser
61 lateinit var messageGenerator: MessageGenerator
62 lateinit var cut: MessageStreamValidation
65 messageParametersParser = mock()
66 messageGenerator = mock()
67 cut = MessageStreamValidation(messageParametersParser, messageGenerator)
70 fun givenParsedMessageParameters(vararg params: MessageParameters) {
71 whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
74 describe("validate") {
76 it("should return error when JSON is invalid") {
78 val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
82 is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
83 else -> fail("validation should fail")
87 it("should return error when message param list is empty") {
89 givenParsedMessageParameters()
92 val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
95 assertThat(result.isLeft()).isTrue()
98 describe("when validating headers only") {
99 it("should return true when messages are the same") {
101 val jsonAsStream = sampleJsonAsStream()
102 val event = vesEvent()
103 val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
104 val receivedMessageBytes = event.toByteArray()
106 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
107 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
110 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
113 assertThat(result).isTrue()
116 it("should return true when messages differ with payload only") {
118 val jsonAsStream = sampleJsonAsStream()
119 val generatedEvent = vesEvent(payload = "payload A")
120 val receivedEvent = vesEvent(payload = "payload B")
121 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
122 val receivedMessageBytes = receivedEvent.toByteArray()
124 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
125 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
128 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
131 assertThat(result).isTrue()
134 it("should return false when messages are different") {
136 val jsonAsStream = sampleJsonAsStream()
137 val generatedEvent = vesEvent()
138 val receivedEvent = vesEvent(eventId = "bbb")
139 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
140 val receivedMessageBytes = receivedEvent.toByteArray()
142 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
143 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
146 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
149 assertThat(result).isFalse()
153 describe("when validating whole messages") {
154 it("should return true when messages are the same") {
156 val jsonAsStream = sampleJsonAsStream()
157 val event = vesEvent()
158 val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
159 val receivedMessageBytes = event.toByteArray()
161 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
162 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
165 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
168 assertThat(result).isTrue()
171 it("should return false when messages differ with payload only") {
173 val jsonAsStream = sampleJsonAsStream()
174 val generatedEvent = vesEvent(payload = "payload A")
175 val receivedEvent = vesEvent(payload = "payload B")
176 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
177 val receivedMessageBytes = receivedEvent.toByteArray()
179 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
180 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
183 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
186 assertThat(result).isFalse()
189 it("should return false when messages are different") {
191 val jsonAsStream = sampleJsonAsStream()
192 val generatedEvent = vesEvent()
193 val receivedEvent = vesEvent("bbb")
194 val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
195 val receivedMessageBytes = receivedEvent.toByteArray()
197 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
198 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
201 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
204 assertThat(result).isFalse()
212 private const val DUMMY_EVENT_ID = "aaa"
213 private const val DUMMY_PAYLOAD = "payload"
215 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
216 return VesEvent.newBuilder()
217 .setCommonEventHeader(CommonEventHeader.newBuilder()
218 .setEventId(eventId))
219 .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
223 private const val sampleJsonArray = """["headersOnly"]"""
225 private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream()