7f0c642219a2dd53254154290713fbf062eda782
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.configuration;
18
19 import ch.qos.logback.classic.spi.ILoggingEvent;
20 import ch.qos.logback.core.read.ListAppender;
21 import com.google.common.base.Charsets;
22 import com.google.common.io.Resources;
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonIOException;
25 import com.google.gson.JsonObject;
26 import com.google.gson.JsonParser;
27 import com.google.gson.JsonSyntaxException;
28 import org.junit.jupiter.api.Assertions;
29 import org.junit.jupiter.api.BeforeEach;
30 import org.junit.jupiter.api.Test;
31 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
32 import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
33 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
34 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
37 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
38 import reactor.core.publisher.Flux;
39 import reactor.core.publisher.Mono;
40 import reactor.test.StepVerifier;
41
42 import java.io.ByteArrayInputStream;
43 import java.io.IOException;
44 import java.io.InputStream;
45 import java.io.InputStreamReader;
46 import java.net.URL;
47 import java.nio.charset.StandardCharsets;
48 import java.util.Map;
49 import java.util.Properties;
50
51 import static org.assertj.core.api.Assertions.assertThat;
52 import static org.assertj.core.api.Assertions.assertThatThrownBy;
53 import static org.junit.jupiter.api.Assertions.assertTrue;
54 import static org.mockito.ArgumentMatchers.any;
55 import static org.mockito.Mockito.doReturn;
56 import static org.mockito.Mockito.mock;
57 import static org.mockito.Mockito.spy;
58 import static org.mockito.Mockito.times;
59 import static org.mockito.Mockito.verify;
60 import static org.mockito.Mockito.when;
61
62 /**
63  * Tests the AppConfig.
64  *
65  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
66  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
67  */
68 public class AppConfigTest {
69
70     public static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
71
72     private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = //
73         ImmutablePublisherConfiguration.builder() //
74             .publishUrl("https://localhost:3907/publish/1") //
75             .logUrl("https://localhost:3907/feedlog/1") //
76             .trustStorePath("src/test/resources/trust.jks") //
77             .trustStorePasswordPath("src/test/resources/trust.pass") //
78             .keyStorePath("src/test/resources/cert.jks") //
79             .keyStorePasswordPath("src/test/resources/jks.pass") //
80             .enableDmaapCertAuth(true) //
81             .changeIdentifier("PM_MEAS_FILES") //
82             .userName("CYE9fl40") //
83             .password("izBJD8nLjawq0HMG") //
84             .build();
85
86     private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
87         new ImmutableFtpesConfig.Builder() //
88             .keyCert("/src/test/resources/dfc.jks") //
89             .keyPasswordPath("/src/test/resources/dfc.jks.pass") //
90             .trustedCa("/src/test/resources/ftp.jks") //
91             .trustedCaPasswordPath("/src/test/resources/ftp.jks.pass") //
92             .build();
93
94     private AppConfig appConfigUnderTest;
95     private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
96     CbsClient cbsClient = mock(CbsClient.class);
97     CbsClientConfiguration cbsClientConfiguration = mock(CbsClientConfiguration.class);
98
99     @BeforeEach
100     void setUp() {
101         appConfigUnderTest = spy(AppConfig.class);
102         appConfigUnderTest.systemEnvironment = new Properties();
103
104     }
105
106     @Test
107     public void whenTheConfigurationFits() throws IOException, DatafileTaskException {
108         // When
109         doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
110         appConfigUnderTest.initialize();
111
112         // Then
113         verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
114
115         ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration();
116         Assertions.assertNotNull(consumerCfg);
117         assertThat(consumerCfg).satisfies(this::checkCorrectConsumerConfiguration);
118
119         PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER);
120         Assertions.assertNotNull(publisherCfg);
121         assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG);
122
123         FtpesConfig ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
124         assertThat(ftpesConfig).isNotNull();
125         assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION);
126     }
127
128     @Test
129     public void whenTheConfigurationFits_twoProducers() throws IOException, DatafileTaskException {
130         // When
131         doReturn(getCorrectJsonTwoProducers()).when(appConfigUnderTest).createInputStream(any());
132         appConfigUnderTest.loadConfigurationFromFile();
133
134         // Then
135         verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
136         Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
137         Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER));
138         Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("XX_FILES"));
139         Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("YY_FILES"));
140
141         assertThat(appConfigUnderTest.getPublisherConfiguration("XX_FILES").publishUrl())
142             .isEqualTo("feed01::publish_url");
143         assertThat(appConfigUnderTest.getPublisherConfiguration("YY_FILES").publishUrl())
144             .isEqualTo("feed01::publish_url");
145     }
146
147     @Test
148     public void whenFileIsNotExist_ThrowException() throws DatafileTaskException {
149         // Given
150         appConfigUnderTest.setFilepath("/temp.json");
151
152         // When
153         appConfigUnderTest.loadConfigurationFromFile();
154
155         // Then
156         Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
157         assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
158             .hasMessageContaining("No PublishingConfiguration loaded, changeIdentifier: PM_MEAS_FILES");
159
160         Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
161     }
162
163     @Test
164     public void whenFileIsExistsButJsonIsIncorrect() throws IOException, DatafileTaskException {
165
166         // When
167         doReturn(getIncorrectJson()).when(appConfigUnderTest).createInputStream(any());
168         appConfigUnderTest.loadConfigurationFromFile();
169
170         // Then
171         verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
172         Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
173         assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
174             .hasMessageContaining(CHANGE_IDENTIFIER);
175         Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
176     }
177
178     @Test
179     public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException, DatafileTaskException {
180
181         // When
182         doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
183         JsonElement jsonElement = mock(JsonElement.class);
184         when(jsonElement.isJsonObject()).thenReturn(false);
185         doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(InputStream.class));
186         appConfigUnderTest.loadConfigurationFromFile();
187
188         // Then
189         verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
190         Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
191         assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
192             .hasMessageContaining(CHANGE_IDENTIFIER);
193         Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
194     }
195
196     @Test
197     public void whenPeriodicConfigRefreshNoEnvironmentVariables() {
198         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
199         Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
200
201         StepVerifier //
202             .create(task) //
203             .expectSubscription() //
204             .verifyComplete(); //
205
206         assertTrue(logAppender.list.toString().contains("CbsClientConfigurationException"));
207     }
208
209     @Test
210     public void whenPeriodicConfigRefreshNoConsul() {
211         doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
212         doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
213         Flux<JsonObject> err = Flux.error(new IOException());
214         doReturn(err).when(cbsClient).updates(any(), any(), any());
215
216         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
217         Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
218
219         StepVerifier //
220             .create(task) //
221             .expectSubscription() //
222             .verifyComplete();
223
224         assertTrue(
225             logAppender.list.toString().contains("Could not refresh application configuration java.io.IOException"));
226     }
227
228     @Test
229     public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
230         doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
231         doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
232
233         Flux<JsonObject> json = Flux.just(getJsonRootObject());
234         doReturn(json).when(cbsClient).updates(any(), any(), any());
235
236         Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
237
238         StepVerifier //
239             .create(task) //
240             .expectSubscription() //
241             .expectNext(appConfigUnderTest) //
242             .verifyComplete();
243
244         Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
245     }
246
247     @Test
248     public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException {
249         doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
250         doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
251
252         Flux<JsonObject> json = Flux.just(getJsonRootObject());
253         Flux<JsonObject> err = Flux.error(new IOException()); // no config entry created by the
254         // dmaap plugin
255
256         doReturn(json, err).when(cbsClient).updates(any(), any(), any());
257
258         Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
259
260         StepVerifier //
261             .create(task) //
262             .expectSubscription() //
263             .expectNext(appConfigUnderTest) //
264             .verifyComplete();
265
266         Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
267     }
268
269     private void checkCorrectConsumerConfiguration(ConsumerConfiguration consumerConfiguration) {
270         MessageRouterSubscribeRequest messageRouterSubscribeRequest =
271                 consumerConfiguration.getMessageRouterSubscribeRequest();
272         assertThat(messageRouterSubscribeRequest.consumerGroup()).isEqualTo("OpenDcae-c12");
273         assertThat(messageRouterSubscribeRequest.consumerId()).isEqualTo("C12");
274         assertThat(messageRouterSubscribeRequest.sourceDefinition().topicUrl())
275                 .isEqualTo("http://localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
276         SecurityKeys securityKeys = consumerConfiguration.getMessageRouterSubscriberConfig().securityKeys();
277         assertThat(securityKeys.keyStore().path().toString()).isEqualTo("src/test/resources/cert.jks");
278         assertThat(securityKeys.trustStore().path().toString()).isEqualTo("src/test/resources/trust.jks");
279         assertThat(consumerConfiguration.getMessageRouterSubscriber()).isNotNull();
280     }
281
282     private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
283         JsonObject rootObject = JsonParser.parseReader(new InputStreamReader(getCorrectJson())).getAsJsonObject();
284         return rootObject;
285     }
286
287     private static InputStream getCorrectJson() throws IOException {
288         URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test.json");
289         String string = Resources.toString(url, Charsets.UTF_8);
290         return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
291     }
292
293     private static InputStream getCorrectJsonTwoProducers() throws IOException {
294         URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test_2producers.json");
295         String string = Resources.toString(url, Charsets.UTF_8);
296         return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
297     }
298
299     private static InputStream getIncorrectJson() {
300         String string = "{" + //
301             "    \"configs\": {" + //
302             "        \"dmaap\": {"; //
303         return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
304     }
305 }