e3e61c81d100981f87cf955c4de6d111834a19a8
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
21
22 import arrow.core.Right
23 import com.google.protobuf.ByteString
24 import com.nhaarman.mockitokotlin2.any
25 import com.nhaarman.mockitokotlin2.eq
26 import com.nhaarman.mockitokotlin2.mock
27 import com.nhaarman.mockitokotlin2.never
28 import com.nhaarman.mockitokotlin2.verify
29 import com.nhaarman.mockitokotlin2.whenever
30 import org.assertj.core.api.Assertions.*
31 import org.jetbrains.spek.api.Spek
32 import org.jetbrains.spek.api.dsl.describe
33 import org.jetbrains.spek.api.dsl.it
34 import org.mockito.ArgumentMatchers.anySet
35 import org.onap.ves.VesEventOuterClass.CommonEventHeader
36 import org.onap.ves.VesEventOuterClass.VesEvent
37 import reactor.core.publisher.Mono
38 import reactor.test.StepVerifier
39 import java.lang.IllegalArgumentException
40 import java.util.concurrent.ConcurrentLinkedQueue
41 import kotlin.test.assertFailsWith
42
43
44 /**
45  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
46  * @since August 2018
47  */
48 internal class DcaeAppSimulatorTest : Spek({
49     lateinit var consumerFactory: ConsumerFactory
50     lateinit var messageStreamValidation: MessageStreamValidation
51     lateinit var perf3gpp_consumer: Consumer
52     lateinit var faults_consumer: Consumer
53     lateinit var cut: DcaeAppSimulator
54
55     beforeEachTest {
56         consumerFactory = mock()
57         messageStreamValidation = mock()
58         perf3gpp_consumer = mock()
59         faults_consumer = mock()
60         cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
61
62         whenever(consumerFactory.createConsumersForTopics(anySet())).thenReturn(mapOf(
63                 PERF3GPP_TOPIC to perf3gpp_consumer,
64                 FAULTS_TOPICS to faults_consumer))
65     }
66
67     fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
68
69     describe("listenToTopics") {
70         it("should fail when topic list is empty") {
71             assertFailsWith(IllegalArgumentException::class) {
72                 cut.listenToTopics(setOf())
73             }
74         }
75
76         it("should fail when topic list contains empty strings") {
77             assertFailsWith(IllegalArgumentException::class) {
78                 cut.listenToTopics(setOf(PERF3GPP_TOPIC, " ", FAULTS_TOPICS))
79             }
80         }
81
82         it("should subscribe to given topics") {
83             cut.listenToTopics(TWO_TOPICS)
84             verify(consumerFactory).createConsumersForTopics(TWO_TOPICS)
85         }
86
87         it("should subscribe to given topics when called with comma separated list") {
88             cut.listenToTopics("$PERF3GPP_TOPIC,$FAULTS_TOPICS")
89             verify(consumerFactory).createConsumersForTopics(TWO_TOPICS)
90         }
91     }
92
93     describe("state") {
94         it("should return Left when topics hasn't been initialized") {
95             assertThat(cut.state(PERF3GPP_TOPIC).isLeft()).isTrue()
96         }
97
98         describe("when topics are initialized") {
99             beforeEachTest {
100                 cut.listenToTopics(TWO_TOPICS)
101             }
102
103             it("should return state when it has been set") {
104                 val state = consumerState()
105                 whenever(perf3gpp_consumer.currentState()).thenReturn(state)
106                 whenever(faults_consumer.currentState()).thenReturn(state)
107
108                 assertThat(cut.state(PERF3GPP_TOPIC)).isEqualTo(Right(state))
109                 assertThat(cut.state(FAULTS_TOPICS)).isEqualTo(Right(state))
110             }
111         }
112     }
113
114     describe("resetState") {
115         it("should do nothing when topics hasn't been initialized") {
116             cut.resetState(PERF3GPP_TOPIC)
117             cut.resetState(FAULTS_TOPICS)
118             verify(perf3gpp_consumer, never()).reset()
119             verify(faults_consumer, never()).reset()
120         }
121
122         describe("when topics are initialized") {
123             beforeEachTest {
124                 cut.listenToTopics(TWO_TOPICS)
125             }
126
127             it("should reset the state of given topic consumer") {
128                 cut.resetState(PERF3GPP_TOPIC)
129
130                 verify(perf3gpp_consumer).reset()
131                 verify(faults_consumer, never()).reset()
132             }
133         }
134     }
135
136     describe("validate") {
137         beforeEachTest {
138             whenever(messageStreamValidation.validate(any(), any())).thenReturn(Mono.just(true))
139         }
140
141         it("should use empty list when consumer is unavailable") {
142             StepVerifier
143                     .create(cut.validate("['The JSON']".byteInputStream(), PERF3GPP_TOPIC))
144                     .expectNext(true)
145                     .verifyComplete()
146
147             verify(messageStreamValidation).validate(any(), eq(emptyList()))
148         }
149
150         it("should delegate to MessageStreamValidation") {
151             cut.listenToTopics(PERF3GPP_TOPIC)
152             whenever(perf3gpp_consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
153
154             StepVerifier
155                     .create(cut.validate("['The JSON']".byteInputStream(), PERF3GPP_TOPIC))
156                     .expectNext(true)
157                     .verifyComplete()
158
159             verify(messageStreamValidation).validate(any(), any())
160         }
161     }
162 })
163
164
165 private const val PERF3GPP_TOPIC = "perf3gpp"
166 private const val FAULTS_TOPICS = "faults"
167 private val TWO_TOPICS = setOf(PERF3GPP_TOPIC, FAULTS_TOPICS)
168
169 private const val DUMMY_EVENT_ID = "aaa"
170 private const val DUMMY_PAYLOAD = "payload"
171
172 private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
173     return VesEvent.newBuilder()
174             .setCommonEventHeader(CommonEventHeader.newBuilder()
175                     .setEventId(eventId))
176             .setEventFields(ByteString.copyFrom(payload.toByteArray()))
177             .build()
178 }