1cb79bcf70f289030c5244a8a9290b000515bc34
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 Nordix Foundation.
4  * Copyright (C) 2020 Nokia. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.dcaegen2.collectors.datafile.tasks;
23
24 import static org.awaitility.Awaitility.await;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.ArgumentMatchers.anyLong;
30 import static org.mockito.ArgumentMatchers.anyString;
31 import static org.mockito.ArgumentMatchers.notNull;
32 import static org.mockito.Mockito.doReturn;
33 import static org.mockito.Mockito.mock;
34 import static org.mockito.Mockito.spy;
35 import static org.mockito.Mockito.times;
36 import static org.mockito.Mockito.verify;
37 import static org.mockito.Mockito.verifyNoMoreInteractions;
38
39 import ch.qos.logback.classic.spi.ILoggingEvent;
40 import ch.qos.logback.core.read.ListAppender;
41
42 import java.nio.file.Paths;
43 import java.time.Duration;
44 import java.time.Instant;
45 import java.util.HashMap;
46 import java.util.LinkedList;
47 import java.util.List;
48 import java.util.Map;
49
50 import org.apache.commons.lang3.StringUtils;
51 import org.junit.jupiter.api.BeforeEach;
52 import org.junit.jupiter.api.Test;
53 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
54 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
55 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
56 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
57 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
58 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
59 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
60 import org.onap.dcaegen2.collectors.datafile.model.FileData;
61 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
62 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
63 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
64 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
65 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
66 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
67 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
68 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
69 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
70 import org.slf4j.MDC;
71
72 import reactor.core.publisher.Flux;
73 import reactor.core.publisher.Mono;
74 import reactor.test.StepVerifier;
75
76 public class ScheduledTasksTest {
77
78     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
79     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
80
81     private AppConfig appConfig = mock(AppConfig.class);
82     private ScheduledTasks testedObject;
83
84     private int uniqueValue = 0;
85     private DMaaPMessageConsumer consumerMock;
86     private PublishedChecker publishedCheckerMock;
87     private FileCollector fileCollectorMock;
88     private DataRouterPublisher dataRouterMock;
89     private Map<String, String> contextMap = new HashMap<String, String>();
90
91     private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
92
93     @BeforeEach
94     private void setUp() throws DatafileTaskException {
95         testedObject = spy(new ScheduledTasks(appConfig));
96
97         consumerMock = mock(DMaaPMessageConsumer.class);
98         publishedCheckerMock = mock(PublishedChecker.class);
99         fileCollectorMock = mock(FileCollector.class);
100         dataRouterMock = mock(DataRouterPublisher.class);
101
102         doReturn(consumerMock).when(testedObject).createConsumerTask();
103         doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
104         doReturn(fileCollectorMock).when(testedObject).createFileCollector();
105         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
106     }
107
108     private void setUpConfiguration() throws DatafileTaskException {
109         final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
110             .publishUrl(publishUrl) //
111             .logUrl("") //
112             .userName("userName") //
113             .passWord("passWord") //
114             .trustStorePath("trustStorePath") //
115             .trustStorePasswordPath("trustStorePasswordPath") //
116             .keyStorePath("keyStorePath") //
117             .keyStorePasswordPath("keyStorePasswordPath") //
118             .enableDmaapCertAuth(true) //
119             .changeIdentifier(CHANGE_IDENTIFIER) //
120             .build(); //
121         final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
122             .topicUrl("topicUrl").trustStorePath("trustStorePath") //
123             .trustStorePasswordPath("trustStorePasswordPath") //
124             .keyStorePath("keyStorePath") //
125             .keyStorePasswordPath("keyStorePasswordPath") //
126             .enableDmaapCertAuth(true) //
127             .build();
128
129         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
130         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
131         doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
132     }
133
134     private MessageMetaData messageMetaData() {
135         return ImmutableMessageMetaData.builder() //
136             .productName("productName") //
137             .vendorName("") //
138             .lastEpochMicrosec("") //
139             .sourceName("") //
140             .startEpochMicrosec("") //
141             .timeZoneOffset("") //
142             .changeIdentifier(CHANGE_IDENTIFIER) //
143             .changeType("") //
144             .build();
145     }
146
147     private FileData fileData(int instanceNumber) {
148         return ImmutableFileData.builder() //
149             .name(PM_FILE_NAME + instanceNumber) //
150             .fileFormatType("") //
151             .fileFormatVersion("") //
152             .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
153             .scheme(Scheme.FTPES) //
154             .compression("") //
155             .messageMetaData(messageMetaData()) //
156             .build();
157     }
158
159     private List<FileData> files(int size, boolean uniqueNames) {
160         List<FileData> list = new LinkedList<FileData>();
161         for (int i = 0; i < size; ++i) {
162             if (uniqueNames) {
163                 ++uniqueValue;
164             }
165             list.add(fileData(uniqueValue));
166         }
167         return list;
168     }
169
170     private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
171         return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
172     }
173
174     private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
175         List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
176         for (int i = 0; i < numberOfEvents; ++i) {
177             list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
178         }
179         return Flux.fromIterable(list);
180     }
181
182     private FilePublishInformation filePublishInformation() {
183         return ImmutableFilePublishInformation //
184             .builder() //
185             .productName("") //
186             .vendorName("") //
187             .lastEpochMicrosec("") //
188             .sourceName("") //
189             .startEpochMicrosec("") //
190             .timeZoneOffset("") //
191             .name("") //
192             .location("") //
193             .internalLocation(Paths.get("internalLocation")) //
194             .compression("") //
195             .fileFormatType("") //
196             .fileFormatVersion("") //
197             .changeIdentifier(CHANGE_IDENTIFIER) //
198             .context(new HashMap<String, String>()).build();
199     }
200
201     @Test
202     public void purgeFileCache() {
203         testedObject.publishedFilesCache.put(Paths.get("file.xml"));
204
205         testedObject.purgeCachedInformation(Instant.MAX);
206
207         assertEquals(0, testedObject.publishedFilesCacheSize());
208     }
209
210     @Test
211     public void nothingToConsume() throws DatafileTaskException {
212         setUpConfiguration();
213
214         doReturn(consumerMock).when(testedObject).createConsumerTask();
215         doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
216
217         testedObject.executeDatafileMainTask();
218
219         assertEquals(0, testedObject.getCurrentNumberOfTasks());
220         verify(consumerMock, times(1)).getMessageRouterResponse();
221         verifyNoMoreInteractions(consumerMock);
222     }
223
224     @Test
225     public void skippingConsumeDueToCurrentNumberOfTasksGreaterThan50() {
226         doReturn(51).when(testedObject).getCurrentNumberOfTasks();
227
228         testedObject.executeDatafileMainTask();
229
230         verifyNoMoreInteractions(consumerMock);
231     }
232
233     @Test
234     public void executeDatafileMainTask_successfulCase() throws DatafileTaskException {
235         setUpConfiguration();
236
237         final int noOfEvents = 1;
238         final int noOfFilesPerEvent = 1;
239
240         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
241         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
242
243         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any(), any());
244
245         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
246         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
247         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
248
249         testedObject.executeDatafileMainTask();
250
251         await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
252             testedObject.getCurrentNumberOfSubscriptions()));
253
254         assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
255
256         verify(appConfig).getDmaapConsumerConfiguration();
257         verify(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
258         verifyNoMoreInteractions(appConfig);
259
260         assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());
261     }
262
263     @Test
264     public void executeDatafileMainTask_unconfiguredChangeIdentifier() throws DatafileTaskException {
265         final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
266             .publishUrl(publishUrl) //
267             .logUrl("") //
268             .userName("userName") //
269             .passWord("passWord") //
270             .trustStorePath("trustStorePath") //
271             .trustStorePasswordPath("trustStorePasswordPath") //
272             .keyStorePath("keyStorePath") //
273             .keyStorePasswordPath("keyStorePasswordPath") //
274             .enableDmaapCertAuth(true) //
275             .changeIdentifier("Different changeIdentifier") //
276             .build(); //
277         final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
278             .topicUrl("topicUrl").trustStorePath("trustStorePath") //
279             .trustStorePasswordPath("trustStorePasswordPath") //
280             .keyStorePath("keyStorePath") //
281             .keyStorePasswordPath("keyStorePasswordPath") //
282             .enableDmaapCertAuth(true) //
283             .build();
284
285         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
286         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
287         doReturn(false).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
288         final int noOfEvents = 1;
289         final int noOfFilesPerEvent = 1;
290
291         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
292         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
293
294         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
295         testedObject.executeDatafileMainTask();
296
297         await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
298             testedObject.getCurrentNumberOfSubscriptions()));
299
300         assertTrue("Error missing in log", logAppender.list.toString().contains(
301             "[INFO] No feed is configured for: " + CHANGE_IDENTIFIER + ", file ignored: " + PM_FILE_NAME + "1"));
302     }
303
304     @Test
305     public void createMainTask_consumeFail() {
306         MDC.setContextMap(contextMap);
307         doReturn(Flux.error(new Exception("Failed"))).when(consumerMock).getMessageRouterResponse();
308
309         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
310         StepVerifier //
311             .create(testedObject.createMainTask(contextMap)) //
312             .expectSubscription() //
313             .expectNextCount(0) //
314             .expectComplete() //
315             .verify(); //
316
317         assertTrue("Error missing in log", logAppender.list.toString()
318             .contains("[ERROR] Polling for file ready message failed, " + "exception: java.lang.Exception: Failed"));
319     }
320
321     @Test
322     public void consume_successfulCase() throws DatafileTaskException {
323         setUpConfiguration();
324
325         final int noOfEvents = 200;
326         final int noOfFilesPerEvent = 200;
327         final int noOfFiles = noOfEvents * noOfFilesPerEvent;
328
329         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
330         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
331
332         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
333
334         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
335         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
336         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
337
338         StepVerifier //
339             .create(testedObject.createMainTask(contextMap)) //
340             .expectSubscription() //
341             .expectNextCount(noOfFiles) //
342             .expectComplete() //
343             .verify(); //
344
345         assertEquals(0, testedObject.getCurrentNumberOfTasks());
346         assertEquals(0, testedObject.getThreadPoolQueueSize());
347
348         verify(consumerMock, times(1)).getMessageRouterResponse();
349         verifyNoMoreInteractions(consumerMock);
350
351         verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
352         verifyNoMoreInteractions(fileCollectorMock);
353
354         verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
355         verifyNoMoreInteractions(dataRouterMock);
356
357         assertEquals("totalReceivedEvents should have been 200", 200,
358             testedObject.getCounters().getTotalReceivedEvents());
359     }
360
361     @Test
362     public void consume_fetchFailedOnce() throws DatafileTaskException {
363         setUpConfiguration();
364
365         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
366         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
367
368         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
369
370         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
371         Mono<Object> error = Mono.error(new Exception("problem"));
372
373         // First file collect will fail, 3 will succeed
374         doReturn(error, collectedFile, collectedFile, collectedFile) //
375             .when(fileCollectorMock) //
376             .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
377
378         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
379         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
380
381         StepVerifier //
382             .create(testedObject.createMainTask(contextMap)) //
383             .expectSubscription() //
384             .expectNextCount(3) //
385             .expectComplete() //
386             .verify(); //
387
388         assertEquals(0, testedObject.getCurrentNumberOfTasks());
389
390         verify(consumerMock, times(1)).getMessageRouterResponse();
391         verifyNoMoreInteractions(consumerMock);
392
393         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
394         verifyNoMoreInteractions(fileCollectorMock);
395
396         verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
397         verifyNoMoreInteractions(dataRouterMock);
398
399         assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
400         assertEquals("failedFtp should have been 1", 1, testedObject.getCounters().getNoOfFailedFtp());
401     }
402
403     @Test
404     public void consume_publishFailedOnce() throws DatafileTaskException {
405         setUpConfiguration();
406
407         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
408         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
409
410         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
411
412         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
413         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
414
415         Mono<Object> error = Mono.error(new Exception("problem"));
416         // One publish will fail, the rest will succeed
417         doReturn(collectedFile, error, collectedFile, collectedFile) //
418             .when(dataRouterMock) //
419             .publishFile(notNull(), anyLong(), notNull());
420
421         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
422         StepVerifier //
423             .create(testedObject.createMainTask(contextMap)) //
424             .expectSubscription() //
425             .expectNextCount(3) // 3 completed files
426             .expectComplete() //
427             .verify(); //
428
429         assertTrue("Error missing in log", logAppender.list.toString().contains("[ERROR] File publishing failed: "));
430
431         assertEquals(0, testedObject.getCurrentNumberOfTasks());
432
433         verify(consumerMock, times(1)).getMessageRouterResponse();
434         verifyNoMoreInteractions(consumerMock);
435
436         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
437         verifyNoMoreInteractions(fileCollectorMock);
438
439         verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
440         verifyNoMoreInteractions(dataRouterMock);
441
442         assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
443         assertEquals("noOfFailedPublish should have been 1", 1, testedObject.getCounters().getNoOfFailedPublish());
444     }
445
446     @Test
447     public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
448         setUpConfiguration();
449
450         final int noOfEvents = 1;
451         final int noOfFilesPerEvent = 100;
452
453         // 100 files with the same name
454         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
455         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
456
457         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
458
459         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
460         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
461         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
462
463         StepVerifier //
464             .create(testedObject.createMainTask(contextMap)).expectSubscription() //
465             .expectNextCount(1) // 99 is skipped
466             .expectComplete() //
467             .verify(); //
468
469         assertEquals(0, testedObject.getCurrentNumberOfTasks());
470
471         verify(consumerMock, times(1)).getMessageRouterResponse();
472         verifyNoMoreInteractions(consumerMock);
473
474         verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
475         verifyNoMoreInteractions(fileCollectorMock);
476
477         verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
478         verifyNoMoreInteractions(dataRouterMock);
479
480         verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
481         verifyNoMoreInteractions(publishedCheckerMock);
482
483         assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());
484     }
485 }