Add all required and reasonable MDCs
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / ConsulConfigurationProviderTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import com.nhaarman.mockitokotlin2.any
23 import com.nhaarman.mockitokotlin2.eq
24 import com.nhaarman.mockitokotlin2.mock
25 import com.nhaarman.mockitokotlin2.whenever
26 import org.assertj.core.api.Assertions.assertThat
27 import org.jetbrains.spek.api.Spek
28 import org.jetbrains.spek.api.dsl.describe
29 import org.jetbrains.spek.api.dsl.given
30 import org.jetbrains.spek.api.dsl.it
31 import org.jetbrains.spek.api.dsl.on
32 import org.mockito.Mockito
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
34 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
35 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
36 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
37
38 import reactor.core.publisher.Mono
39 import reactor.retry.Retry
40 import reactor.test.StepVerifier
41 import java.time.Duration
42 import java.util.*
43 import kotlin.test.assertEquals
44
/**
 * Spek specification for [ConsulConfigurationProvider].
 *
 * Two scenarios are exercised against a mocked [HttpAdapter]:
 *  - the adapter returns a canned Consul response and the provider is expected to emit
 *    a configuration carrying the Kafka bootstrap servers and both routes from that response,
 *  - the adapter returns an error and the provider is expected to terminate with that error
 *    while the health state reports that it is retrying.
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal object ConsulConfigurationProviderTest : Spek({

    describe("Consul configuration provider") {

        // A single mock and the HealthState.INSTANCE object are shared by both scenarios below.
        val httpAdapterMock: HttpAdapter = mock()
        val healthStateProvider = HealthState.INSTANCE

        given("valid resource url") {
            val validUrl = "http://valid-url/"
            val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)

            on("call to consul") {
                whenever(httpAdapterMock.get(eq(validUrl), any(), Mockito.anyMap()))
                        .thenReturn(Mono.just(constructConsulResponse()))

                it("should use received configuration") {

                    // Only the first emitted configuration is inspected.
                    StepVerifier.create(consulConfigProvider().take(1))
                            .consumeNextWith { configuration ->

                                assertEquals("$kafkaAddress:9093", configuration.kafkaBootstrapServers)

                                // AssertJ convention: the value under test goes into assertThat(),
                                // the expected value into isEqualTo(), so failure messages read correctly.
                                val route1 = configuration.routing.routes[0]
                                assertThat(route1.domain)
                                        .describedAs("routed domain 1")
                                        .isEqualTo(FAULT.domainName)
                                assertThat(route1.targetTopic)
                                        .describedAs("target topic 1")
                                        .isEqualTo("test-topic-1")

                                val route2 = configuration.routing.routes[1]
                                assertThat(route2.domain)
                                        .describedAs("routed domain 2")
                                        .isEqualTo(HEARTBEAT.domainName)
                                assertThat(route2.targetTopic)
                                        .describedAs("target topic 2")
                                        .isEqualTo("test-topic-2")

                            }.verifyComplete()
                }
            }

        }
        given("invalid resource url") {
            val invalidUrl = "http://invalid-url/"

            // Upper bound passed to the retry predicate built by constructConsulConfigProvider.
            val iterationCount = 3L
            val consulConfigProvider = constructConsulConfigProvider(
                    invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
            )

            on("call to consul") {
                whenever(httpAdapterMock.get(eq(invalidUrl), any(), Mockito.anyMap()))
                        .thenReturn(Mono.error(RuntimeException("Test exception")))

                it("should interrupt the flux") {

                    StepVerifier.create(consulConfigProvider())
                            .verifyErrorMessage("Test exception")
                }

                it("should update the health state") {
                    // NOTE(review): this appears to rely on health descriptions published while the
                    // preceding spec ran (shared HealthState.INSTANCE) - confirm the ordering assumption.
                    StepVerifier.create(healthStateProvider().take(iterationCount))
                            .expectNextCount(iterationCount - 1)
                            .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
                            .verifyComplete()
                }
            }
        }
    }

})
121
/**
 * Builds the [ConsulConfigurationProvider] under test with timings short enough for unit tests.
 *
 * Both the first request delay and the request interval are one millisecond. The retry policy
 * keeps retrying while the retry context's iteration number does not exceed [iterationCount],
 * using a fixed one-nanosecond backoff.
 *
 * @param url address passed to the provider as the resource to fetch
 * @param httpAdapter adapter handed to the provider (a mock in these specs)
 * @param healthState health state object handed to the provider
 * @param iterationCount highest iteration number for which a retry is still attempted
 * @return a provider wired with the arguments above
 */
private fun constructConsulConfigProvider(url: String,
                                          httpAdapter: HttpAdapter,
                                          healthState: HealthState,
                                          iterationCount: Long = 1
): ConsulConfigurationProvider {

    val initialDelay = Duration.ofMillis(1)
    val pollingInterval = Duration.ofMillis(1)
    val retryPolicy = Retry
            .onlyIf<Any> { context -> context.iteration() <= iterationCount }
            .fixedBackoff(Duration.ofNanos(1))

    return ConsulConfigurationProvider(
            httpAdapter,
            url,
            initialDelay,
            pollingInterval,
            healthState,
            retryPolicy
    )
}
141
142
/** Kafka host name embedded in the canned Consul response and expected by the assertions. */
const val kafkaAddress: String = "message-router-kafka"

/**
 * Produces the JSON document returned by the mocked HTTP adapter in place of a real Consul reply.
 *
 * The document declares the Kafka bootstrap servers as `kafkaAddress` on port 9093 and two routes:
 * `fault` to `test-topic-1` and `heartbeat` to `test-topic-2`.
 *
 * @return the JSON text, with the exact layout of the raw string below
 */
fun constructConsulResponse(): String {
    val bootstrapServers = "$kafkaAddress:9093"
    return """{
    "dmaap.kafkaBootstrapServers": "$bootstrapServers",
    "collector.routing": [
            {
                "fromDomain": "fault",
                "toTopic": "test-topic-1"
            },
            {
                "fromDomain": "heartbeat",
                "toTopic": "test-topic-2"
            }
    ]
    }"""
}