2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl.adapters
22 import com.nhaarman.mockitokotlin2.eq
23 import com.nhaarman.mockitokotlin2.mock
24 import com.nhaarman.mockitokotlin2.whenever
25 import org.assertj.core.api.Assertions.assertThat
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.jetbrains.spek.api.dsl.on
31 import org.mockito.Mockito
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
34 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
35 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
37 import reactor.core.publisher.Mono
38 import reactor.retry.Retry
39 import reactor.test.StepVerifier
40 import java.time.Duration
42 import kotlin.test.assertEquals
45 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
// Spek specification for the Consul configuration provider. The provider is exercised
// through a mocked HttpAdapter and the HealthState.INSTANCE singleton.
48 internal object ConsulConfigurationProviderTest : Spek({
50 describe("Consul configuration provider") {
52 val httpAdapterMock: HttpAdapter = mock()
53 val healthStateProvider = HealthState.INSTANCE
// Scenario: the HTTP adapter answers successfully for the configured URL.
55 given("valid resource url") {
56 val validUrl = "http://valid-url/"
57 val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
59 on("call to consul") {
// Stub: a GET on validUrl with any parameter map yields the canned Consul response once.
60 whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
61 .thenReturn(Mono.just(constructConsulResponse()))
63 it("should use received configuration") {
// Only the first emitted configuration is inspected.
65 StepVerifier.create(consulConfigProvider().take(1))
// The bootstrap servers value must be the kafkaAddress constant followed by ":9093".
68 assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
// First route: FAULT domain routed to "test-topic-1".
70 val route1 = it.routing.routes[0]
// NOTE(review): the expected value is handed to assertThat and the actual value to
// isEqualTo, which is the reverse of AssertJ's usual actual-first order - confirm this
// is intended, since failure output will presumably label the two values the other way round.
71 assertThat(FAULT.domainName)
72 .describedAs("routed domain 1")
73 .isEqualTo(route1.domain)
74 assertThat("test-topic-1")
75 .describedAs("target topic 1")
76 .isEqualTo(route1.targetTopic)
// Second route: HEARTBEAT domain routed to "test-topic-2".
78 val route2 = it.routing.routes[1]
79 assertThat(HEARTBEAT.domainName)
80 .describedAs("routed domain 2")
81 .isEqualTo(route2.domain)
82 assertThat("test-topic-2")
83 .describedAs("target topic 2")
84 .isEqualTo(route2.targetTopic)
// Scenario: every call made through the HTTP adapter for the configured URL fails.
91 given("invalid resource url") {
92 val invalidUrl = "http://invalid-url/"
// Passed to the factory as the retry bound and reused below as the number of
// health descriptions to observe.
94 val iterationCount = 3L
95 val consulConfigProvider = constructConsulConfigProvider(
96 invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
99 on("call to consul") {
// Stub: a GET on invalidUrl with any parameter map yields an error Mono.
100 whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
101 .thenReturn(Mono.error(RuntimeException("Test exception")))
103 it("should interrupt the flux") {
// The flux must terminate with an error whose message is "Test exception".
105 StepVerifier.create(consulConfigProvider())
106 .verifyErrorMessage("Test exception")
109 it("should update the health state") {
// Observes iterationCount health descriptions: the first iterationCount - 1 may hold any
// value, the last one must be RETRYING_FOR_CONSUL_CONFIGURATION.
// NOTE(review): healthStateProvider is the HealthState.INSTANCE singleton, so this check
// presumably depends on state produced by the failing call in the previous `it` - confirm.
110 StepVerifier.create(healthStateProvider().take(iterationCount))
111 .expectNextCount(iterationCount - 1)
112 .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
// Test factory for ConsulConfigurationProvider.
//
// url            - resource URL handed over by the calling scenario
// httpAdapter    - HTTP adapter (a mock in this spec)
// healthState    - health state instance supplied by the spec
// iterationCount - upper bound used by the retry predicate below; defaults to 1
//
// Returns a ConsulConfigurationProvider; the constructor arguments are presumably the
// parameters and the values prepared here - verify against the constructor call.
121 private fun constructConsulConfigProvider(url: String,
122 httpAdapter: HttpAdapter,
123 healthState: HealthState,
124 iterationCount: Long = 1
125 ): ConsulConfigurationProvider {
// One-millisecond durations for the first request delay and the request interval.
127 val firstRequestDelay = Duration.ofMillis(1)
128 val requestInterval = Duration.ofMillis(1)
// Retry is permitted only while the retry context's iteration() is <= iterationCount,
// using a fixed backoff of one nanosecond.
129 val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
131 return ConsulConfigurationProvider(
// Kafka host name shared by the fake Consul response ("dmaap.kafkaBootstrapServers") and
// the bootstrap-servers assertion in the spec; both append ":9093" to it.
142 const val kafkaAddress = "message-router-kafka"
144 fun constructConsulResponse(): String =
146 "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
147 "collector.routing": [
149 "fromDomain": "fault",
150 "toTopic": "test-topic-1"
153 "fromDomain": "heartbeat",
154 "toTopic": "test-topic-2"