2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.config.impl
22 import arrow.core.None
23 import arrow.core.Option
24 import arrow.core.Some
25 import arrow.core.getOrElse
26 import com.nhaarman.mockitokotlin2.mock
27 import com.nhaarman.mockitokotlin2.whenever
28 import org.assertj.core.api.Assertions.assertThat
29 import org.assertj.core.api.Assertions.fail
30 import org.jetbrains.spek.api.Spek
31 import org.jetbrains.spek.api.dsl.describe
32 import org.jetbrains.spek.api.dsl.given
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
36 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
37 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
39 import java.time.Duration
42 internal object ConfigurationTransformerTest : Spek({
43 describe("ConfigurationTransformer") {
44 val cut = ConfigurationTransformer()
46 describe("transforming partial configuration to final") {
47 val config = ValidatedPartialConfiguration(
48 listenPort = defaultListenPort,
49 idleTimeoutSec = defaultIdleTimeoutSec,
50 cbsConfiguration = ValidatedCbsConfiguration(
51 firstRequestDelaySec = defaultFirstReqDelaySec,
52 requestIntervalSec = defaultRequestIntervalSec
54 securityConfiguration = Some(ValidatedSecurityPaths(
55 keyStoreFile = KEYSTORE,
56 keyStorePasswordFile = KEYSTORE_PASS_FILE,
57 trustStoreFile = TRUSTSTORE,
58 trustStorePasswordFile = TRUSTSTORE_PASS_FILE
60 streamPublishers = sampleStreamsDefinition,
61 logLevel = Some(LogLevel.TRACE)
64 given("transformed configuration") {
65 val result = cut.toFinalConfiguration(config)
67 it("should create server configuration") {
68 assertThat(result.server.listenPort).isEqualTo(defaultListenPort)
69 assertThat(result.server.idleTimeout)
70 .describedAs("idleTimeout transformed from number to duration")
71 .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
74 it("should create CBS configuration") {
75 assertThat(result.cbs.firstRequestDelay)
76 .describedAs("firstRequestDelay transformed from number to duration")
77 .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
78 assertThat(result.cbs.requestInterval)
79 .describedAs("requestInterval transformed from number to duration")
80 .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec))
83 it("should create collector configuration") {
84 assertThat(result.collector.routing)
85 .describedAs("routing transformed from kafka sinks to routes")
86 .isEqualTo(sampleRouting)
88 assertThat(result.collector.maxPayloadSizeBytes)
89 .describedAs("maxPayloadSizeBytes calculated from kafka sinks")
90 .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
93 it("should use specified log level") {
94 assertThat(result.logLevel)
95 .describedAs("logLevel was not transformed when present")
96 .isEqualTo(LogLevel.TRACE)
99 it("should create security keys") {
100 result.security.keys.fold({ fail("Should be Some") }, {
101 assertThat(it.keyStore().path()).isEqualTo(File(KEYSTORE).toPath())
102 assertThat(it.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath())
103 it.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) }
104 it.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) }
110 describe("transforming configuration with empty log level") {
111 val config = validatedConfiguration(
115 it("should use default log level") {
116 val result = cut.toFinalConfiguration(config)
118 assertThat(result.logLevel).isEqualTo(DEFAULT_LOG_LEVEL)
122 describe("transforming configuration with security disabled") {
123 val config = validatedConfiguration(
124 sslDisable = Some(true),
126 keyStorePasswordFile = "",
128 trustStorePasswordFile = ""
131 it("should create valid configuration with empty security keys") {
132 val result = cut.toFinalConfiguration(config)
134 assertThat(result.security.keys).isEqualTo(None)
138 describe("transforming configuration with ssl disable missing") {
139 val config = validatedConfiguration(
143 it("should create configuration with ssl enabled") {
144 val result = cut.toFinalConfiguration(config)
145 val securityKeys = result.security.keys
146 .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys
147 assertThat(securityKeys.keyStore().path()).isEqualTo(File(KEYSTORE).toPath())
148 assertThat(securityKeys.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath())
149 securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) }
150 securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) }
154 describe("calculating maxPayloadSizeBytes") {
155 on("defined routes") {
156 val highestMaxPayloadSize = 3
157 val sink1 = mock<KafkaSink>().also {
158 whenever(it.name()).thenReturn("1")
159 whenever(it.maxPayloadSizeBytes()).thenReturn(1)
161 val sink2 = mock<KafkaSink>().also {
162 whenever(it.name()).thenReturn("2")
163 whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize)
165 val config = validatedConfiguration(
166 streamPublishers = listOf(sink1, sink2)
169 val result = cut.toFinalConfiguration(config)
171 it("should use the highest value among all routes") {
172 assertThat(result.collector.maxPayloadSizeBytes)
173 .isEqualTo(highestMaxPayloadSize)
177 on("empty routing") {
178 val config = validatedConfiguration(
179 streamPublishers = emptyList()
182 val result = cut.toFinalConfiguration(config)
184 it("should use default value") {
185 assertThat(result.collector.maxPayloadSizeBytes)
186 .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
194 private fun validatedConfiguration(listenPort: Int = defaultListenPort,
195 idleTimeoutSec: Long = defaultIdleTimeoutSec,
196 firstReqDelaySec: Long = defaultFirstReqDelaySec,
197 requestIntervalSec: Long = defaultRequestIntervalSec,
198 sslDisable: Option<Boolean> = Some(false),
199 keyStoreFile: String = KEYSTORE,
200 keyStorePasswordFile: String = KEYSTORE_PASS_FILE,
201 trustStoreFile: String = TRUSTSTORE,
202 trustStorePasswordFile: String = TRUSTSTORE_PASS_FILE,
203 streamPublishers: List<KafkaSink> = sampleStreamsDefinition,
204 logLevel: Option<LogLevel> = Some(LogLevel.INFO)
205 ): ValidatedPartialConfiguration = PartialConfiguration(
206 listenPort = Some(listenPort),
207 idleTimeoutSec = Some(idleTimeoutSec),
208 firstRequestDelaySec = Some(firstReqDelaySec),
209 requestIntervalSec = Some(requestIntervalSec),
210 streamPublishers = Some(streamPublishers),
211 sslDisable = sslDisable,
212 keyStoreFile = Some(keyStoreFile),
213 keyStorePasswordFile = Some(keyStorePasswordFile),
214 trustStoreFile = Some(trustStoreFile),
215 trustStorePasswordFile = Some(trustStorePasswordFile),
217 ).unsafeAsValidated()