9a6889c84fdb8c12829f69ce725c1b16e6790a1a
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import com.nhaarman.mockito_kotlin.eq
23 import com.nhaarman.mockito_kotlin.mock
24 import com.nhaarman.mockito_kotlin.whenever
25 import org.assertj.core.api.Assertions.assertThat
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.jetbrains.spek.api.dsl.on
31 import org.mockito.Mockito
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
34 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
35 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
36
37 import reactor.core.publisher.Mono
38 import reactor.retry.Retry
39 import reactor.test.StepVerifier
40 import java.time.Duration
41 import java.util.*
42 import kotlin.test.assertEquals
43
44 /**
45  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
46  * @since May 2018
47  */
48 internal object ConsulConfigurationProviderTest : Spek({
49
50     describe("Consul configuration provider") {
51
52         val httpAdapterMock: HttpAdapter = mock()
53         val healthStateProvider = HealthState.INSTANCE
54
55         given("valid resource url") {
56             val validUrl = "http://valid-url/"
57             val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
58
59             on("call to consul") {
60                 whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
61                         .thenReturn(Mono.just(constructConsulResponse()))
62
63                 it("should use received configuration") {
64
65                     StepVerifier.create(consulConfigProvider().take(1))
66                             .consumeNextWith {
67
68                                 assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
69
70                                 val route1 = it.routing.routes[0]
71                                 assertThat(FAULT.domainName)
72                                         .describedAs("routed domain 1")
73                                         .isEqualTo(route1.domain)
74                                 assertThat("test-topic-1")
75                                         .describedAs("target topic 1")
76                                         .isEqualTo(route1.targetTopic)
77
78                                 val route2 = it.routing.routes[1]
79                                 assertThat(HEARTBEAT.domainName)
80                                         .describedAs("routed domain 2")
81                                         .isEqualTo(route2.domain)
82                                 assertThat("test-topic-2")
83                                         .describedAs("target topic 2")
84                                         .isEqualTo(route2.targetTopic)
85
86                             }.verifyComplete()
87                 }
88             }
89
90         }
91         given("invalid resource url") {
92             val invalidUrl = "http://invalid-url/"
93
94             val iterationCount = 3L
95             val consulConfigProvider = constructConsulConfigProvider(
96                     invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
97             )
98
99             on("call to consul") {
100                 whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
101                         .thenReturn(Mono.error(RuntimeException("Test exception")))
102
103                 it("should interrupt the flux") {
104
105                     StepVerifier.create(consulConfigProvider())
106                             .verifyErrorMessage("Test exception")
107                 }
108
109                 it("should update the health state") {
110                     StepVerifier.create(healthStateProvider().take(iterationCount))
111                             .expectNextCount(iterationCount - 1)
112                             .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
113                             .verifyComplete()
114                 }
115             }
116         }
117     }
118
119 })
120
121 private fun constructConsulConfigProvider(url: String,
122                                           httpAdapter: HttpAdapter,
123                                           healthState: HealthState,
124                                           iterationCount: Long = 1
125 ): ConsulConfigurationProvider {
126
127     val firstRequestDelay = Duration.ofMillis(1)
128     val requestInterval = Duration.ofMillis(1)
129     val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
130
131     return ConsulConfigurationProvider(
132             httpAdapter,
133             url,
134             firstRequestDelay,
135             requestInterval,
136             healthState,
137             retry
138     )
139 }
140
141
142 const val kafkaAddress = "message-router-kafka"
143
144 fun constructConsulResponse(): String {
145
146     val config = """{
147     "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
148     "collector.routing": [
149             {
150                 "fromDomain": "fault",
151                 "toTopic": "test-topic-1"
152             },
153             {
154                 "fromDomain": "heartbeat",
155                 "toTopic": "test-topic-2"
156             }
157     ]
158     }"""
159
160     val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))
161
162     return """[
163         {
164             "CreateIndex": 100,
165             "ModifyIndex": 200,
166             "LockIndex": 200,
167             "Key": "zip",
168             "Flags": 0,
169             "Value": "$encodedValue",
170             "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
171         }
172     ]"""
173 }