2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
  22 import arrow.core.Left
 
  23 import arrow.core.None
 
  24 import arrow.core.Some
 
  25 import arrow.effects.IO
 
  26 import com.google.protobuf.ByteString
 
  27 import com.nhaarman.mockito_kotlin.any
 
  28 import com.nhaarman.mockito_kotlin.eq
 
  29 import com.nhaarman.mockito_kotlin.mock
 
  30 import com.nhaarman.mockito_kotlin.never
 
  31 import com.nhaarman.mockito_kotlin.verify
 
  32 import com.nhaarman.mockito_kotlin.whenever
 
  33 import org.assertj.core.api.Assertions.assertThat
 
  34 import org.jetbrains.spek.api.Spek
 
  35 import org.jetbrains.spek.api.dsl.describe
 
  36 import org.jetbrains.spek.api.dsl.it
 
  37 import org.mockito.ArgumentMatchers.anySet
 
  38 import org.mockito.Mockito
 
  39 import org.onap.ves.VesEventOuterClass.VesEvent
 
  40 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
  41 import java.util.concurrent.ConcurrentLinkedQueue
 
  44  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 
  47 internal class DcaeAppSimulatorTest : Spek({
 
  48     lateinit var consumerFactory: ConsumerFactory
 
  49     lateinit var messageStreamValidation: MessageStreamValidation
 
  50     lateinit var consumer: Consumer
 
  51     lateinit var cut: DcaeAppSimulator
 
  54         consumerFactory = mock()
 
  55         messageStreamValidation = mock()
 
  57         cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
 
  59         whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
 
  62     fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
 
  64     describe("listenToTopics") {
 
  65         val topics = setOf("perf3gpp", "faults")
 
  67         it("should fail when topic list is empty") {
 
  68             val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync()
 
  69             assertThat(result.isLeft()).isTrue()
 
  72         it("should fail when topic list contains empty strings") {
 
  73             val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync()
 
  74             assertThat(result.isLeft()).isTrue()
 
  77         it("should subscribe to given topics") {
 
  78             cut.listenToTopics(topics).unsafeRunSync()
 
  79             verify(consumerFactory).createConsumerForTopics(topics)
 
  82         it("should subscribe to given topics when called with comma separated list") {
 
  83             cut.listenToTopics("perf3gpp,faults").unsafeRunSync()
 
  84             verify(consumerFactory).createConsumerForTopics(topics)
 
  87         it("should handle errors") {
 
  89             val error = RuntimeException("WTF")
 
  90             whenever(consumerFactory.createConsumerForTopics(anySet()))
 
  91                     .thenReturn(IO.raiseError(error))
 
  94             val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync()
 
  97             assertThat(result).isEqualTo(Left(error))
 
 103         it("should return None when topics hasn't been initialized") {
 
 104             assertThat(cut.state()).isEqualTo(None)
 
 107         describe("when topics are initialized") {
 
 109                 cut.listenToTopics("perf3gpp").unsafeRunSync()
 
 112             it("should return some state when it has been set") {
 
 113                 val state = consumerState()
 
 114                 whenever(consumer.currentState()).thenReturn(state)
 
 116                 assertThat(cut.state()).isEqualTo(Some(state))
 
 121     describe("resetState") {
 
 122         it("should do nothing when topics hasn't been initialized") {
 
 123             cut.resetState().unsafeRunSync()
 
 124             verify(consumer, never()).reset()
 
 127         describe("when topics are initialized") {
 
 129                 cut.listenToTopics("perf3gpp").unsafeRunSync()
 
 132             it("should reset the state") {
 
 134                 whenever(consumer.reset()).thenReturn(IO.unit)
 
 137                 cut.resetState().unsafeRunSync()
 
 140                 verify(consumer).reset()
 
 145     describe("validate") {
 
 147             whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
 
 150         it("should use empty list when consumer is unavailable") {
 
 152             val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
 
 155             verify(messageStreamValidation).validate(any(), eq(emptyList()))
 
 156             assertThat(result).isTrue()
 
 159         it("should delegate to MessageStreamValidation") {
 
 161             cut.listenToTopics("perf3gpp").unsafeRunSync()
 
 162             whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
 
 165             val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
 
 168             verify(messageStreamValidation).validate(any(), any())
 
 169             assertThat(result).isTrue()
 
 175 private const val DUMMY_EVENT_ID = "aaa"
 
 176 private const val DUMMY_PAYLOAD = "payload"
 
 178 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
 
 179     return VesEvent.newBuilder()
 
 180             .setCommonEventHeader(CommonEventHeader.newBuilder()
 
 181                     .setEventId(eventId))
 
 182             .setEventFields(ByteString.copyFrom(payload.toByteArray()))