c98c97a670df91aab8edd6c596d22f328668d436
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import com.nhaarman.mockito_kotlin.eq
23 import com.nhaarman.mockito_kotlin.mock
24 import com.nhaarman.mockito_kotlin.verify
25 import com.nhaarman.mockito_kotlin.whenever
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.given
28 import org.jetbrains.spek.api.dsl.it
29 import org.mockito.Mockito
30 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
31 import reactor.core.publisher.Mono
32 import reactor.ipc.netty.http.client.HttpClient
33 import reactor.test.StepVerifier
34 import java.time.Duration
35 import java.util.*
36 import kotlin.test.assertEquals
37
/**
 * Spek specification for [ConsulConfigurationProvider] backed by a mocked [HttpAdapter].
 *
 * Two scenarios are exercised: the adapter answering with the payload built by
 * [constructConsulResponse], and the adapter answering with an error.
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal object ConsulConfigurationProviderTest : Spek({

    val initialDelay = Duration.ofMillis(1)
    val httpMock: HttpAdapter = mock()

    given("valid resource url") {

        val workingUrl = "http://valid-url/"
        val provider = ConsulConfigurationProvider(workingUrl, httpMock, initialDelay)

        // The stub is selected by URL only; any map is accepted as the second argument.
        val consulAnswer = Mono.just(constructConsulResponse())
        whenever(httpMock.get(eq(workingUrl), Mockito.anyMap())).thenReturn(consulAnswer)

        it("should use default configuration at the beginning, then apply received configuration") {

            // Limit the stream to two elements so the verifier can expect completion.
            val firstTwoConfigurations = provider().take(2)

            StepVerifier.create(firstTwoConfigurations)
                    .consumeNextWith { defaults ->
                        // First element: the configuration emitted before anything is fetched.
                        assertEquals("kafka:9092", defaults.kafkaBootstrapServers)

                        val onlyRoute = defaults.routing.routes[0]
                        assertEquals(Domain.HVRANMEAS, onlyRoute.domain)
                        assertEquals("ves_hvRanMeas", onlyRoute.targetTopic)
                    }
                    .consumeNextWith { received ->
                        // Second element: the configuration decoded from the stubbed response.
                        assertEquals("kafka:9093", received.kafkaBootstrapServers)

                        val receivedRoutes = received.routing.routes

                        val heartbeatRoute = receivedRoutes[0]
                        assertEquals(Domain.HEARTBEAT, heartbeatRoute.domain)
                        assertEquals("test-topic-1", heartbeatRoute.targetTopic)

                        val measurementsRoute = receivedRoutes[1]
                        assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, measurementsRoute.domain)
                        assertEquals("test-topic-2", measurementsRoute.targetTopic)
                    }
                    .verifyComplete()
        }
    }

    given("invalid resource url") {

        val brokenUrl = "http://invalid-url/"
        val provider = ConsulConfigurationProvider(brokenUrl, httpMock, initialDelay)

        // For this URL the adapter answers with an error instead of a payload.
        val failure = RuntimeException("Test exception")
        whenever(httpMock.get(eq(brokenUrl), Mockito.anyMap())).thenReturn(Mono.error(failure))

        it("should use default configuration at the beginning, then should interrupt the flux") {

            val configurations = provider()

            StepVerifier.create(configurations)
                    .consumeNextWith { defaults ->
                        assertEquals("kafka:9092", defaults.kafkaBootstrapServers)

                        val onlyRoute = defaults.routing.routes[0]
                        assertEquals(Domain.HVRANMEAS, onlyRoute.domain)
                        assertEquals("ves_hvRanMeas", onlyRoute.targetTopic)
                    }
                    // The adapter's error must surface as the terminal signal.
                    .verifyErrorMessage("Test exception")
        }
    }
})
106
/**
 * Builds the canned response body used by the tests in this file.
 *
 * The result is a JSON array holding a single entry whose `"Value"` field is the
 * Base64 encoding of a configuration document with `kafkaBootstrapServers` set to
 * `kafka:9093` and two routing entries (`fromDomain` 1 and 2).
 *
 * @return the response document as a string
 */
fun constructConsulResponse(): String {

    // Whitespace inside this literal is part of the encoded payload; keep it as is.
    val configurationJson = """{
        "kafkaBootstrapServers": "kafka:9093",
        "routing": [
                    {
                        "fromDomain": 1,
                        "toTopic": "test-topic-1"
                    },
                    {
                        "fromDomain": 2,
                        "toTopic": "test-topic-2"
                }
    ]
    }"""

    val base64Payload = Base64.getEncoder().encodeToString(configurationJson.toByteArray())

    return """[
        {
            "CreateIndex": 100,
            "ModifyIndex": 200,
            "LockIndex": 200,
            "Key": "zip",
            "Flags": 0,
            "Value": "$base64Payload",
            "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
        }
    ]"""
}