2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2018, 2020-2022 Nokia. All rights reserved.
4 * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
5 * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
6 * ===============================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END========================================================================
19 package org.onap.dcaegen2.collectors.datafile.configuration;
21 import ch.qos.logback.classic.spi.ILoggingEvent;
22 import ch.qos.logback.core.read.ListAppender;
23 import com.google.common.base.Charsets;
24 import com.google.common.io.Resources;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonIOException;
27 import com.google.gson.JsonObject;
28 import com.google.gson.JsonParser;
29 import com.google.gson.JsonSyntaxException;
30 import org.junit.jupiter.api.Assertions;
31 import org.junit.jupiter.api.BeforeEach;
32 import org.junit.jupiter.api.Test;
33 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
34 import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
35 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
39 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
40 import reactor.core.publisher.Flux;
41 import reactor.core.publisher.Mono;
42 import reactor.test.StepVerifier;
43 import java.nio.file.Path;
45 import java.io.ByteArrayInputStream;
46 import java.io.IOException;
47 import java.io.InputStream;
48 import java.io.InputStreamReader;
50 import java.nio.charset.StandardCharsets;
52 import java.util.Properties;
54 import static org.assertj.core.api.Assertions.assertThat;
55 import static org.assertj.core.api.Assertions.assertThatThrownBy;
56 import static org.junit.jupiter.api.Assertions.assertTrue;
57 import static org.mockito.ArgumentMatchers.any;
58 import static org.mockito.Mockito.doReturn;
59 import static org.mockito.Mockito.mock;
60 import static org.mockito.Mockito.spy;
61 import static org.mockito.Mockito.times;
62 import static org.mockito.Mockito.verify;
63 import static org.mockito.Mockito.when;
66 * Tests the AppConfig.
68 * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
69 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
73 public static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
75 private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = //
76 ImmutablePublisherConfiguration.builder() //
77 .publishUrl("https://localhost:3907/publish/1") //
78 .logUrl("https://localhost:3907/feedlog/1") //
79 .trustStorePath("src/test/resources/trust.jks") //
80 .trustStorePasswordPath("src/test/resources/trust.pass") //
81 .keyStorePath("src/test/resources/cert.jks") //
82 .keyStorePasswordPath("src/test/resources/jks.pass") //
83 .enableDmaapCertAuth(true) //
84 .changeIdentifier("PM_MEAS_FILES") //
85 .userName("CYE9fl40") //
86 .password("izBJD8nLjawq0HMG") //
89 private static final ImmutableCertificateConfig CORRECT_CERTIFICATE_CONFIGURATION = //
90 new ImmutableCertificateConfig.Builder() //
91 .keyCert("/src/test/resources/dfc.jks") //
92 .keyPasswordPath("/src/test/resources/dfc.jks.pass") //
93 .trustedCa("/src/test/resources/cert.jks") //
94 .trustedCaPasswordPath("/src/test/resources/cert.jks.pass") //
95 .httpsHostnameVerify(true)
99 private AppConfig appConfigUnderTest;
100 private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
101 CbsClient cbsClient = mock(CbsClient.class);
102 CbsClientConfiguration cbsClientConfiguration = mock(CbsClientConfiguration.class);
106 appConfigUnderTest = spy(AppConfig.class);
107 appConfigUnderTest.systemEnvironment = new Properties();
112 void whenTheConfigurationFits() throws IOException, DatafileTaskException {
114 doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
115 appConfigUnderTest.initialize();
118 verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
120 ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration();
121 Assertions.assertNotNull(consumerCfg);
122 assertThat(consumerCfg).satisfies(this::checkCorrectConsumerConfiguration);
124 PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER);
125 Assertions.assertNotNull(publisherCfg);
126 assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG);
128 CertificateConfig certificateConfig = appConfigUnderTest.getCertificateConfiguration();
129 assertThat(certificateConfig)
131 .isEqualToComparingFieldByField(CORRECT_CERTIFICATE_CONFIGURATION);
135 void shouldInitializeApplicationWithoutCertificates() throws IOException {
137 doReturn(getCorrectConfigWithoutTLS()).when(appConfigUnderTest).createInputStream(any());
138 appConfigUnderTest.initialize();
141 verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
143 CertificateConfig certificateConfig = appConfigUnderTest.getCertificateConfiguration();
144 assertThat(certificateConfig).isNotNull();
148 void whenTheConfigurationFits_twoProducers() throws IOException, DatafileTaskException {
150 doReturn(getCorrectJsonTwoProducers()).when(appConfigUnderTest).createInputStream(any());
151 appConfigUnderTest.loadConfigurationFromFile();
154 verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
155 Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
156 Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER));
157 Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("XX_FILES"));
158 Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("YY_FILES"));
160 assertThat(appConfigUnderTest.getPublisherConfiguration("XX_FILES").publishUrl())
161 .isEqualTo("feed01::publish_url");
162 assertThat(appConfigUnderTest.getPublisherConfiguration("YY_FILES").publishUrl())
163 .isEqualTo("feed01::publish_url");
167 void whenFileIsNotExist_ThrowException() throws DatafileTaskException {
169 appConfigUnderTest.setFilepath("/temp.json");
172 appConfigUnderTest.loadConfigurationFromFile();
175 Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
176 assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
177 .hasMessageContaining("No PublishingConfiguration loaded, changeIdentifier: PM_MEAS_FILES");
179 Assertions.assertNull(appConfigUnderTest.getCertificateConfiguration());
183 void whenFileIsExistsButJsonIsIncorrect() throws IOException, DatafileTaskException {
186 doReturn(getIncorrectJson()).when(appConfigUnderTest).createInputStream(any());
187 appConfigUnderTest.loadConfigurationFromFile();
190 verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
191 Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
192 assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
193 .hasMessageContaining(CHANGE_IDENTIFIER);
194 Assertions.assertNull(appConfigUnderTest.getCertificateConfiguration());
198 void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException, DatafileTaskException {
201 doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
202 JsonElement jsonElement = mock(JsonElement.class);
203 when(jsonElement.isJsonObject()).thenReturn(false);
204 doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(InputStream.class));
205 appConfigUnderTest.loadConfigurationFromFile();
208 verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
209 Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
210 assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
211 .hasMessageContaining(CHANGE_IDENTIFIER);
212 Assertions.assertNull(appConfigUnderTest.getCertificateConfiguration());
216 void whenPeriodicConfigRefreshNoEnvironmentVariables() {
217 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
218 Flux<AppConfig> task = appConfigUnderTest.createRefreshTask();
222 .expectSubscription() //
223 .verifyComplete(); //
225 assertTrue(logAppender.list.toString().contains("CbsClientConfigurationException"));
229 void whenPeriodicConfigRefreshNoConsul() {
230 doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
231 doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
232 Flux<JsonObject> err = Flux.error(new IOException());
233 doReturn(err).when(cbsClient).updates(any(), any(), any());
235 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
236 Flux<AppConfig> task = appConfigUnderTest.createRefreshTask();
240 .expectSubscription() //
244 logAppender.list.toString().contains("Could not refresh application configuration java.io.IOException"));
248 void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
249 doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
250 doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
252 Flux<JsonObject> json = Flux.just(getJsonRootObject());
253 doReturn(json).when(cbsClient).updates(any(), any(), any());
255 Flux<AppConfig> task = appConfigUnderTest.createRefreshTask();
259 .expectSubscription() //
260 .expectNext(appConfigUnderTest) //
263 Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
267 void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException {
268 doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
269 doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
271 Flux<JsonObject> json = Flux.just(getJsonRootObject());
272 Flux<JsonObject> err = Flux.error(new IOException()); // no config entry created by the
275 doReturn(json, err).when(cbsClient).updates(any(), any(), any());
277 Flux<AppConfig> task = appConfigUnderTest.createRefreshTask();
281 .expectSubscription() //
282 .expectNext(appConfigUnderTest) //
285 Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
288 private void checkCorrectConsumerConfiguration(ConsumerConfiguration consumerConfiguration) {
289 MessageRouterSubscribeRequest messageRouterSubscribeRequest =
290 consumerConfiguration.getMessageRouterSubscribeRequest();
291 assertThat(messageRouterSubscribeRequest.consumerGroup()).isEqualTo("OpenDcae-c12");
292 assertThat(messageRouterSubscribeRequest.consumerId()).isEqualTo("C12");
293 assertThat(messageRouterSubscribeRequest.sourceDefinition().topicUrl())
294 .isEqualTo("http://localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
295 SecurityKeys securityKeys = consumerConfiguration.getMessageRouterSubscriberConfig().securityKeys();
296 assertThat(securityKeys.keyStore().path().toString()).hasToString(Path.of("src", "test","resources","cert.jks").toString());
297 assertThat(securityKeys.trustStore().path().toString()).hasToString(Path.of("src", "test","resources","trust.jks").toString());
299 assertThat(consumerConfiguration.getMessageRouterSubscriber()).isNotNull();
302 private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
303 JsonObject rootObject = JsonParser.parseReader(new InputStreamReader(getCorrectJson())).getAsJsonObject();
307 private static InputStream getCorrectJson() throws IOException {
308 URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test.json");
309 String string = Resources.toString(url, Charsets.UTF_8);
310 return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
313 private static InputStream getCorrectConfigWithoutTLS() throws IOException {
314 URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_test_config_no_tls.json");
315 String string = Resources.toString(url, Charsets.UTF_8);
316 return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
319 private static InputStream getCorrectJsonTwoProducers() throws IOException {
320 URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test_2producers.json");
321 String string = Resources.toString(url, Charsets.UTF_8);
322 return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
325 private static InputStream getIncorrectJson() {
326 String string = "{" + //
327 " \"configs\": {" + //
329 return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));