2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
22 import arrow.core.Left
23 import arrow.core.None
24 import arrow.core.Some
25 import arrow.effects.IO
26 import com.google.protobuf.ByteString
27 import com.nhaarman.mockito_kotlin.any
28 import com.nhaarman.mockito_kotlin.eq
29 import com.nhaarman.mockito_kotlin.mock
30 import com.nhaarman.mockito_kotlin.never
31 import com.nhaarman.mockito_kotlin.verify
32 import com.nhaarman.mockito_kotlin.whenever
33 import org.assertj.core.api.Assertions.assertThat
34 import org.jetbrains.spek.api.Spek
35 import org.jetbrains.spek.api.dsl.describe
36 import org.jetbrains.spek.api.dsl.it
37 import org.mockito.ArgumentMatchers.anySet
38 import org.mockito.Mockito
39 import org.onap.ves.VesEventOuterClass.VesEvent
40 import org.onap.ves.VesEventOuterClass.CommonEventHeader
41 import java.util.concurrent.ConcurrentLinkedQueue
44 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
47 internal class DcaeAppSimulatorTest : Spek({
48 lateinit var consumerFactory: ConsumerFactory
49 lateinit var messageStreamValidation: MessageStreamValidation
50 lateinit var consumer: Consumer
51 lateinit var cut: DcaeAppSimulator
54 consumerFactory = mock()
55 messageStreamValidation = mock()
57 cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
59 whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
62 fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
64 describe("listenToTopics") {
65 val topics = setOf("perf3gpp", "faults")
67 it("should fail when topic list is empty") {
68 val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync()
69 assertThat(result.isLeft()).isTrue()
72 it("should fail when topic list contains empty strings") {
73 val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync()
74 assertThat(result.isLeft()).isTrue()
77 it("should subscribe to given topics") {
78 cut.listenToTopics(topics).unsafeRunSync()
79 verify(consumerFactory).createConsumerForTopics(topics)
82 it("should subscribe to given topics when called with comma separated list") {
83 cut.listenToTopics("perf3gpp,faults").unsafeRunSync()
84 verify(consumerFactory).createConsumerForTopics(topics)
87 it("should handle errors") {
89 val error = RuntimeException("WTF")
90 whenever(consumerFactory.createConsumerForTopics(anySet()))
91 .thenReturn(IO.raiseError(error))
94 val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync()
97 assertThat(result).isEqualTo(Left(error))
103 it("should return None when topics hasn't been initialized") {
104 assertThat(cut.state()).isEqualTo(None)
107 describe("when topics are initialized") {
109 cut.listenToTopics("perf3gpp").unsafeRunSync()
112 it("should return some state when it has been set") {
113 val state = consumerState()
114 whenever(consumer.currentState()).thenReturn(state)
116 assertThat(cut.state()).isEqualTo(Some(state))
121 describe("resetState") {
122 it("should do nothing when topics hasn't been initialized") {
123 cut.resetState().unsafeRunSync()
124 verify(consumer, never()).reset()
127 describe("when topics are initialized") {
129 cut.listenToTopics("perf3gpp").unsafeRunSync()
132 it("should reset the state") {
134 whenever(consumer.reset()).thenReturn(IO.unit)
137 cut.resetState().unsafeRunSync()
140 verify(consumer).reset()
145 describe("validate") {
147 whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
150 it("should use empty list when consumer is unavailable") {
152 val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
155 verify(messageStreamValidation).validate(any(), eq(emptyList()))
156 assertThat(result).isTrue()
159 it("should delegate to MessageStreamValidation") {
161 cut.listenToTopics("perf3gpp").unsafeRunSync()
162 whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
165 val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
168 verify(messageStreamValidation).validate(any(), any())
169 assertThat(result).isTrue()
175 private const val DUMMY_EVENT_ID = "aaa"
176 private const val DUMMY_PAYLOAD = "payload"
178 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
179 return VesEvent.newBuilder()
180 .setCommonEventHeader(CommonEventHeader.newBuilder()
181 .setEventId(eventId))
182 .setEventFields(ByteString.copyFrom(payload.toByteArray()))