f21a2ecf910b7a5758b0071bb58084715e21d42b
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import com.nhaarman.mockito_kotlin.eq
23 import com.nhaarman.mockito_kotlin.mock
24 import com.nhaarman.mockito_kotlin.whenever
25 import org.jetbrains.spek.api.Spek
26 import org.jetbrains.spek.api.dsl.describe
27 import org.jetbrains.spek.api.dsl.given
28 import org.jetbrains.spek.api.dsl.it
29 import org.jetbrains.spek.api.dsl.on
30 import org.mockito.Mockito
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
33 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
34 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
35
36 import reactor.core.publisher.Mono
37 import reactor.retry.Retry
38 import reactor.test.StepVerifier
39 import java.time.Duration
40 import java.util.*
41 import kotlin.test.assertEquals
42
/**
 * Spek specification for [ConsulConfigurationProvider], driven through a mocked [HttpAdapter].
 *
 * Two scenarios are exercised:
 *  - the adapter answers with a stubbed Consul response and the emitted configuration is checked,
 *  - the adapter answers with an error, which must terminate the stream and be reflected
 *    in the health state.
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal object ConsulConfigurationProviderTest : Spek({

    describe("Consul configuration provider") {

        val adapter: HttpAdapter = mock()
        // NOTE(review): this is the shared singleton instance, so health updates presumably
        // carry over between the scenarios below - confirm before reordering tests.
        val healthState = HealthState.INSTANCE

        given("valid resource url") {
            val consulUrl = "http://valid-url/"
            val provider = constructConsulConfigProvider(consulUrl, adapter, healthState)

            on("call to consul") {
                // Any header map is accepted; only the URL has to match.
                whenever(adapter.get(eq(consulUrl), Mockito.anyMap()))
                        .thenReturn(Mono.just(constructConsulResponse()))

                it("should use received configuration") {
                    // Expected (domain, topic) pairs, listed in the order used by the stubbed response.
                    val expectedRoutes = listOf(
                            FAULT.name to "test-topic-1",
                            HEARTBEAT.name to "test-topic-2"
                    )

                    StepVerifier.create(provider().take(1))
                            .consumeNextWith { configuration ->
                                assertEquals("$kafkaAddress:9093", configuration.kafkaBootstrapServers)

                                expectedRoutes.forEachIndexed { index, (domain, topic) ->
                                    val route = configuration.routing.routes[index]
                                    assertEquals(domain, route.domain)
                                    assertEquals(topic, route.targetTopic)
                                }
                            }.verifyComplete()
                }
            }
        }

        given("invalid resource url") {
            val unreachableUrl = "http://invalid-url/"
            val retries = 3L
            val provider = constructConsulConfigProvider(
                    unreachableUrl, adapter, healthState, retries
            )

            on("call to consul") {
                whenever(adapter.get(eq(unreachableUrl), Mockito.anyMap()))
                        .thenReturn(Mono.error(RuntimeException("Test exception")))

                it("should interrupt the flux") {
                    StepVerifier.create(provider())
                            .verifyErrorMessage("Test exception")
                }

                it("should update the health state") {
                    val healthUpdates = healthState().take(retries)

                    // Only the last of the `retries` observed updates is checked for its exact value.
                    StepVerifier.create(healthUpdates)
                            .expectNextCount(retries - 1)
                            .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
                            .verifyComplete()
                }
            }
        }
    }

})
111
/**
 * Creates a [ConsulConfigurationProvider] tuned for fast tests.
 *
 * Both the first-request delay and the request interval are one millisecond, and the
 * retry policy uses a fixed one-nanosecond backoff.
 *
 * @param url resource URL handed to the provider
 * @param httpAdapter adapter the provider will use for its requests
 * @param healthState health state handed to the provider
 * @param iterationCount upper bound compared against the retry context's `iteration()`;
 *        retrying is allowed only while `iteration() <= iterationCount`
 * @return the configured provider
 */
private fun constructConsulConfigProvider(url: String,
                                          httpAdapter: HttpAdapter,
                                          healthState: HealthState,
                                          iterationCount: Long = 1
): ConsulConfigurationProvider {

    // One immutable Duration serves as both the first-request delay and the request interval.
    val oneMillisecond = Duration.ofMillis(1)

    val retryPolicy = Retry
            .onlyIf<Any> { context -> context.iteration() <= iterationCount }
            .fixedBackoff(Duration.ofNanos(1))

    return ConsulConfigurationProvider(
            httpAdapter,
            url,
            oneMillisecond,
            oneMillisecond,
            healthState,
            retryPolicy
    )
}
131
132
/** Kafka host name interpolated into the stubbed configuration built by [constructConsulResponse]. */
const val kafkaAddress = "message-router-kafka"

/**
 * Builds a stubbed Consul response for tests.
 *
 * The result is a JSON array holding a single entry whose `Value` field is the
 * Base64-encoded configuration document (Kafka bootstrap servers plus two routing
 * entries, FAULT and HEARTBEAT).
 *
 * @return the response body as a JSON string
 */
fun constructConsulResponse(): String {

    val configJson = """{
    "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
    "collector.routing": [
            {
                "fromDomain": "FAULT",
                "toTopic": "test-topic-1"
            },
            {
                "fromDomain": "HEARTBEAT",
                "toTopic": "test-topic-2"
            }
    ]
    }"""

    // The configuration is embedded Base64-encoded in the "Value" field below.
    val base64Payload = Base64.getEncoder().encodeToString(configJson.toByteArray())

    return """[
        {
            "CreateIndex": 100,
            "ModifyIndex": 200,
            "LockIndex": 200,
            "Key": "zip",
            "Flags": 0,
            "Value": "$base64Payload",
            "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
        }
    ]"""
}