2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
22 import arrow.core.Left
23 import arrow.core.None
24 import arrow.core.Some
25 import arrow.effects.IO
26 import com.google.protobuf.ByteString
27 import com.nhaarman.mockitokotlin2.any
28 import com.nhaarman.mockitokotlin2.eq
29 import com.nhaarman.mockitokotlin2.mock
30 import com.nhaarman.mockitokotlin2.never
31 import com.nhaarman.mockitokotlin2.verify
32 import com.nhaarman.mockitokotlin2.whenever
33 import org.assertj.core.api.Assertions.assertThat
34 import org.jetbrains.spek.api.Spek
35 import org.jetbrains.spek.api.dsl.describe
36 import org.jetbrains.spek.api.dsl.it
37 import org.mockito.ArgumentMatchers.anySet
38 import org.onap.ves.VesEventOuterClass.CommonEventHeader
39 import org.onap.ves.VesEventOuterClass.VesEvent
40 import java.util.concurrent.ConcurrentLinkedQueue
43 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification for DcaeAppSimulator. The simulator under test ("cut") is built from a
// ConsumerFactory and a MessageStreamValidation, both of which are mockito-kotlin mocks.
46 internal class DcaeAppSimulatorTest : Spek({
// Shared fixtures; declared lateinit and assigned by the setup statements below.
47 lateinit var consumerFactory: ConsumerFactory
48 lateinit var messageStreamValidation: MessageStreamValidation
// NOTE(review): no assignment of `consumer` is visible in this view, yet it is passed to
// IO.just(consumer) below and stubbed with whenever(...) in later tests - presumably it is
// assigned a mock on a line not shown here; confirm.
49 lateinit var consumer: Consumer
50 lateinit var cut: DcaeAppSimulator
// NOTE(review): these setup statements presumably live in a per-test hook whose header is
// not visible in this view - confirm.
53 consumerFactory = mock()
54 messageStreamValidation = mock()
56 cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
// Default stubbing: for any set of topics the factory returns an IO that yields `consumer`.
58 whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
// Helper: builds a ConsumerState backed by a ConcurrentLinkedQueue holding the given messages
// (an empty queue when called without arguments).
61 fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
// Tests for DcaeAppSimulator.listenToTopics, called both with a Set of topics and with a
// comma separated String.
63 describe("listenToTopics") {
// Topic set expected to reach ConsumerFactory.createConsumerForTopics in the happy-path tests.
64 val topics = setOf("perf3gpp", "faults")
// An empty topic set: attempt() is expected to yield a Left (i.e. the IO fails).
66 it("should fail when topic list is empty") {
67 val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync()
68 assertThat(result.isLeft()).isTrue()
// A whitespace-only entry (" ") among otherwise valid topics is also expected to yield a Left.
71 it("should fail when topic list contains empty strings") {
72 val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync()
73 assertThat(result.isLeft()).isTrue()
// Set variant: the factory must be called with exactly the given topic set.
76 it("should subscribe to given topics") {
77 cut.listenToTopics(topics).unsafeRunSync()
78 verify(consumerFactory).createConsumerForTopics(topics)
// String variant: "perf3gpp,faults" must reach the factory as the same two-element set.
81 it("should subscribe to given topics when called with comma separated list") {
82 cut.listenToTopics("perf3gpp,faults").unsafeRunSync()
83 verify(consumerFactory).createConsumerForTopics(topics)
// Failure propagation: the factory is re-stubbed to return a failed IO, and the error is
// expected to surface unchanged as Left(error) from attempt().
86 it("should handle errors") {
88 val error = RuntimeException("WTF")
89 whenever(consumerFactory.createConsumerForTopics(anySet()))
90 .thenReturn(IO.raiseError(error))
93 val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync()
96 assertThat(result).isEqualTo(Left(error))
// Tests for DcaeAppSimulator.state().
// NOTE(review): the enclosing describe(...) header is not visible in this view.
// Per its title, this case covers state() before listenToTopics has been called; the
// expected value is None.
102 it("should return None when topics hasn't been initialized") {
103 assertThat(cut.state()).isEqualTo(None)
// Cases that run after a topic subscription has been made.
106 describe("when topics are initialized") {
// NOTE(review): presumably executed from a per-test setup hook whose header is not shown - confirm.
108 cut.listenToTopics("perf3gpp").unsafeRunSync()
// The consumer is stubbed to report `state`; state() is expected to return it wrapped in Some.
111 it("should return some state when it has been set") {
112 val state = consumerState()
113 whenever(consumer.currentState()).thenReturn(state)
115 assertThat(cut.state()).isEqualTo(Some(state))
// Tests for DcaeAppSimulator.resetState().
120 describe("resetState") {
// Without a prior listenToTopics call, running resetState() must not call consumer.reset().
121 it("should do nothing when topics hasn't been initialized") {
122 cut.resetState().unsafeRunSync()
123 verify(consumer, never()).reset()
// Cases that run after a topic subscription has been made.
126 describe("when topics are initialized") {
// NOTE(review): presumably executed from a per-test setup hook whose header is not shown - confirm.
128 cut.listenToTopics("perf3gpp").unsafeRunSync()
// consumer.reset() is stubbed to return IO.unit; resetState() is expected to invoke it.
131 it("should reset the state") {
133 whenever(consumer.reset()).thenReturn(IO.unit)
136 cut.resetState().unsafeRunSync()
139 verify(consumer).reset()
// Tests for DcaeAppSimulator.validate(), which is fed an input stream and is expected to
// call MessageStreamValidation.validate.
144 describe("validate") {
// Stub the validator to succeed with `true` for any pair of arguments.
// NOTE(review): presumably executed from a per-test setup hook whose header is not shown - confirm.
146 whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
// No listenToTopics call is made in this test: the validator's second argument is expected
// to be an empty list.
149 it("should use empty list when consumer is unavailable") {
151 val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
154 verify(messageStreamValidation).validate(any(), eq(emptyList()))
155 assertThat(result).isTrue()
// After subscribing, with the consumer state holding one serialized VesEvent, the validator
// must be called and its stubbed result (true) returned.
// NOTE(review): the verification uses any() for both arguments, so it does not check that
// the consumed message is actually passed to the validator.
158 it("should delegate to MessageStreamValidation") {
160 cut.listenToTopics("perf3gpp").unsafeRunSync()
161 whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
164 val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
167 verify(messageStreamValidation).validate(any(), any())
168 assertThat(result).isTrue()
// Default event id used by vesEvent() when the caller does not supply one.
174 private const val DUMMY_EVENT_ID = "aaa"
// Default payload text used by vesEvent() when the caller does not supply one.
175 private const val DUMMY_PAYLOAD = "payload"
177 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
178 return VesEvent.newBuilder()
179 .setCommonEventHeader(CommonEventHeader.newBuilder()
180 .setEventId(eventId))
181 .setEventFields(ByteString.copyFrom(payload.toByteArray()))