2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
22 import arrow.core.None
23 import arrow.core.Some
24 import com.google.protobuf.ByteString
25 import com.nhaarman.mockitokotlin2.any
26 import com.nhaarman.mockitokotlin2.eq
27 import com.nhaarman.mockitokotlin2.mock
28 import com.nhaarman.mockitokotlin2.never
29 import com.nhaarman.mockitokotlin2.verify
30 import com.nhaarman.mockitokotlin2.whenever
31 import org.assertj.core.api.Assertions.*
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.mockito.ArgumentMatchers.anySet
36 import org.onap.ves.VesEventOuterClass.CommonEventHeader
37 import org.onap.ves.VesEventOuterClass.VesEvent
38 import reactor.core.publisher.Mono
39 import reactor.test.StepVerifier
40 import java.lang.IllegalArgumentException
41 import java.util.concurrent.ConcurrentLinkedQueue
42 import kotlin.test.assertFailsWith
45 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification for DcaeAppSimulator. The simulator under test (`cut`) is
// built from a mocked ConsumerFactory and a mocked MessageStreamValidation, so
// every expectation below is expressed as an interaction with those mocks.
48 internal class DcaeAppSimulatorTest : Spek({
// Collaborators and the class under test; declared lateinit so they can be (re)assigned.
49 lateinit var consumerFactory: ConsumerFactory
50 lateinit var messageStreamValidation: MessageStreamValidation
51 lateinit var consumer: Consumer
52 lateinit var cut: DcaeAppSimulator
55 consumerFactory = mock()
56 messageStreamValidation = mock()
58 cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
// Whatever set of topics is requested, the factory hands back `consumer`.
60 whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(consumer)
// Helper: wraps the given raw messages in a ConsumerState backed by a ConcurrentLinkedQueue.
63 fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
65 describe("listenToTopics") {
// Topic set expected to reach the factory in the "should subscribe" cases.
66 val topics = setOf("perf3gpp", "faults")
68 it("should fail when topic list is empty") {
69 assertFailsWith(IllegalArgumentException::class){
70 cut.listenToTopics(setOf())
74 it("should fail when topic list contains empty strings") {
75 assertFailsWith(IllegalArgumentException::class){
// The offending entry is " " (a single space), i.e. a blank rather than a zero-length string.
76 cut.listenToTopics(setOf("perf3gpp", " ", "faults"))
80 it("should subscribe to given topics") {
81 cut.listenToTopics(topics)
82 verify(consumerFactory).createConsumerForTopics(topics)
85 it("should subscribe to given topics when called with comma separated list") {
// String overload: the comma separated value must reach the factory as the same set as `topics`.
86 cut.listenToTopics("perf3gpp,faults")
87 verify(consumerFactory).createConsumerForTopics(topics)
92 it("should return None when topics hasn't been initialized") {
93 assertThat(cut.state()).isEqualTo(None)
96 describe("when topics are initialized") {
98 cut.listenToTopics("perf3gpp")
101 it("should return some state when it has been set") {
// The mocked consumer reports a state with no messages; state() must wrap exactly that instance in Some.
102 val state = consumerState()
103 whenever(consumer.currentState()).thenReturn(state)
105 assertThat(cut.state()).isEqualTo(Some(state))
110 describe("resetState") {
111 it("should do nothing when topics hasn't been initialized") {
// Asserts that reset() is never invoked on the consumer mock.
113 verify(consumer, never()).reset()
116 describe("when topics are initialized") {
118 cut.listenToTopics("perf3gpp")
121 it("should reset the state") {
126 verify(consumer).reset()
131 describe("validate") {
// Validation is stubbed to succeed (Mono.just(true)) for any pair of arguments.
133 whenever(messageStreamValidation.validate(any(), any())).thenReturn(Mono.just(true))
136 it("should use empty list when consumer is unavailable") {
// validate() is given the JSON text as an input stream.
138 .create(cut.validate("['The JSON']".byteInputStream()))
// The second argument passed on to MessageStreamValidation must be an empty list.
142 verify(messageStreamValidation).validate(any(), eq(emptyList()))
145 it("should delegate to MessageStreamValidation") {
147 cut.listenToTopics("perf3gpp")
// The consumer state holds a single serialized VesEvent built with the default id and payload.
148 whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
151 .create(cut.validate("['The JSON']".byteInputStream()))
// Only the invocation itself is checked; both arguments are matched with any().
156 verify(messageStreamValidation).validate(any(), any())
// Default event id and payload text used by vesEvent(...) when the caller supplies none.
162 private const val DUMMY_EVENT_ID = "aaa"
163 private const val DUMMY_PAYLOAD = "payload"
165 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
166 return VesEvent.newBuilder()
167 .setCommonEventHeader(CommonEventHeader.newBuilder()
168 .setEventId(eventId))
169 .setEventFields(ByteString.copyFrom(payload.toByteArray()))