/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2018-2019 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
22 import arrow.core.Right
23 import com.google.protobuf.ByteString
24 import com.nhaarman.mockitokotlin2.any
25 import com.nhaarman.mockitokotlin2.eq
26 import com.nhaarman.mockitokotlin2.mock
27 import com.nhaarman.mockitokotlin2.never
28 import com.nhaarman.mockitokotlin2.verify
29 import com.nhaarman.mockitokotlin2.whenever
30 import org.assertj.core.api.Assertions.*
31 import org.jetbrains.spek.api.Spek
32 import org.jetbrains.spek.api.dsl.describe
33 import org.jetbrains.spek.api.dsl.it
34 import org.mockito.ArgumentMatchers.anySet
35 import org.onap.ves.VesEventOuterClass.CommonEventHeader
36 import org.onap.ves.VesEventOuterClass.VesEvent
37 import reactor.core.publisher.Mono
38 import reactor.test.StepVerifier
39 import java.lang.IllegalArgumentException
40 import java.util.concurrent.ConcurrentLinkedQueue
41 import kotlin.test.assertFailsWith
/**
 * Spek specification for [DcaeAppSimulator].
 *
 * Each test runs against freshly created mocks. The consumer factory is stubbed to return one
 * mocked [Consumer] per known topic, regardless of which set of topics is requested.
 *
 * NOTE(review): the reviewed excerpt omitted several short lines (block delimiters,
 * `beforeEachTest` headers, the label of the `state` group and the `StepVerifier` expectation
 * steps). They are restored here from context - confirm against the original file.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 */
internal class DcaeAppSimulatorTest : Spek({
    lateinit var consumerFactory: ConsumerFactory
    lateinit var messageStreamValidation: MessageStreamValidation
    lateinit var perf3gppConsumer: Consumer
    lateinit var faultsConsumer: Consumer
    lateinit var cut: DcaeAppSimulator

    beforeEachTest {
        consumerFactory = mock()
        messageStreamValidation = mock()
        perf3gppConsumer = mock()
        faultsConsumer = mock()
        cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)

        // Whatever topics are requested, the factory hands back both mocked consumers.
        val consumersByTopic = mapOf(
            PERF3GPP_TOPIC to perf3gppConsumer,
            FAULTS_TOPICS to faultsConsumer
        )
        whenever(consumerFactory.createConsumersForTopics(anySet())).thenReturn(consumersByTopic)
    }

    // Builds a ConsumerState holding the given raw messages, in the order they were passed.
    fun consumerState(vararg messages: ByteArray) =
        ConsumerState(ConcurrentLinkedQueue(messages.toList()))

    describe("listenToTopics") {
        it("should fail when topic list is empty") {
            assertFailsWith<IllegalArgumentException> {
                cut.listenToTopics(emptySet())
            }
        }

        it("should fail when topic list contains empty strings") {
            val topicsWithBlankEntry = setOf(PERF3GPP_TOPIC, " ", FAULTS_TOPICS)

            assertFailsWith<IllegalArgumentException> {
                cut.listenToTopics(topicsWithBlankEntry)
            }
        }

        it("should subscribe to given topics") {
            cut.listenToTopics(TWO_TOPICS)

            verify(consumerFactory).createConsumersForTopics(TWO_TOPICS)
        }

        it("should subscribe to given topics when called with comma separated list") {
            val commaSeparatedTopics = listOf(PERF3GPP_TOPIC, FAULTS_TOPICS).joinToString(separator = ",")

            cut.listenToTopics(commaSeparatedTopics)

            verify(consumerFactory).createConsumersForTopics(TWO_TOPICS)
        }
    }

    describe("state") {
        it("should return Left when topics hasn't been initialized") {
            val result = cut.state(PERF3GPP_TOPIC)

            assertThat(result.isLeft()).isTrue()
        }

        describe("when topics are initialized") {
            beforeEachTest {
                cut.listenToTopics(TWO_TOPICS)
            }

            it("should return state when it has been set") {
                // Both consumers report the very same (empty) state instance.
                val expectedState = consumerState()
                whenever(perf3gppConsumer.currentState()).thenReturn(expectedState)
                whenever(faultsConsumer.currentState()).thenReturn(expectedState)

                assertThat(cut.state(PERF3GPP_TOPIC)).isEqualTo(Right(expectedState))
                assertThat(cut.state(FAULTS_TOPICS)).isEqualTo(Right(expectedState))
            }
        }
    }

    describe("resetState") {
        it("should do nothing when topics hasn't been initialized") {
            cut.resetState(PERF3GPP_TOPIC)
            cut.resetState(FAULTS_TOPICS)

            verify(perf3gppConsumer, never()).reset()
            verify(faultsConsumer, never()).reset()
        }

        describe("when topics are initialized") {
            beforeEachTest {
                cut.listenToTopics(TWO_TOPICS)
            }

            it("should reset the state of given topic consumer") {
                cut.resetState(PERF3GPP_TOPIC)

                // Only the consumer of the requested topic may be reset.
                verify(perf3gppConsumer).reset()
                verify(faultsConsumer, never()).reset()
            }
        }
    }

    describe("validate") {
        beforeEachTest {
            whenever(messageStreamValidation.validate(any(), any())).thenReturn(Mono.just(true))
        }

        // Validates a fixed JSON description against the perf3gpp topic and expects the stubbed
        // validation result (a single `true`) followed by completion.
        fun validatePerf3gppExpectingSuccess() {
            StepVerifier
                .create(cut.validate("['The JSON']".byteInputStream(), PERF3GPP_TOPIC))
                .expectNext(true)
                .verifyComplete()
        }

        it("should use empty list when consumer is unavailable") {
            // No listenToTopics() call here, so no consumer exists for the topic.
            validatePerf3gppExpectingSuccess()

            verify(messageStreamValidation).validate(any(), eq(emptyList()))
        }

        it("should delegate to MessageStreamValidation") {
            cut.listenToTopics(PERF3GPP_TOPIC)
            val stateWithOneEvent = consumerState(vesEvent().toByteArray())
            whenever(perf3gppConsumer.currentState()).thenReturn(stateWithOneEvent)

            validatePerf3gppExpectingSuccess()

            verify(messageStreamValidation).validate(any(), any())
        }
    }
})
// Names of the topics the specification subscribes to.
private const val PERF3GPP_TOPIC: String = "perf3gpp"
private const val FAULTS_TOPICS: String = "faults"

// Both topic names together, in the order perf3gpp, faults.
private val TWO_TOPICS: Set<String> = setOf(PERF3GPP_TOPIC, FAULTS_TOPICS)

// Default event id and payload used by vesEvent().
private const val DUMMY_EVENT_ID: String = "aaa"
private const val DUMMY_PAYLOAD: String = "payload"
172 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
173 return VesEvent.newBuilder()
174 .setCommonEventHeader(CommonEventHeader.newBuilder()
175 .setEventId(eventId))
176 .setEventFields(ByteString.copyFrom(payload.toByteArray()))