42919e4d1e62c4c9654bd81c65555cf2f8a728c6
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.config.impl
21
22 import arrow.core.None
23 import arrow.core.Option
24 import arrow.core.Some
25 import arrow.core.getOrElse
26 import com.nhaarman.mockitokotlin2.mock
27 import com.nhaarman.mockitokotlin2.whenever
28 import org.assertj.core.api.Assertions.assertThat
29 import org.assertj.core.api.Assertions.fail
30 import org.jetbrains.spek.api.Spek
31 import org.jetbrains.spek.api.dsl.describe
32 import org.jetbrains.spek.api.dsl.given
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
36 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
37 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
38 import java.io.File
39 import java.time.Duration
40
41
42 internal object ConfigurationTransformerTest : Spek({
43     describe("ConfigurationTransformer") {
44         val cut = ConfigurationTransformer()
45
46         describe("transforming partial configuration to final") {
47             val config = ValidatedPartialConfiguration(
48                     listenPort = defaultListenPort,
49                     idleTimeoutSec = defaultIdleTimeoutSec,
50                     cbsConfiguration = ValidatedCbsConfiguration(
51                             firstRequestDelaySec = defaultFirstReqDelaySec,
52                             requestIntervalSec = defaultRequestIntervalSec
53                     ),
54                     securityConfiguration = Some(ValidatedSecurityPaths(
55                             keyStoreFile = KEYSTORE,
56                             keyStorePasswordFile = KEYSTORE_PASS_FILE,
57                             trustStoreFile = TRUSTSTORE,
58                             trustStorePasswordFile = TRUSTSTORE_PASS_FILE
59                     )),
60                     streamPublishers = sampleStreamsDefinition,
61                     logLevel = Some(LogLevel.TRACE)
62             )
63
64             given("transformed configuration") {
65                 val result = cut.toFinalConfiguration(config)
66
67                 it("should create server configuration") {
68                     assertThat(result.server.listenPort).isEqualTo(defaultListenPort)
69                     assertThat(result.server.idleTimeout)
70                             .describedAs("idleTimeout transformed from number to duration")
71                             .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
72                 }
73
74                 it("should create CBS configuration") {
75                     assertThat(result.cbs.firstRequestDelay)
76                             .describedAs("firstRequestDelay transformed from number to duration")
77                             .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
78                     assertThat(result.cbs.requestInterval)
79                             .describedAs("requestInterval transformed from number to duration")
80                             .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec))
81                 }
82
83                 it("should create collector configuration") {
84                     assertThat(result.collector.routing)
85                             .describedAs("routing transformed from kafka sinks to routes")
86                             .isEqualTo(sampleRouting)
87
88                     assertThat(result.collector.maxPayloadSizeBytes)
89                             .describedAs("maxPayloadSizeBytes calculated from kafka sinks")
90                             .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
91                 }
92
93                 it("should use specified log level") {
94                     assertThat(result.logLevel)
95                             .describedAs("logLevel was not transformed when present")
96                             .isEqualTo(LogLevel.TRACE)
97                 }
98
99                 it("should create security keys") {
100                     result.security.keys.fold({ fail("Should be Some") }, {
101                         assertThat(it.keyStore().path()).isEqualTo(File(KEYSTORE).toPath())
102                         assertThat(it.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath())
103                         it.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) }
104                         it.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) }
105                     })
106                 }
107             }
108         }
109
110         describe("transforming configuration with empty log level") {
111             val config = validatedConfiguration(
112                     logLevel = None
113             )
114
115             it("should use default log level") {
116                 val result = cut.toFinalConfiguration(config)
117
118                 assertThat(result.logLevel).isEqualTo(DEFAULT_LOG_LEVEL)
119             }
120         }
121
122         describe("transforming configuration with security disabled") {
123             val config = validatedConfiguration(
124                     sslDisable = Some(true),
125                     keyStoreFile = "",
126                     keyStorePasswordFile = "",
127                     trustStoreFile = "",
128                     trustStorePasswordFile = ""
129             )
130
131             it("should create valid configuration with empty security keys") {
132                 val result = cut.toFinalConfiguration(config)
133
134                 assertThat(result.security.keys).isEqualTo(None)
135             }
136         }
137
138         describe("transforming configuration with ssl disable missing") {
139             val config = validatedConfiguration(
140                     sslDisable = None
141             )
142
143             it("should create configuration with ssl enabled") {
144                 val result = cut.toFinalConfiguration(config)
145                 val securityKeys = result.security.keys
146                         .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys
147                 assertThat(securityKeys.keyStore().path()).isEqualTo(File(KEYSTORE).toPath())
148                 assertThat(securityKeys.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath())
149                 securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) }
150                 securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) }
151             }
152         }
153
154         describe("calculating maxPayloadSizeBytes") {
155             on("defined routes") {
156                 val highestMaxPayloadSize = 3
157                 val sink1 = mock<KafkaSink>().also {
158                     whenever(it.name()).thenReturn("1")
159                     whenever(it.maxPayloadSizeBytes()).thenReturn(1)
160                 }
161                 val sink2 = mock<KafkaSink>().also {
162                     whenever(it.name()).thenReturn("2")
163                     whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize)
164                 }
165                 val config = validatedConfiguration(
166                         streamPublishers = listOf(sink1, sink2)
167                 )
168
169                 val result = cut.toFinalConfiguration(config)
170
171                 it("should use the highest value among all routes") {
172                     assertThat(result.collector.maxPayloadSizeBytes)
173                             .isEqualTo(highestMaxPayloadSize)
174                 }
175             }
176
177             on("empty routing") {
178                 val config = validatedConfiguration(
179                         streamPublishers = emptyList()
180                 )
181
182                 val result = cut.toFinalConfiguration(config)
183
184                 it("should use default value") {
185                     assertThat(result.collector.maxPayloadSizeBytes)
186                             .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
187                 }
188             }
189         }
190
191     }
192 })
193
194 private fun validatedConfiguration(listenPort: Int = defaultListenPort,
195                                    idleTimeoutSec: Long = defaultIdleTimeoutSec,
196                                    firstReqDelaySec: Long = defaultFirstReqDelaySec,
197                                    requestIntervalSec: Long = defaultRequestIntervalSec,
198                                    sslDisable: Option<Boolean> = Some(false),
199                                    keyStoreFile: String = KEYSTORE,
200                                    keyStorePasswordFile: String = KEYSTORE_PASS_FILE,
201                                    trustStoreFile: String = TRUSTSTORE,
202                                    trustStorePasswordFile: String = TRUSTSTORE_PASS_FILE,
203                                    streamPublishers: List<KafkaSink> = sampleStreamsDefinition,
204                                    logLevel: Option<LogLevel> = Some(LogLevel.INFO)
205 ): ValidatedPartialConfiguration = PartialConfiguration(
206         listenPort = Some(listenPort),
207         idleTimeoutSec = Some(idleTimeoutSec),
208         firstRequestDelaySec = Some(firstReqDelaySec),
209         requestIntervalSec = Some(requestIntervalSec),
210         streamPublishers = Some(streamPublishers),
211         sslDisable = sslDisable,
212         keyStoreFile = Some(keyStoreFile),
213         keyStorePasswordFile = Some(keyStorePasswordFile),
214         trustStoreFile = Some(trustStoreFile),
215         trustStorePasswordFile = Some(trustStorePasswordFile),
216         logLevel = logLevel
217 ).unsafeAsValidated()
218