808a6fcca3925ca7f6afd2424ed2fb072be951c4
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import com.nhaarman.mockito_kotlin.eq
23 import com.nhaarman.mockito_kotlin.mock
24 import com.nhaarman.mockito_kotlin.whenever
25 import org.jetbrains.spek.api.Spek
26 import org.jetbrains.spek.api.dsl.given
27 import org.jetbrains.spek.api.dsl.it
28 import org.mockito.Mockito
29 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
30 import reactor.core.publisher.Mono
31 import reactor.test.StepVerifier
32 import java.time.Duration
33 import java.util.*
34 import kotlin.test.assertEquals
35
/**
 * Spek specification for [ConsulConfigurationProvider].
 *
 * The HTTP layer is replaced by a Mockito mock of [HttpAdapter], so each scenario is driven
 * solely by what the stubbed `get` call returns for the given URL:
 * - a successful response produced by [constructConsulResponse],
 * - a failing [Mono] carrying a `RuntimeException`.
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal object ConsulConfigurationProviderTest : Spek({

    // One mock serves both scenarios; the stubs are matched by URL, so they do not overlap.
    val httpAdapterMock: HttpAdapter = mock()
    val firstRequestDelay = Duration.ofMillis(1)
    val requestInterval = Duration.ofMillis(1)

    given("valid resource url") {

        val resourceUrl = "http://valid-url/"
        val provider = ConsulConfigurationProvider(
                resourceUrl,
                httpAdapterMock,
                firstRequestDelay,
                requestInterval)

        whenever(httpAdapterMock.get(eq(resourceUrl), Mockito.anyMap()))
                .thenReturn(Mono.just(constructConsulResponse()))

        it("should use default configuration at the beginning, then apply received configuration") {

            // take(2) makes the verified flux complete right after the two expected items.
            val firstTwoConfigurations = provider().take(2)

            StepVerifier.create(firstTwoConfigurations)
                    .consumeNextWith { defaultConfig ->
                        assertEquals("kafka:9092", defaultConfig.kafkaBootstrapServers)

                        val defaultRoute = defaultConfig.routing.routes[0]
                        assertEquals(Domain.HVRANMEAS, defaultRoute.domain)
                        assertEquals("ves_hvRanMeas", defaultRoute.targetTopic)
                    }
                    .consumeNextWith { receivedConfig ->
                        assertEquals("kafka:9093", receivedConfig.kafkaBootstrapServers)

                        val heartbeatRoute = receivedConfig.routing.routes[0]
                        assertEquals(Domain.HEARTBEAT, heartbeatRoute.domain)
                        assertEquals("test-topic-1", heartbeatRoute.targetTopic)

                        val measurementsRoute = receivedConfig.routing.routes[1]
                        assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, measurementsRoute.domain)
                        assertEquals("test-topic-2", measurementsRoute.targetTopic)
                    }
                    .verifyComplete()
        }
    }

    given("invalid resource url") {

        val unreachableUrl = "http://invalid-url/"
        val provider = ConsulConfigurationProvider(
                unreachableUrl,
                httpAdapterMock,
                firstRequestDelay,
                requestInterval)

        whenever(httpAdapterMock.get(eq(unreachableUrl), Mockito.anyMap()))
                .thenReturn(Mono.error(RuntimeException("Test exception")))

        it("should use default configuration at the beginning, then should interrupt the flux") {

            // No take() here: the stream is expected to end on its own with the stubbed error.
            StepVerifier.create(provider())
                    .consumeNextWith { defaultConfig ->
                        assertEquals("kafka:9092", defaultConfig.kafkaBootstrapServers)

                        val defaultRoute = defaultConfig.routing.routes[0]
                        assertEquals(Domain.HVRANMEAS, defaultRoute.domain)
                        assertEquals("ves_hvRanMeas", defaultRoute.targetTopic)
                    }
                    .verifyErrorMessage("Test exception")
        }
    }
})
105
/**
 * Builds the canned Consul-style response body used by the stubbed HTTP adapter in this file.
 *
 * The result is a JSON array with a single entry. Its `Value` field holds the base64-encoded
 * configuration JSON defined below (bootstrap servers `kafka:9093` and two routing entries).
 *
 * @return the response document as text
 */
fun constructConsulResponse(): String {

    val configurationJson = """{
        "kafkaBootstrapServers": "kafka:9093",
        "routing": [
                    {
                        "fromDomain": 1,
                        "toTopic": "test-topic-1"
                    },
                    {
                        "fromDomain": 2,
                        "toTopic": "test-topic-2"
                }
    ]
    }"""

    // Base64 output is plain ASCII, so encodeToString gives the same text as String(encode(...)).
    val base64Payload = Base64.getEncoder().encodeToString(configurationJson.toByteArray())

    return """[
        {
            "CreateIndex": 100,
            "ModifyIndex": 200,
            "LockIndex": 200,
            "Key": "zip",
            "Flags": 0,
            "Value": "$base64Payload",
            "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
        }
    ]"""
}