a0096b77afa928085ccf8c73b8753c61ae8cba8f
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dcaegen2.collectors.datafile.tasks;
22
23 import static org.awaitility.Awaitility.await;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.ArgumentMatchers.anyLong;
29 import static org.mockito.ArgumentMatchers.anyString;
30 import static org.mockito.ArgumentMatchers.notNull;
31 import static org.mockito.Mockito.doReturn;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.spy;
34 import static org.mockito.Mockito.times;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.verifyNoMoreInteractions;
37
38 import ch.qos.logback.classic.spi.ILoggingEvent;
39 import ch.qos.logback.core.read.ListAppender;
40
41 import java.nio.file.Paths;
42 import java.time.Duration;
43 import java.time.Instant;
44 import java.util.HashMap;
45 import java.util.LinkedList;
46 import java.util.List;
47 import java.util.Map;
48
49 import org.apache.commons.lang3.StringUtils;
50 import org.junit.jupiter.api.BeforeEach;
51 import org.junit.jupiter.api.Test;
52 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
53 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
54 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
55 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
56 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
57 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
58 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
59 import org.onap.dcaegen2.collectors.datafile.model.FileData;
60 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
61 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
62 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
63 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
64 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
65 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
66 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
67 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
68 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
69 import org.slf4j.MDC;
70
71 import reactor.core.publisher.Flux;
72 import reactor.core.publisher.Mono;
73 import reactor.test.StepVerifier;
74
75 public class ScheduledTasksTest {
76
77     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
78     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
79
80     private AppConfig appConfig = mock(AppConfig.class);
81     private ScheduledTasks testedObject;
82
83     private int uniqueValue = 0;
84     private DMaaPMessageConsumer consumerMock;
85     private PublishedChecker publishedCheckerMock;
86     private FileCollector fileCollectorMock;
87     private DataRouterPublisher dataRouterMock;
88     private Map<String, String> contextMap = new HashMap<String, String>();
89
90     private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
91
92     @BeforeEach
93     private void setUp() throws DatafileTaskException {
94         testedObject = spy(new ScheduledTasks(appConfig));
95
96         consumerMock = mock(DMaaPMessageConsumer.class);
97         publishedCheckerMock = mock(PublishedChecker.class);
98         fileCollectorMock = mock(FileCollector.class);
99         dataRouterMock = mock(DataRouterPublisher.class);
100
101         doReturn(consumerMock).when(testedObject).createConsumerTask();
102         doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
103         doReturn(fileCollectorMock).when(testedObject).createFileCollector();
104         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
105     }
106
107     private void setUpConfiguration() throws DatafileTaskException {
108         final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
109             .publishUrl(publishUrl) //
110             .logUrl("") //
111             .userName("userName") //
112             .passWord("passWord") //
113             .trustStorePath("trustStorePath") //
114             .trustStorePasswordPath("trustStorePasswordPath") //
115             .keyStorePath("keyStorePath") //
116             .keyStorePasswordPath("keyStorePasswordPath") //
117             .enableDmaapCertAuth(true) //
118             .changeIdentifier(CHANGE_IDENTIFIER) //
119             .build(); //
120         final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
121             .topicUrl("topicUrl").trustStorePath("trustStorePath") //
122             .trustStorePasswordPath("trustStorePasswordPath") //
123             .keyStorePath("keyStorePath") //
124             .keyStorePasswordPath("keyStorePasswordPath") //
125             .enableDmaapCertAuth(true) //
126             .build();
127
128         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
129         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
130         doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
131     }
132
133     private MessageMetaData messageMetaData() {
134         return ImmutableMessageMetaData.builder() //
135             .productName("productName") //
136             .vendorName("") //
137             .lastEpochMicrosec("") //
138             .sourceName("") //
139             .startEpochMicrosec("") //
140             .timeZoneOffset("") //
141             .changeIdentifier(CHANGE_IDENTIFIER) //
142             .changeType("") //
143             .build();
144     }
145
146     private FileData fileData(int instanceNumber) {
147         return ImmutableFileData.builder() //
148             .name(PM_FILE_NAME + instanceNumber) //
149             .fileFormatType("") //
150             .fileFormatVersion("") //
151             .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
152             .scheme(Scheme.FTPS) //
153             .compression("") //
154             .messageMetaData(messageMetaData()) //
155             .build();
156     }
157
158     private List<FileData> files(int size, boolean uniqueNames) {
159         List<FileData> list = new LinkedList<FileData>();
160         for (int i = 0; i < size; ++i) {
161             if (uniqueNames) {
162                 ++uniqueValue;
163             }
164             list.add(fileData(uniqueValue));
165         }
166         return list;
167     }
168
169     private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
170         return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
171     }
172
173     private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
174         List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
175         for (int i = 0; i < numberOfEvents; ++i) {
176             list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
177         }
178         return Flux.fromIterable(list);
179     }
180
181     private FilePublishInformation filePublishInformation() {
182         return ImmutableFilePublishInformation //
183             .builder() //
184             .productName("") //
185             .vendorName("") //
186             .lastEpochMicrosec("") //
187             .sourceName("") //
188             .startEpochMicrosec("") //
189             .timeZoneOffset("") //
190             .name("") //
191             .location("") //
192             .internalLocation(Paths.get("internalLocation")) //
193             .compression("") //
194             .fileFormatType("") //
195             .fileFormatVersion("") //
196             .changeIdentifier(CHANGE_IDENTIFIER) //
197             .context(new HashMap<String, String>()).build();
198     }
199
200     @Test
201     public void purgeFileCache() {
202         testedObject.publishedFilesCache.put(Paths.get("file.xml"));
203
204         testedObject.purgeCachedInformation(Instant.MAX);
205
206         assertEquals(0, testedObject.publishedFilesCacheSize());
207     }
208
209     @Test
210     public void nothingToConsume() throws DatafileTaskException {
211         setUpConfiguration();
212
213         doReturn(consumerMock).when(testedObject).createConsumerTask();
214         doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
215
216         testedObject.executeDatafileMainTask();
217
218         assertEquals(0, testedObject.getCurrentNumberOfTasks());
219         verify(consumerMock, times(1)).getMessageRouterResponse();
220         verifyNoMoreInteractions(consumerMock);
221     }
222
223     @Test
224     public void skippingConsumeDueToCurrentNumberOfTasksGreaterThan50() {
225         doReturn(51).when(testedObject).getCurrentNumberOfTasks();
226
227         testedObject.executeDatafileMainTask();
228
229         verifyNoMoreInteractions(consumerMock);
230     }
231
232     @Test
233     public void executeDatafileMainTask_successfulCase() throws DatafileTaskException {
234         setUpConfiguration();
235
236         final int noOfEvents = 1;
237         final int noOfFilesPerEvent = 1;
238
239         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
240         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
241
242         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any(), any());
243
244         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
245         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
246         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
247
248         testedObject.executeDatafileMainTask();
249
250         await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
251             testedObject.getCurrentNumberOfSubscriptions()));
252
253         assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
254
255         verify(appConfig).getDmaapConsumerConfiguration();
256         verify(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
257         verifyNoMoreInteractions(appConfig);
258
259         assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());
260     }
261
262     @Test
263     public void executeDatafileMainTask_unconfiguredChangeIdentifier() throws DatafileTaskException {
264         final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
265             .publishUrl(publishUrl) //
266             .logUrl("") //
267             .userName("userName") //
268             .passWord("passWord") //
269             .trustStorePath("trustStorePath") //
270             .trustStorePasswordPath("trustStorePasswordPath") //
271             .keyStorePath("keyStorePath") //
272             .keyStorePasswordPath("keyStorePasswordPath") //
273             .enableDmaapCertAuth(true) //
274             .changeIdentifier("Different changeIdentifier") //
275             .build(); //
276         final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
277             .topicUrl("topicUrl").trustStorePath("trustStorePath") //
278             .trustStorePasswordPath("trustStorePasswordPath") //
279             .keyStorePath("keyStorePath") //
280             .keyStorePasswordPath("keyStorePasswordPath") //
281             .enableDmaapCertAuth(true) //
282             .build();
283
284         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
285         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
286         doReturn(false).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
287         final int noOfEvents = 1;
288         final int noOfFilesPerEvent = 1;
289
290         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
291         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
292
293         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
294         testedObject.executeDatafileMainTask();
295
296         await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
297             testedObject.getCurrentNumberOfSubscriptions()));
298
299         assertTrue("Error missing in log", logAppender.list.toString().contains(
300             "[INFO] No feed is configured for: " + CHANGE_IDENTIFIER + ", file ignored: " + PM_FILE_NAME + "1"));
301     }
302
303     @Test
304     public void createMainTask_consumeFail() {
305         MDC.setContextMap(contextMap);
306         doReturn(Flux.error(new Exception("Failed"))).when(consumerMock).getMessageRouterResponse();
307
308         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
309         StepVerifier //
310             .create(testedObject.createMainTask(contextMap)) //
311             .expectSubscription() //
312             .expectNextCount(0) //
313             .expectComplete() //
314             .verify(); //
315
316         assertTrue("Error missing in log", logAppender.list.toString()
317             .contains("[ERROR] Polling for file ready message failed, " + "exception: java.lang.Exception: Failed"));
318     }
319
320     @Test
321     public void consume_successfulCase() throws DatafileTaskException {
322         setUpConfiguration();
323
324         final int noOfEvents = 200;
325         final int noOfFilesPerEvent = 200;
326         final int noOfFiles = noOfEvents * noOfFilesPerEvent;
327
328         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
329         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
330
331         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
332
333         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
334         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
335         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
336
337         StepVerifier //
338             .create(testedObject.createMainTask(contextMap)) //
339             .expectSubscription() //
340             .expectNextCount(noOfFiles) //
341             .expectComplete() //
342             .verify(); //
343
344         assertEquals(0, testedObject.getCurrentNumberOfTasks());
345         assertEquals(0, testedObject.getThreadPoolQueueSize());
346
347         verify(consumerMock, times(1)).getMessageRouterResponse();
348         verifyNoMoreInteractions(consumerMock);
349
350         verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
351         verifyNoMoreInteractions(fileCollectorMock);
352
353         verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
354         verifyNoMoreInteractions(dataRouterMock);
355
356         assertEquals("totalReceivedEvents should have been 200", 200,
357             testedObject.getCounters().getTotalReceivedEvents());
358     }
359
360     @Test
361     public void consume_fetchFailedOnce() throws DatafileTaskException {
362         setUpConfiguration();
363
364         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
365         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
366
367         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
368
369         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
370         Mono<Object> error = Mono.error(new Exception("problem"));
371
372         // First file collect will fail, 3 will succeed
373         doReturn(error, collectedFile, collectedFile, collectedFile) //
374             .when(fileCollectorMock) //
375             .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
376
377         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
378         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
379
380         StepVerifier //
381             .create(testedObject.createMainTask(contextMap)) //
382             .expectSubscription() //
383             .expectNextCount(3) //
384             .expectComplete() //
385             .verify(); //
386
387         assertEquals(0, testedObject.getCurrentNumberOfTasks());
388
389         verify(consumerMock, times(1)).getMessageRouterResponse();
390         verifyNoMoreInteractions(consumerMock);
391
392         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
393         verifyNoMoreInteractions(fileCollectorMock);
394
395         verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
396         verifyNoMoreInteractions(dataRouterMock);
397
398         assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
399         assertEquals("failedFtp should have been 1", 1, testedObject.getCounters().getNoOfFailedFtp());
400     }
401
402     @Test
403     public void consume_publishFailedOnce() throws DatafileTaskException {
404         setUpConfiguration();
405
406         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
407         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
408
409         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
410
411         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
412         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
413
414         Mono<Object> error = Mono.error(new Exception("problem"));
415         // One publish will fail, the rest will succeed
416         doReturn(collectedFile, error, collectedFile, collectedFile) //
417             .when(dataRouterMock) //
418             .publishFile(notNull(), anyLong(), notNull());
419
420         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
421         StepVerifier //
422             .create(testedObject.createMainTask(contextMap)) //
423             .expectSubscription() //
424             .expectNextCount(3) // 3 completed files
425             .expectComplete() //
426             .verify(); //
427
428         assertTrue("Error missing in log", logAppender.list.toString().contains("[ERROR] File publishing failed: "));
429
430         assertEquals(0, testedObject.getCurrentNumberOfTasks());
431
432         verify(consumerMock, times(1)).getMessageRouterResponse();
433         verifyNoMoreInteractions(consumerMock);
434
435         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
436         verifyNoMoreInteractions(fileCollectorMock);
437
438         verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
439         verifyNoMoreInteractions(dataRouterMock);
440
441         assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
442         assertEquals("noOfFailedPublish should have been 1", 1, testedObject.getCounters().getNoOfFailedPublish());
443     }
444
445     @Test
446     public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
447         setUpConfiguration();
448
449         final int noOfEvents = 1;
450         final int noOfFilesPerEvent = 100;
451
452         // 100 files with the same name
453         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
454         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
455
456         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
457
458         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
459         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
460         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
461
462         StepVerifier //
463             .create(testedObject.createMainTask(contextMap)).expectSubscription() //
464             .expectNextCount(1) // 99 is skipped
465             .expectComplete() //
466             .verify(); //
467
468         assertEquals(0, testedObject.getCurrentNumberOfTasks());
469
470         verify(consumerMock, times(1)).getMessageRouterResponse();
471         verifyNoMoreInteractions(consumerMock);
472
473         verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
474         verifyNoMoreInteractions(fileCollectorMock);
475
476         verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
477         verifyNoMoreInteractions(dataRouterMock);
478
479         verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
480         verifyNoMoreInteractions(publishedCheckerMock);
481
482         assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());
483     }
484 }