2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
  22 import arrow.core.Left
 
  23 import arrow.core.None
 
  24 import arrow.core.Some
 
  25 import arrow.effects.IO
 
  26 import com.google.protobuf.ByteString
 
  27 import com.nhaarman.mockitokotlin2.any
 
  28 import com.nhaarman.mockitokotlin2.eq
 
  29 import com.nhaarman.mockitokotlin2.mock
 
  30 import com.nhaarman.mockitokotlin2.never
 
  31 import com.nhaarman.mockitokotlin2.verify
 
  32 import com.nhaarman.mockitokotlin2.whenever
 
  33 import org.assertj.core.api.Assertions.assertThat
 
  34 import org.jetbrains.spek.api.Spek
 
  35 import org.jetbrains.spek.api.dsl.describe
 
  36 import org.jetbrains.spek.api.dsl.it
 
  37 import org.mockito.ArgumentMatchers.anySet
 
  38 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
  39 import org.onap.ves.VesEventOuterClass.VesEvent
 
  40 import java.util.concurrent.ConcurrentLinkedQueue
 
  43  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 
  46 internal class DcaeAppSimulatorTest : Spek({
 
  47     lateinit var consumerFactory: ConsumerFactory
 
  48     lateinit var messageStreamValidation: MessageStreamValidation
 
  49     lateinit var consumer: Consumer
 
  50     lateinit var cut: DcaeAppSimulator
 
  53         consumerFactory = mock()
 
  54         messageStreamValidation = mock()
 
  56         cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
 
  58         whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
 
  61     fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
 
  63     describe("listenToTopics") {
 
  64         val topics = setOf("perf3gpp", "faults")
 
  66         it("should fail when topic list is empty") {
 
  67             val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync()
 
  68             assertThat(result.isLeft()).isTrue()
 
  71         it("should fail when topic list contains empty strings") {
 
  72             val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync()
 
  73             assertThat(result.isLeft()).isTrue()
 
  76         it("should subscribe to given topics") {
 
  77             cut.listenToTopics(topics).unsafeRunSync()
 
  78             verify(consumerFactory).createConsumerForTopics(topics)
 
  81         it("should subscribe to given topics when called with comma separated list") {
 
  82             cut.listenToTopics("perf3gpp,faults").unsafeRunSync()
 
  83             verify(consumerFactory).createConsumerForTopics(topics)
 
  86         it("should handle errors") {
 
  88             val error = RuntimeException("WTF")
 
  89             whenever(consumerFactory.createConsumerForTopics(anySet()))
 
  90                     .thenReturn(IO.raiseError(error))
 
  93             val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync()
 
  96             assertThat(result).isEqualTo(Left(error))
 
 102         it("should return None when topics hasn't been initialized") {
 
 103             assertThat(cut.state()).isEqualTo(None)
 
 106         describe("when topics are initialized") {
 
 108                 cut.listenToTopics("perf3gpp").unsafeRunSync()
 
 111             it("should return some state when it has been set") {
 
 112                 val state = consumerState()
 
 113                 whenever(consumer.currentState()).thenReturn(state)
 
 115                 assertThat(cut.state()).isEqualTo(Some(state))
 
 120     describe("resetState") {
 
 121         it("should do nothing when topics hasn't been initialized") {
 
 122             cut.resetState().unsafeRunSync()
 
 123             verify(consumer, never()).reset()
 
 126         describe("when topics are initialized") {
 
 128                 cut.listenToTopics("perf3gpp").unsafeRunSync()
 
 131             it("should reset the state") {
 
 133                 whenever(consumer.reset()).thenReturn(IO.unit)
 
 136                 cut.resetState().unsafeRunSync()
 
 139                 verify(consumer).reset()
 
 144     describe("validate") {
 
 146             whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
 
 149         it("should use empty list when consumer is unavailable") {
 
 151             val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
 
 154             verify(messageStreamValidation).validate(any(), eq(emptyList()))
 
 155             assertThat(result).isTrue()
 
 158         it("should delegate to MessageStreamValidation") {
 
 160             cut.listenToTopics("perf3gpp").unsafeRunSync()
 
 161             whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
 
 164             val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
 
 167             verify(messageStreamValidation).validate(any(), any())
 
 168             assertThat(result).isTrue()
 
 174 private const val DUMMY_EVENT_ID = "aaa"
 
 175 private const val DUMMY_PAYLOAD = "payload"
 
 177 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
 
 178     return VesEvent.newBuilder()
 
 179             .setCommonEventHeader(CommonEventHeader.newBuilder()
 
 180                     .setEventId(eventId))
 
 181             .setEventFields(ByteString.copyFrom(payload.toByteArray()))