2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
  22 import arrow.core.Either
 
  23 import arrow.core.Right
 
  24 import com.google.protobuf.ByteString
 
  25 import com.nhaarman.mockitokotlin2.any
 
  26 import com.nhaarman.mockitokotlin2.mock
 
  27 import com.nhaarman.mockitokotlin2.whenever
 
  28 import org.assertj.core.api.Assertions.assertThat
 
  29 import org.assertj.core.api.Assertions.fail
 
  30 import org.jetbrains.spek.api.Spek
 
  31 import org.jetbrains.spek.api.dsl.describe
 
  32 import org.jetbrains.spek.api.dsl.it
 
  33 import org.mockito.ArgumentMatchers.anyList
 
  34 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 
  35 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 
  36 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 
  37 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
 
  38 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
 
  39 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
  40 import org.onap.ves.VesEventOuterClass.VesEvent
 
  41 import reactor.core.publisher.Flux
 
  42 import javax.json.stream.JsonParsingException
 
  45  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 
  48 internal class MessageStreamValidationTest : Spek({
 
  49     lateinit var messageParametersParser: MessageParametersParser
 
  50     lateinit var messageGenerator: MessageGenerator
 
  51     lateinit var cut: MessageStreamValidation
 
  54         messageParametersParser = mock()
 
  55         messageGenerator = mock()
 
  56         cut = MessageStreamValidation(messageGenerator, messageParametersParser)
 
  59     fun givenParsedMessageParameters(vararg params: MessageParameters) {
 
  60         whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
 
  63     describe("validate") {
 
  65         it("should return error when JSON is invalid") {
 
  67             val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
 
  71                 is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
 
  72                 else -> fail("validation should fail")
 
  76         it("should return error when message param list is empty") {
 
  78             givenParsedMessageParameters()
 
  81             val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
 
  84             assertThat(result.isLeft()).isTrue()
 
  87         describe("when validating headers only") {
 
  88             it("should return true when messages are the same") {
 
  90                 val jsonAsStream = sampleJsonAsStream()
 
  91                 val event = vesEvent()
 
  92                 val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
 
  93                 val receivedMessageBytes = event.toByteArray()
 
  95                 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
 
  96                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
  99                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 102                 assertThat(result).isTrue()
 
 105             it("should return true when messages differ with payload only") {
 
 107                 val jsonAsStream = sampleJsonAsStream()
 
 108                 val generatedEvent = vesEvent(payload = "payload A")
 
 109                 val receivedEvent = vesEvent(payload = "payload B")
 
 110                 val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
 
 111                 val receivedMessageBytes = receivedEvent.toByteArray()
 
 113                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
 
 114                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
 117                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 120                 assertThat(result).isTrue()
 
 123             it("should return false when messages are different") {
 
 125                 val jsonAsStream = sampleJsonAsStream()
 
 126                 val generatedEvent = vesEvent()
 
 127                 val receivedEvent = vesEvent(eventId = "bbb")
 
 128                 val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
 
 129                 val receivedMessageBytes = receivedEvent.toByteArray()
 
 131                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
 
 132                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
 135                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 138                 assertThat(result).isFalse()
 
 142         describe("when validating whole messages") {
 
 143             it("should return true when messages are the same") {
 
 145                 val jsonAsStream = sampleJsonAsStream()
 
 146                 val event = vesEvent()
 
 147                 val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
 
 148                 val receivedMessageBytes = event.toByteArray()
 
 150                 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
 
 151                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
 154                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 157                 assertThat(result).isTrue()
 
 160             it("should return false when messages differ with payload only") {
 
 162                 val jsonAsStream = sampleJsonAsStream()
 
 163                 val generatedEvent = vesEvent(payload = "payload A")
 
 164                 val receivedEvent = vesEvent(payload = "payload B")
 
 165                 val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
 
 166                 val receivedMessageBytes = receivedEvent.toByteArray()
 
 168                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
 
 169                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
 172                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 175                 assertThat(result).isFalse()
 
 178             it("should return false when messages are different") {
 
 180                 val jsonAsStream = sampleJsonAsStream()
 
 181                 val generatedEvent = vesEvent()
 
 182                 val receivedEvent = vesEvent("bbb")
 
 183                 val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
 
 184                 val receivedMessageBytes = receivedEvent.toByteArray()
 
 186                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
 
 187                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
 190                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 193                 assertThat(result).isFalse()
 
 201 private const val DUMMY_EVENT_ID = "aaa"
 
 202 private const val DUMMY_PAYLOAD = "payload"
 
 204 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
 
 205     return VesEvent.newBuilder()
 
 206             .setCommonEventHeader(CommonEventHeader.newBuilder()
 
 207                     .setEventId(eventId))
 
 208             .setEventFields(ByteString.copyFrom(payload.toByteArray()))
 
 212 private const val sampleJsonArray = """["headersOnly"]"""
 
 214 private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream()