493100fc1d286db4e93a5e9d2454c0ba4c142a36
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
21
22 import arrow.core.None
23 import arrow.core.Some
24 import com.google.protobuf.ByteString
25 import com.nhaarman.mockitokotlin2.any
26 import com.nhaarman.mockitokotlin2.eq
27 import com.nhaarman.mockitokotlin2.mock
28 import com.nhaarman.mockitokotlin2.never
29 import com.nhaarman.mockitokotlin2.verify
30 import com.nhaarman.mockitokotlin2.whenever
31 import org.assertj.core.api.Assertions.*
32 import org.jetbrains.spek.api.Spek
33 import org.jetbrains.spek.api.dsl.describe
34 import org.jetbrains.spek.api.dsl.it
35 import org.mockito.ArgumentMatchers.anySet
36 import org.onap.ves.VesEventOuterClass.CommonEventHeader
37 import org.onap.ves.VesEventOuterClass.VesEvent
38 import reactor.core.publisher.Mono
39 import reactor.test.StepVerifier
40 import java.lang.IllegalArgumentException
41 import java.util.concurrent.ConcurrentLinkedQueue
42 import kotlin.test.assertFailsWith
43
44 /**
45  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
46  * @since August 2018
47  */
48 internal class DcaeAppSimulatorTest : Spek({
49     lateinit var consumerFactory: ConsumerFactory
50     lateinit var messageStreamValidation: MessageStreamValidation
51     lateinit var consumer: Consumer
52     lateinit var cut: DcaeAppSimulator
53
54     beforeEachTest {
55         consumerFactory = mock()
56         messageStreamValidation = mock()
57         consumer = mock()
58         cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
59
60         whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(consumer)
61     }
62
63     fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
64
65     describe("listenToTopics") {
66         val topics = setOf("perf3gpp", "faults")
67
68         it("should fail when topic list is empty") {
69             assertFailsWith(IllegalArgumentException::class){
70                 cut.listenToTopics(setOf())
71             }
72         }
73
74         it("should fail when topic list contains empty strings") {
75             assertFailsWith(IllegalArgumentException::class){
76                 cut.listenToTopics(setOf("perf3gpp", " ", "faults"))
77             }
78         }
79
80         it("should subscribe to given topics") {
81             cut.listenToTopics(topics)
82             verify(consumerFactory).createConsumerForTopics(topics)
83         }
84
85         it("should subscribe to given topics when called with comma separated list") {
86             cut.listenToTopics("perf3gpp,faults")
87             verify(consumerFactory).createConsumerForTopics(topics)
88         }
89     }
90
91     describe("state") {
92         it("should return None when topics hasn't been initialized") {
93             assertThat(cut.state()).isEqualTo(None)
94         }
95
96         describe("when topics are initialized") {
97             beforeEachTest {
98                 cut.listenToTopics("perf3gpp")
99             }
100
101             it("should return some state when it has been set") {
102                 val state = consumerState()
103                 whenever(consumer.currentState()).thenReturn(state)
104
105                 assertThat(cut.state()).isEqualTo(Some(state))
106             }
107         }
108     }
109
110     describe("resetState") {
111         it("should do nothing when topics hasn't been initialized") {
112             cut.resetState()
113             verify(consumer, never()).reset()
114         }
115
116         describe("when topics are initialized") {
117             beforeEachTest {
118                 cut.listenToTopics("perf3gpp")
119             }
120
121             it("should reset the state") {
122                 // when
123                 cut.resetState()
124
125                 // then
126                 verify(consumer).reset()
127             }
128         }
129     }
130
131     describe("validate") {
132         beforeEachTest {
133             whenever(messageStreamValidation.validate(any(), any())).thenReturn(Mono.just(true))
134         }
135
136         it("should use empty list when consumer is unavailable") {
137             StepVerifier
138                     .create(cut.validate("['The JSON']".byteInputStream()))
139                     .expectNext(true)
140                     .verifyComplete()
141
142             verify(messageStreamValidation).validate(any(), eq(emptyList()))
143         }
144
145         it("should delegate to MessageStreamValidation") {
146             // given
147             cut.listenToTopics("perf3gpp")
148             whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
149
150            StepVerifier
151                     .create(cut.validate("['The JSON']".byteInputStream()))
152                    .expectNext(true)
153                     .verifyComplete()
154
155             // then
156             verify(messageStreamValidation).validate(any(), any())
157         }
158     }
159 })
160
161
162 private const val DUMMY_EVENT_ID = "aaa"
163 private const val DUMMY_PAYLOAD = "payload"
164
165 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
166     return VesEvent.newBuilder()
167             .setCommonEventHeader(CommonEventHeader.newBuilder()
168                     .setEventId(eventId))
169             .setEventFields(ByteString.copyFrom(payload.toByteArray()))
170             .build()
171 }