/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2018 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
 
  20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
  22 import arrow.core.Either
 
  23 import arrow.core.Right
 
  24 import com.google.protobuf.ByteString
 
  25 import com.nhaarman.mockito_kotlin.any
 
  26 import com.nhaarman.mockito_kotlin.mock
 
  27 import com.nhaarman.mockito_kotlin.whenever
 
  28 import org.assertj.core.api.Assertions.assertThat
 
  29 import org.assertj.core.api.Assertions.fail
 
  30 import org.jetbrains.spek.api.Spek
 
  31 import org.jetbrains.spek.api.dsl.describe
 
  32 import org.jetbrains.spek.api.dsl.it
 
  33 import org.mockito.ArgumentMatchers.anyList
 
  34 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 
  35 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 
  36 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 
  37 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
 
  38 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
 
  39 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
  40 import org.onap.ves.VesEventOuterClass.VesEvent
 
  41 import reactor.core.publisher.Flux
 
  42 import javax.json.stream.JsonParsingException
 
  45  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 
  48 internal class MessageStreamValidationTest : Spek({
 
  49     lateinit var messageParametersParser: MessageParametersParser
 
  50     lateinit var messageGenerator: MessageGenerator
 
  51     lateinit var cut: MessageStreamValidation
 
  54         messageParametersParser = mock()
 
  55         messageGenerator = mock()
 
  56         cut = MessageStreamValidation(messageParametersParser, messageGenerator)
 
  59     fun givenParsedMessageParameters(vararg params: MessageParameters) {
 
  60         whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
 
  63     describe("validate") {
 
  65         it("should return error when JSON is invalid") {
 
  67             val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
 
  71                 is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
 
  72                 else -> fail("validation should fail")
 
  76         it("should return error when message param list is empty") {
 
  78             givenParsedMessageParameters()
 
  81             val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
 
  84             assertThat(result.isLeft()).isTrue()
 
  87         describe("when validating headers only") {
 
  88             it("should return true when messages are the same") {
 
  90                 val jsonAsStream = sampleJsonAsStream()
 
  91                 val event = vesEvent()
 
  92                 val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
 
  93                 val receivedMessageBytes = event.toByteArray()
 
  95                 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
 
  96                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
  99                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 102                 assertThat(result).isTrue()
 
 105             it("should return true when messages differ with payload only") {
 
 107                 val jsonAsStream = sampleJsonAsStream()
 
 108                 val generatedEvent = vesEvent(payload = "payload A")
 
 109                 val receivedEvent = vesEvent(payload = "payload B")
 
 110                 val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
 
 111                 val receivedMessageBytes = receivedEvent.toByteArray()
 
 113                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
 
 114                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
 117                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 120                 assertThat(result).isTrue()
 
 123             it("should return false when messages are different") {
 
 125                 val jsonAsStream = sampleJsonAsStream()
 
 126                 val generatedEvent = vesEvent()
 
 127                 val receivedEvent = vesEvent(eventId = "bbb")
 
 128                 val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
 
 129                 val receivedMessageBytes = receivedEvent.toByteArray()
 
 131                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
 
 132                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
 135                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 138                 assertThat(result).isFalse()
 
 142         describe("when validating whole messages") {
 
 143             it("should return true when messages are the same") {
 
 145                 val jsonAsStream = sampleJsonAsStream()
 
 146                 val event = vesEvent()
 
 147                 val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
 
 148                 val receivedMessageBytes = event.toByteArray()
 
 150                 givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
 
 151                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
 154                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 157                 assertThat(result).isTrue()
 
 160             it("should return false when messages differ with payload only") {
 
 162                 val jsonAsStream = sampleJsonAsStream()
 
 163                 val generatedEvent = vesEvent(payload = "payload A")
 
 164                 val receivedEvent = vesEvent(payload = "payload B")
 
 165                 val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
 
 166                 val receivedMessageBytes = receivedEvent.toByteArray()
 
 168                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
 
 169                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
 172                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 175                 assertThat(result).isFalse()
 
 178             it("should return false when messages are different") {
 
 180                 val jsonAsStream = sampleJsonAsStream()
 
 181                 val generatedEvent = vesEvent()
 
 182                 val receivedEvent = vesEvent("bbb")
 
 183                 val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
 
 184                 val receivedMessageBytes = receivedEvent.toByteArray()
 
 186                 givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
 
 187                 whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
 
 190                 val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
 
 193                 assertThat(result).isFalse()
 
 201 private const val DUMMY_EVENT_ID = "aaa"
 
 202 private const val DUMMY_PAYLOAD = "payload"
 
 204 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
 
 205     return VesEvent.newBuilder()
 
 206             .setCommonEventHeader(CommonEventHeader.newBuilder()
 
 207                     .setEventId(eventId))
 
 208             .setEventFields(ByteString.copyFrom(payload.toByteArray()))
 
 212 private const val sampleJsonArray = """["headersOnly"]"""
 
 214 private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream()