Bump checkstyle version
[dcaegen2/collectors/hv-ves.git] / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / ConsulConfigurationProviderTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import com.nhaarman.mockitokotlin2.eq
23 import com.nhaarman.mockitokotlin2.mock
24 import com.nhaarman.mockitokotlin2.whenever
25 import org.assertj.core.api.Assertions.assertThat
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.jetbrains.spek.api.dsl.on
31 import org.mockito.Mockito
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
34 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
35 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
36
37 import reactor.core.publisher.Mono
38 import reactor.retry.Retry
39 import reactor.test.StepVerifier
40 import java.time.Duration
41 import java.util.*
42 import kotlin.test.assertEquals
43
44 /**
45  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
46  * @since May 2018
47  */
48 internal object ConsulConfigurationProviderTest : Spek({
49
50     describe("Consul configuration provider") {
51
52         val httpAdapterMock: HttpAdapter = mock()
53         val healthStateProvider = HealthState.INSTANCE
54
55         given("valid resource url") {
56             val validUrl = "http://valid-url/"
57             val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
58
59             on("call to consul") {
60                 whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
61                         .thenReturn(Mono.just(constructConsulResponse()))
62
63                 it("should use received configuration") {
64
65                     StepVerifier.create(consulConfigProvider().take(1))
66                             .consumeNextWith {
67
68                                 assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
69
70                                 val route1 = it.routing.routes[0]
71                                 assertThat(FAULT.domainName)
72                                         .describedAs("routed domain 1")
73                                         .isEqualTo(route1.domain)
74                                 assertThat("test-topic-1")
75                                         .describedAs("target topic 1")
76                                         .isEqualTo(route1.targetTopic)
77
78                                 val route2 = it.routing.routes[1]
79                                 assertThat(HEARTBEAT.domainName)
80                                         .describedAs("routed domain 2")
81                                         .isEqualTo(route2.domain)
82                                 assertThat("test-topic-2")
83                                         .describedAs("target topic 2")
84                                         .isEqualTo(route2.targetTopic)
85
86                             }.verifyComplete()
87                 }
88             }
89
90         }
91         given("invalid resource url") {
92             val invalidUrl = "http://invalid-url/"
93
94             val iterationCount = 3L
95             val consulConfigProvider = constructConsulConfigProvider(
96                     invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
97             )
98
99             on("call to consul") {
100                 whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
101                         .thenReturn(Mono.error(RuntimeException("Test exception")))
102
103                 it("should interrupt the flux") {
104
105                     StepVerifier.create(consulConfigProvider())
106                             .verifyErrorMessage("Test exception")
107                 }
108
109                 it("should update the health state") {
110                     StepVerifier.create(healthStateProvider().take(iterationCount))
111                             .expectNextCount(iterationCount - 1)
112                             .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
113                             .verifyComplete()
114                 }
115             }
116         }
117     }
118
119 })
120
121 private fun constructConsulConfigProvider(url: String,
122                                           httpAdapter: HttpAdapter,
123                                           healthState: HealthState,
124                                           iterationCount: Long = 1
125 ): ConsulConfigurationProvider {
126
127     val firstRequestDelay = Duration.ofMillis(1)
128     val requestInterval = Duration.ofMillis(1)
129     val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
130
131     return ConsulConfigurationProvider(
132             httpAdapter,
133             url,
134             firstRequestDelay,
135             requestInterval,
136             healthState,
137             retry
138     )
139 }
140
141
142 const val kafkaAddress = "message-router-kafka"
143
144 fun constructConsulResponse(): String =
145     """{
146     "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
147     "collector.routing": [
148             {
149                 "fromDomain": "fault",
150                 "toTopic": "test-topic-1"
151             },
152             {
153                 "fromDomain": "heartbeat",
154                 "toTopic": "test-topic-2"
155             }
156     ]
157     }"""