Bump checkstyle version
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-dcae-app-simulator / src / test / kotlin / org / onap / dcae / collectors / veshv / simulators / dcaeapp / impl / DcaeAppSimulatorTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
21
22 import arrow.core.Left
23 import arrow.core.None
24 import arrow.core.Some
25 import arrow.effects.IO
26 import com.google.protobuf.ByteString
27 import com.nhaarman.mockitokotlin2.any
28 import com.nhaarman.mockitokotlin2.eq
29 import com.nhaarman.mockitokotlin2.mock
30 import com.nhaarman.mockitokotlin2.never
31 import com.nhaarman.mockitokotlin2.verify
32 import com.nhaarman.mockitokotlin2.whenever
33 import org.assertj.core.api.Assertions.assertThat
34 import org.jetbrains.spek.api.Spek
35 import org.jetbrains.spek.api.dsl.describe
36 import org.jetbrains.spek.api.dsl.it
37 import org.mockito.ArgumentMatchers.anySet
38 import org.onap.ves.VesEventOuterClass.CommonEventHeader
39 import org.onap.ves.VesEventOuterClass.VesEvent
40 import java.util.concurrent.ConcurrentLinkedQueue
41
/**
 * Spek specification for [DcaeAppSimulator].
 *
 * Each test runs against a fresh simulator instance wired to Mockito mocks of
 * [ConsumerFactory] and [MessageStreamValidation]; the factory mock is stubbed to
 * hand out a mocked [Consumer] for any set of topics.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since August 2018
 */
internal class DcaeAppSimulatorTest : Spek({
    lateinit var consumerFactory: ConsumerFactory
    lateinit var messageStreamValidation: MessageStreamValidation
    lateinit var consumer: Consumer
    lateinit var cut: DcaeAppSimulator

    // Topic name and JSON document shared by the scenarios below.
    val singleTopic = "perf3gpp"
    val sampleJson = "['The JSON']"

    beforeEachTest {
        consumer = mock()
        messageStreamValidation = mock()
        consumerFactory = mock()
        cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)

        // Unless a test re-stubs it, the factory yields the mocked consumer for any topic set.
        whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
    }

    // Wraps the given raw messages in a ConsumerState backed by a concurrent queue.
    fun consumerState(vararg messages: ByteArray): ConsumerState {
        val queue = ConcurrentLinkedQueue<ByteArray>(messages.toList())
        return ConsumerState(queue)
    }

    // Runs listenToTopics for the single shared topic and waits for it to complete.
    fun listenToSingleTopic() {
        cut.listenToTopics(singleTopic).unsafeRunSync()
    }

    describe("listenToTopics") {
        val expectedTopics = setOf("perf3gpp", "faults")

        it("should fail when topic list is empty") {
            val outcome = cut.listenToTopics(setOf()).attempt().unsafeRunSync()

            assertThat(outcome.isLeft()).isTrue()
        }

        it("should fail when topic list contains empty strings") {
            val topicsWithBlank = setOf("perf3gpp", " ", "faults")

            val outcome = cut.listenToTopics(topicsWithBlank).attempt().unsafeRunSync()

            assertThat(outcome.isLeft()).isTrue()
        }

        it("should subscribe to given topics") {
            cut.listenToTopics(expectedTopics).unsafeRunSync()

            verify(consumerFactory).createConsumerForTopics(expectedTopics)
        }

        it("should subscribe to given topics when called with comma separated list") {
            cut.listenToTopics("perf3gpp,faults").unsafeRunSync()

            verify(consumerFactory).createConsumerForTopics(expectedTopics)
        }

        it("should handle errors") {
            // The factory is re-stubbed so that consumer creation fails.
            val failure = RuntimeException("WTF")
            whenever(consumerFactory.createConsumerForTopics(anySet()))
                    .thenReturn(IO.raiseError(failure))

            val outcome = cut.listenToTopics(singleTopic).attempt().unsafeRunSync()

            // The very same exception must surface on the left side.
            assertThat(outcome).isEqualTo(Left(failure))
        }
    }

    describe("state") {

        it("should return None when topics hasn't been initialized") {
            val observed = cut.state()

            assertThat(observed).isEqualTo(None)
        }

        describe("when topics are initialized") {
            beforeEachTest {
                listenToSingleTopic()
            }

            it("should return some state when it has been set") {
                val expectedState = consumerState()
                whenever(consumer.currentState()).thenReturn(expectedState)

                val observed = cut.state()

                assertThat(observed).isEqualTo(Some(expectedState))
            }
        }
    }

    describe("resetState") {
        it("should do nothing when topics hasn't been initialized") {
            cut.resetState().unsafeRunSync()

            verify(consumer, never()).reset()
        }

        describe("when topics are initialized") {
            beforeEachTest {
                listenToSingleTopic()
            }

            it("should reset the state") {
                // The mocked consumer must return a runnable IO from reset().
                whenever(consumer.reset()).thenReturn(IO.unit)

                cut.resetState().unsafeRunSync()

                verify(consumer).reset()
            }
        }
    }

    describe("validate") {
        beforeEachTest {
            // Validation itself is mocked to succeed for every input.
            whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
        }

        it("should use empty list when consumer is unavailable") {
            // No listenToTopics call here, so the simulator has no consumer.
            val verdict = cut.validate(sampleJson.byteInputStream()).unsafeRunSync()

            verify(messageStreamValidation).validate(any(), eq(emptyList()))
            assertThat(verdict).isTrue()
        }

        it("should delegate to MessageStreamValidation") {
            // A consumer exists and reports one consumed message.
            listenToSingleTopic()
            val stateWithOneEvent = consumerState(vesEvent().toByteArray())
            whenever(consumer.currentState()).thenReturn(stateWithOneEvent)

            val verdict = cut.validate(sampleJson.byteInputStream()).unsafeRunSync()

            verify(messageStreamValidation).validate(any(), any())
            assertThat(verdict).isTrue()
        }
    }
})
172
173
/** Event id put into the common event header when the caller does not provide one. */
private const val DUMMY_EVENT_ID = "aaa"

/** Text stored as the event fields when the caller does not provide a payload. */
private const val DUMMY_PAYLOAD = "payload"

/**
 * Builds a [VesEvent] for use as test input.
 *
 * @param eventId value set as the event id of the common event header
 * @param payload text whose bytes become the event fields
 * @return the built event
 */
private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
    val headerBuilder = CommonEventHeader.newBuilder().setEventId(eventId)
    val eventFields = ByteString.copyFrom(payload.toByteArray())

    return VesEvent.newBuilder()
            .setCommonEventHeader(headerBuilder)
            .setEventFields(eventFields)
            .build()
}