2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl.adapters
22 import com.nhaarman.mockito_kotlin.eq
23 import com.nhaarman.mockito_kotlin.mock
24 import com.nhaarman.mockito_kotlin.whenever
25 import org.assertj.core.api.Assertions.assertThat
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.jetbrains.spek.api.dsl.on
31 import org.mockito.Mockito
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
34 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
35 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
37 import reactor.core.publisher.Mono
38 import reactor.retry.Retry
39 import reactor.test.StepVerifier
40 import java.time.Duration
42 import kotlin.test.assertEquals
45 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
48 internal object ConsulConfigurationProviderTest : Spek({
50 describe("Consul configuration provider") {
52 val httpAdapterMock: HttpAdapter = mock()
53 val healthStateProvider = HealthState.INSTANCE
55 given("valid resource url") {
56 val validUrl = "http://valid-url/"
57 val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
59 on("call to consul") {
60 whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
61 .thenReturn(Mono.just(constructConsulResponse()))
63 it("should use received configuration") {
65 StepVerifier.create(consulConfigProvider().take(1))
68 assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
70 val route1 = it.routing.routes[0]
71 assertThat(FAULT.domainName)
72 .describedAs("routed domain 1")
73 .isEqualTo(route1.domain)
74 assertThat("test-topic-1")
75 .describedAs("target topic 1")
76 .isEqualTo(route1.targetTopic)
78 val route2 = it.routing.routes[1]
79 assertThat(HEARTBEAT.domainName)
80 .describedAs("routed domain 2")
81 .isEqualTo(route2.domain)
82 assertThat("test-topic-2")
83 .describedAs("target topic 2")
84 .isEqualTo(route2.targetTopic)
91 given("invalid resource url") {
92 val invalidUrl = "http://invalid-url/"
94 val iterationCount = 3L
95 val consulConfigProvider = constructConsulConfigProvider(
96 invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
99 on("call to consul") {
100 whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
101 .thenReturn(Mono.error(RuntimeException("Test exception")))
103 it("should interrupt the flux") {
105 StepVerifier.create(consulConfigProvider())
106 .verifyErrorMessage("Test exception")
109 it("should update the health state") {
110 StepVerifier.create(healthStateProvider().take(iterationCount))
111 .expectNextCount(iterationCount - 1)
112 .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
121 private fun constructConsulConfigProvider(url: String,
122 httpAdapter: HttpAdapter,
123 healthState: HealthState,
124 iterationCount: Long = 1
125 ): ConsulConfigurationProvider {
127 val firstRequestDelay = Duration.ofMillis(1)
128 val requestInterval = Duration.ofMillis(1)
129 val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
131 return ConsulConfigurationProvider(
142 const val kafkaAddress = "message-router-kafka"
144 fun constructConsulResponse(): String {
147 "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
148 "collector.routing": [
150 "fromDomain": "fault",
151 "toTopic": "test-topic-1"
154 "fromDomain": "heartbeat",
155 "toTopic": "test-topic-2"
160 val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))
169 "Value": "$encodedValue",
170 "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"