34ec8f5a657c7787a51c5715102b3a3747708849
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
21
22 import arrow.core.Left
23 import arrow.core.None
24 import arrow.core.Some
25 import arrow.effects.IO
26 import com.google.protobuf.ByteString
27 import com.nhaarman.mockito_kotlin.any
28 import com.nhaarman.mockito_kotlin.eq
29 import com.nhaarman.mockito_kotlin.mock
30 import com.nhaarman.mockito_kotlin.never
31 import com.nhaarman.mockito_kotlin.verify
32 import com.nhaarman.mockito_kotlin.whenever
33 import org.assertj.core.api.Assertions.assertThat
34 import org.jetbrains.spek.api.Spek
35 import org.jetbrains.spek.api.dsl.describe
36 import org.jetbrains.spek.api.dsl.it
37 import org.mockito.ArgumentMatchers.anySet
38 import org.mockito.Mockito
39 import org.onap.ves.VesEventOuterClass.VesEvent
40 import org.onap.ves.VesEventOuterClass.CommonEventHeader
41 import java.util.concurrent.ConcurrentLinkedQueue
42
/**
 * Spek specification for [DcaeAppSimulator], exercised against mocked
 * [ConsumerFactory], [Consumer] and [MessageStreamValidation] collaborators.
 *
 * The groups below cover subscribing to topics, reading the consumer state,
 * resetting it, and validating consumed messages.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since August 2018
 */
internal class DcaeAppSimulatorTest : Spek({
    lateinit var consumer: Consumer
    lateinit var consumerFactory: ConsumerFactory
    lateinit var messageStreamValidation: MessageStreamValidation
    lateinit var cut: DcaeAppSimulator

    beforeEachTest {
        // Fresh mocks for every test, so stubbing and recorded calls never carry over.
        consumer = mock()
        messageStreamValidation = mock()
        consumerFactory = mock()
        cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)

        // Default stubbing: any set of topics yields the shared mocked consumer.
        whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
    }

    // Wraps the given raw messages in a ConsumerState backed by a concurrent queue.
    fun consumerState(vararg messages: ByteArray): ConsumerState {
        val consumedMessages = ConcurrentLinkedQueue(messages.toList())
        return ConsumerState(consumedMessages)
    }

    describe("listenToTopics") {
        val expectedTopics = setOf("hvMeas", "faults")

        it("should fail when topic list is empty") {
            // when
            val outcome = cut.listenToTopics(emptySet<String>()).attempt().unsafeRunSync()

            // then
            assertThat(outcome.isLeft()).isTrue()
        }

        it("should fail when topic list contains empty strings") {
            // given
            val topicsWithBlankEntry = setOf("hvMeas", " ", "faults")

            // when
            val outcome = cut.listenToTopics(topicsWithBlankEntry).attempt().unsafeRunSync()

            // then
            assertThat(outcome.isLeft()).isTrue()
        }

        it("should subscribe to given topics") {
            // when
            cut.listenToTopics(expectedTopics).unsafeRunSync()

            // then
            verify(consumerFactory).createConsumerForTopics(expectedTopics)
        }

        it("should subscribe to given topics when called with comma separated list") {
            // given
            val commaSeparatedTopics = "hvMeas,faults"

            // when
            cut.listenToTopics(commaSeparatedTopics).unsafeRunSync()

            // then
            verify(consumerFactory).createConsumerForTopics(expectedTopics)
        }

        it("should handle errors") {
            // given: the factory fails instead of producing a consumer
            val failure = RuntimeException("WTF")
            whenever(consumerFactory.createConsumerForTopics(anySet()))
                    .thenReturn(IO.raiseError(failure))

            // when
            val outcome = cut.listenToTopics("hvMeas").attempt().unsafeRunSync()

            // then: the very same exception is reported back to the caller
            assertThat(outcome).isEqualTo(Left(failure))
        }
    }

    describe("state") {

        it("should return None when topics hasn't been initialized") {
            // when
            val currentState = cut.state()

            // then
            assertThat(currentState).isEqualTo(None)
        }

        describe("when topics are initialized") {
            beforeEachTest {
                cut.listenToTopics("hvMeas").unsafeRunSync()
            }

            it("should return some state when it has been set") {
                // given
                val expectedState = consumerState()
                whenever(consumer.currentState()).thenReturn(expectedState)

                // when
                val currentState = cut.state()

                // then
                assertThat(currentState).isEqualTo(Some(expectedState))
            }
        }
    }

    describe("resetState") {
        it("should do nothing when topics hasn't been initialized") {
            // when
            cut.resetState().unsafeRunSync()

            // then
            verify(consumer, never()).reset()
        }

        describe("when topics are initialized") {
            beforeEachTest {
                cut.listenToTopics("hvMeas").unsafeRunSync()
            }

            it("should reset the state") {
                // given
                whenever(consumer.reset()).thenReturn(IO.unit)

                // when
                cut.resetState().unsafeRunSync()

                // then
                verify(consumer).reset()
            }
        }
    }

    describe("validate") {
        beforeEachTest {
            // The validation collaborator accepts whatever it is given.
            whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
        }

        it("should use empty list when consumer is unavailable") {
            // given: listenToTopics was never called
            val jsonDescription = "['The JSON']".byteInputStream()

            // when
            val isValid = cut.validate(jsonDescription).unsafeRunSync()

            // then
            verify(messageStreamValidation).validate(any(), eq(emptyList()))
            assertThat(isValid).isTrue()
        }

        it("should delegate to MessageStreamValidation") {
            // given: a consumer that has already received a single event
            cut.listenToTopics("hvMeas").unsafeRunSync()
            val stateWithOneEvent = consumerState(vesEvent().toByteArray())
            whenever(consumer.currentState()).thenReturn(stateWithOneEvent)
            val jsonDescription = "['The JSON']".byteInputStream()

            // when
            val isValid = cut.validate(jsonDescription).unsafeRunSync()

            // then
            verify(messageStreamValidation).validate(any(), any())
            assertThat(isValid).isTrue()
        }
    }
})
173
174
/** Default event id used by [vesEvent] when the caller does not supply one. */
private const val DUMMY_EVENT_ID: String = "aaa"

/** Default payload text used by [vesEvent] when the caller does not supply one. */
private const val DUMMY_PAYLOAD: String = "payload"
177
/**
 * Builds a [VesEvent] to serve as test data.
 *
 * @param eventId value stored as the event id of the common event header
 * @param payload text whose UTF-8 bytes are stored as the event fields
 * @return the assembled event
 */
private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
    val header = CommonEventHeader.newBuilder()
            .setEventId(eventId)
            .build()
    // Same bytes as payload.toByteArray(), which encodes as UTF-8 by default.
    val eventFields = ByteString.copyFromUtf8(payload)

    return VesEvent.newBuilder()
            .setCommonEventHeader(header)
            .setEventFields(eventFields)
            .build()
}