5a8d962f95478f5d0bd58ed33148b45ca8351ba3
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dcaegen2.collectors.datafile.tasks;
22
23 import static org.awaitility.Awaitility.await;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.ArgumentMatchers.anyLong;
29 import static org.mockito.ArgumentMatchers.anyString;
30 import static org.mockito.ArgumentMatchers.notNull;
31 import static org.mockito.Mockito.doReturn;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.spy;
34 import static org.mockito.Mockito.times;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.verifyNoMoreInteractions;
37
38 import ch.qos.logback.classic.spi.ILoggingEvent;
39 import ch.qos.logback.core.read.ListAppender;
40 import java.nio.file.Paths;
41 import java.time.Duration;
42 import java.time.Instant;
43 import java.util.HashMap;
44 import java.util.LinkedList;
45 import java.util.List;
46 import java.util.Map;
47 import org.apache.commons.lang3.StringUtils;
48 import org.junit.jupiter.api.BeforeEach;
49 import org.junit.jupiter.api.Test;
50 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
51 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
52 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
53 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
54 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
55 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
56 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
57 import org.onap.dcaegen2.collectors.datafile.model.FileData;
58 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
59 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
60 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
61 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
62 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
63 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
64 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
65 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
66 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
67 import org.slf4j.MDC;
68 import reactor.core.publisher.Flux;
69 import reactor.core.publisher.Mono;
70 import reactor.test.StepVerifier;
71
72 public class ScheduledTasksTest {
73
74     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
75     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
76
77     private AppConfig appConfig = mock(AppConfig.class);
78     private ScheduledTasks testedObject;
79
80     private int uniqueValue = 0;
81     private DMaaPMessageConsumer consumerMock;
82     private PublishedChecker publishedCheckerMock;
83     private FileCollector fileCollectorMock;
84     private DataRouterPublisher dataRouterMock;
85     private Map<String, String> contextMap = new HashMap<String, String>();
86
87     private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
88
89     @BeforeEach
90     private void setUp() throws DatafileTaskException {
91         testedObject = spy(new ScheduledTasks(appConfig));
92
93         consumerMock = mock(DMaaPMessageConsumer.class);
94         publishedCheckerMock = mock(PublishedChecker.class);
95         fileCollectorMock = mock(FileCollector.class);
96         dataRouterMock = mock(DataRouterPublisher.class);
97
98         doReturn(consumerMock).when(testedObject).createConsumerTask();
99         doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
100         doReturn(fileCollectorMock).when(testedObject).createFileCollector();
101         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
102     }
103
104     private void setUpConfiguration() throws DatafileTaskException {
105         final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
106             .publishUrl(publishUrl) //
107             .logUrl("") //
108             .userName("userName") //
109             .passWord("passWord") //
110             .trustStorePath("trustStorePath") //
111             .trustStorePasswordPath("trustStorePasswordPath") //
112             .keyStorePath("keyStorePath") //
113             .keyStorePasswordPath("keyStorePasswordPath") //
114             .enableDmaapCertAuth(true) //
115             .changeIdentifier(CHANGE_IDENTIFIER) //
116             .build(); //
117         final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
118             .topicUrl("topicUrl").trustStorePath("trustStorePath") //
119             .trustStorePasswordPath("trustStorePasswordPath") //
120             .keyStorePath("keyStorePath") //
121             .keyStorePasswordPath("keyStorePasswordPath") //
122             .enableDmaapCertAuth(true) //
123             .build();
124
125         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
126         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
127         doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
128     }
129
130     private MessageMetaData messageMetaData() {
131         return ImmutableMessageMetaData.builder() //
132             .productName("productName") //
133             .vendorName("") //
134             .lastEpochMicrosec("") //
135             .sourceName("") //
136             .startEpochMicrosec("") //
137             .timeZoneOffset("") //
138             .changeIdentifier(CHANGE_IDENTIFIER) //
139             .changeType("") //
140             .build();
141     }
142
143     private FileData fileData(int instanceNumber) {
144         return ImmutableFileData.builder() //
145             .name(PM_FILE_NAME + instanceNumber) //
146             .fileFormatType("") //
147             .fileFormatVersion("") //
148             .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
149             .scheme(Scheme.FTPS) //
150             .compression("") //
151             .messageMetaData(messageMetaData()) //
152             .build();
153     }
154
155     private List<FileData> files(int size, boolean uniqueNames) {
156         List<FileData> list = new LinkedList<FileData>();
157         for (int i = 0; i < size; ++i) {
158             if (uniqueNames) {
159                 ++uniqueValue;
160             }
161             list.add(fileData(uniqueValue));
162         }
163         return list;
164     }
165
166     private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
167         return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
168     }
169
170     private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
171         List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
172         for (int i = 0; i < numberOfEvents; ++i) {
173             list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
174         }
175         return Flux.fromIterable(list);
176     }
177
178     private FilePublishInformation filePublishInformation() {
179         return ImmutableFilePublishInformation //
180             .builder() //
181             .productName("") //
182             .vendorName("") //
183             .lastEpochMicrosec("") //
184             .sourceName("") //
185             .startEpochMicrosec("") //
186             .timeZoneOffset("") //
187             .name("") //
188             .location("") //
189             .internalLocation(Paths.get("internalLocation")) //
190             .compression("") //
191             .fileFormatType("") //
192             .fileFormatVersion("") //
193             .changeIdentifier(CHANGE_IDENTIFIER) //
194             .context(new HashMap<String, String>()).build();
195     }
196
197     @Test
198     public void purgeFileCache() {
199         testedObject.publishedFilesCache.put(Paths.get("file.xml"));
200
201         testedObject.purgeCachedInformation(Instant.MAX);
202
203         assertEquals(0, testedObject.publishedFilesCacheSize());
204     }
205
206     @Test
207     public void nothingToConsume() throws DatafileTaskException {
208         setUpConfiguration();
209
210         doReturn(consumerMock).when(testedObject).createConsumerTask();
211         doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
212
213         testedObject.executeDatafileMainTask();
214
215         assertEquals(0, testedObject.getCurrentNumberOfTasks());
216         verify(consumerMock, times(1)).getMessageRouterResponse();
217         verifyNoMoreInteractions(consumerMock);
218     }
219
220     @Test
221     public void skippingConsumeDueToCurrentNumberOfTasksGreaterThan50() {
222         doReturn(51).when(testedObject).getCurrentNumberOfTasks();
223
224         testedObject.executeDatafileMainTask();
225
226         verifyNoMoreInteractions(consumerMock);
227     }
228
229     @Test
230     public void executeDatafileMainTask_successfulCase() throws DatafileTaskException {
231         setUpConfiguration();
232
233         final int noOfEvents = 1;
234         final int noOfFilesPerEvent = 1;
235
236         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
237         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
238
239         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any(), any());
240
241         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
242         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
243         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
244
245         testedObject.executeDatafileMainTask();
246
247         await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
248             testedObject.getCurrentNumberOfSubscriptions()));
249
250         assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
251
252         verify(appConfig).getDmaapConsumerConfiguration();
253         verify(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
254         verifyNoMoreInteractions(appConfig);
255
256         assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());
257     }
258
259     @Test
260     public void executeDatafileMainTask_unconfiguredChangeIdentifier() throws DatafileTaskException {
261         final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
262             .publishUrl(publishUrl) //
263             .logUrl("") //
264             .userName("userName") //
265             .passWord("passWord") //
266             .trustStorePath("trustStorePath") //
267             .trustStorePasswordPath("trustStorePasswordPath") //
268             .keyStorePath("keyStorePath") //
269             .keyStorePasswordPath("keyStorePasswordPath") //
270             .enableDmaapCertAuth(true) //
271             .changeIdentifier("Different changeIdentifier") //
272             .build(); //
273         final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
274             .topicUrl("topicUrl").trustStorePath("trustStorePath") //
275             .trustStorePasswordPath("trustStorePasswordPath") //
276             .keyStorePath("keyStorePath") //
277             .keyStorePasswordPath("keyStorePasswordPath") //
278             .enableDmaapCertAuth(true) //
279             .build();
280
281         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
282         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
283         doReturn(false).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
284         final int noOfEvents = 1;
285         final int noOfFilesPerEvent = 1;
286
287         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
288         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
289
290         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
291         testedObject.executeDatafileMainTask();
292
293         await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
294             testedObject.getCurrentNumberOfSubscriptions()));
295
296         assertTrue("Error missing in log", logAppender.list.toString().contains(
297             "[INFO] No feed is configured for: " + CHANGE_IDENTIFIER + ", file ignored: " + PM_FILE_NAME + "1"));
298     }
299
300     @Test
301     public void createMainTask_consumeFail() {
302         MDC.setContextMap(contextMap);
303         doReturn(Flux.error(new Exception("Failed"))).when(consumerMock).getMessageRouterResponse();
304
305         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
306         StepVerifier //
307             .create(testedObject.createMainTask(contextMap)) //
308             .expectSubscription() //
309             .expectNextCount(0) //
310             .expectComplete() //
311             .verify(); //
312
313         assertTrue("Error missing in log", logAppender.list.toString()
314             .contains("[ERROR] Polling for file ready message failed, " + "exception: java.lang.Exception: Failed"));
315     }
316
317     @Test
318     public void consume_successfulCase() throws DatafileTaskException {
319         setUpConfiguration();
320
321         final int noOfEvents = 200;
322         final int noOfFilesPerEvent = 200;
323         final int noOfFiles = noOfEvents * noOfFilesPerEvent;
324
325         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
326         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
327
328         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
329
330         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
331         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
332         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
333
334         StepVerifier //
335             .create(testedObject.createMainTask(contextMap)) //
336             .expectSubscription() //
337             .expectNextCount(noOfFiles) //
338             .expectComplete() //
339             .verify(); //
340
341         assertEquals(0, testedObject.getCurrentNumberOfTasks());
342         assertEquals(0, testedObject.getThreadPoolQueueSize());
343
344         verify(consumerMock, times(1)).getMessageRouterResponse();
345         verifyNoMoreInteractions(consumerMock);
346
347         verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
348         verifyNoMoreInteractions(fileCollectorMock);
349
350         verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
351         verifyNoMoreInteractions(dataRouterMock);
352
353         assertEquals("totalReceivedEvents should have been 200", 200,
354             testedObject.getCounters().getTotalReceivedEvents());
355     }
356
357     @Test
358     public void consume_fetchFailedOnce() throws DatafileTaskException {
359         setUpConfiguration();
360
361         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
362         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
363
364         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
365
366         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
367         Mono<Object> error = Mono.error(new Exception("problem"));
368
369         // First file collect will fail, 3 will succeed
370         doReturn(error, collectedFile, collectedFile, collectedFile) //
371             .when(fileCollectorMock) //
372             .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
373
374         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
375         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
376
377         StepVerifier //
378             .create(testedObject.createMainTask(contextMap)) //
379             .expectSubscription() //
380             .expectNextCount(3) //
381             .expectComplete() //
382             .verify(); //
383
384         assertEquals(0, testedObject.getCurrentNumberOfTasks());
385
386         verify(consumerMock, times(1)).getMessageRouterResponse();
387         verifyNoMoreInteractions(consumerMock);
388
389         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
390         verifyNoMoreInteractions(fileCollectorMock);
391
392         verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
393         verifyNoMoreInteractions(dataRouterMock);
394
395         assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
396         assertEquals("failedFtp should have been 1", 1, testedObject.getCounters().getNoOfFailedFtp());
397     }
398
399     @Test
400     public void consume_publishFailedOnce() throws DatafileTaskException {
401         setUpConfiguration();
402
403         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
404         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
405
406         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
407
408         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
409         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
410
411         Mono<Object> error = Mono.error(new Exception("problem"));
412         // One publish will fail, the rest will succeed
413         doReturn(collectedFile, error, collectedFile, collectedFile) //
414             .when(dataRouterMock) //
415             .publishFile(notNull(), anyLong(), notNull());
416
417         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
418         StepVerifier //
419             .create(testedObject.createMainTask(contextMap)) //
420             .expectSubscription() //
421             .expectNextCount(3) // 3 completed files
422             .expectComplete() //
423             .verify(); //
424
425         assertTrue("Error missing in log", logAppender.list.toString().contains("[ERROR] File publishing failed: "));
426
427         assertEquals(0, testedObject.getCurrentNumberOfTasks());
428
429         verify(consumerMock, times(1)).getMessageRouterResponse();
430         verifyNoMoreInteractions(consumerMock);
431
432         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
433         verifyNoMoreInteractions(fileCollectorMock);
434
435         verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
436         verifyNoMoreInteractions(dataRouterMock);
437
438         assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
439         assertEquals("noOfFailedPublish should have been 1", 1, testedObject.getCounters().getNoOfFailedPublish());
440     }
441
442     @Test
443     public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
444         setUpConfiguration();
445
446         final int noOfEvents = 1;
447         final int noOfFilesPerEvent = 100;
448
449         // 100 files with the same name
450         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
451         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
452
453         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
454
455         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
456         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
457         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
458
459         StepVerifier //
460             .create(testedObject.createMainTask(contextMap)).expectSubscription() //
461             .expectNextCount(1) // 99 is skipped
462             .expectComplete() //
463             .verify(); //
464
465         assertEquals(0, testedObject.getCurrentNumberOfTasks());
466
467         verify(consumerMock, times(1)).getMessageRouterResponse();
468         verifyNoMoreInteractions(consumerMock);
469
470         verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
471         verifyNoMoreInteractions(fileCollectorMock);
472
473         verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
474         verifyNoMoreInteractions(dataRouterMock);
475
476         verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
477         verifyNoMoreInteractions(publishedCheckerMock);
478
479         assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());
480     }
481 }