3df2edae061655be6e3d620a431afa8091f0a59a
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dcaegen2.collectors.datafile.tasks;
22
23 import static org.awaitility.Awaitility.await;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.ArgumentMatchers.anyLong;
29 import static org.mockito.ArgumentMatchers.anyString;
30 import static org.mockito.ArgumentMatchers.notNull;
31 import static org.mockito.Mockito.doReturn;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.spy;
34 import static org.mockito.Mockito.times;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.verifyNoMoreInteractions;
37
38 import ch.qos.logback.classic.spi.ILoggingEvent;
39 import ch.qos.logback.core.read.ListAppender;
40 import java.nio.file.Paths;
41 import java.time.Duration;
42 import java.time.Instant;
43 import java.util.HashMap;
44 import java.util.LinkedList;
45 import java.util.List;
46 import java.util.Map;
47 import org.apache.commons.lang3.StringUtils;
48 import org.junit.jupiter.api.BeforeEach;
49 import org.junit.jupiter.api.Test;
50 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
51 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
52 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
53 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
54 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
55 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
56 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
57 import org.onap.dcaegen2.collectors.datafile.model.FileData;
58 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
59 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
60 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
61 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
62 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
63 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
64 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
65 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
66 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
67 import org.slf4j.MDC;
68 import reactor.core.publisher.Flux;
69 import reactor.core.publisher.Mono;
70 import reactor.test.StepVerifier;
71
72 public class ScheduledTasksTest {
73
74     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
75     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
76
77     private AppConfig appConfig = mock(AppConfig.class);
78     private ScheduledTasks testedObject;
79
80     private int uniqueValue = 0;
81     private DMaaPMessageConsumer consumerMock;
82     private PublishedChecker publishedCheckerMock;
83     private FileCollector fileCollectorMock;
84     private DataRouterPublisher dataRouterMock;
85     private Map<String, String> contextMap = new HashMap<String, String>();
86
87     private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
88
89     @BeforeEach
90     private void setUp() throws DatafileTaskException {
91         testedObject = spy(new ScheduledTasks(appConfig));
92
93         consumerMock = mock(DMaaPMessageConsumer.class);
94         publishedCheckerMock = mock(PublishedChecker.class);
95         fileCollectorMock = mock(FileCollector.class);
96         dataRouterMock = mock(DataRouterPublisher.class);
97
98         doReturn(consumerMock).when(testedObject).createConsumerTask();
99         doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
100         doReturn(fileCollectorMock).when(testedObject).createFileCollector();
101         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
102     }
103
104     private void setUpConfiguration() throws DatafileTaskException {
105         final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
106                 .publishUrl(publishUrl) //
107                 .logUrl("") //
108                 .userName("userName") //
109                 .passWord("passWord") //
110                 .trustStorePath("trustStorePath") //
111                 .trustStorePasswordPath("trustStorePasswordPath") //
112                 .keyStorePath("keyStorePath") //
113                 .keyStorePasswordPath("keyStorePasswordPath") //
114                 .enableDmaapCertAuth(true) //
115                 .changeIdentifier(CHANGE_IDENTIFIER) //
116                 .build(); //
117         final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
118                 .topicUrl("topicUrl").trustStorePath("trustStorePath") //
119                 .trustStorePasswordPath("trustStorePasswordPath") //
120                 .keyStorePath("keyStorePath") //
121                 .keyStorePasswordPath("keyStorePasswordPath") //
122                 .enableDmaapCertAuth(true) //
123                 .build();
124
125         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
126         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
127         doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
128     }
129
130     private MessageMetaData messageMetaData() {
131         return ImmutableMessageMetaData.builder() //
132                 .productName("productName") //
133                 .vendorName("") //
134                 .lastEpochMicrosec("") //
135                 .sourceName("") //
136                 .startEpochMicrosec("") //
137                 .timeZoneOffset("") //
138                 .changeIdentifier(CHANGE_IDENTIFIER) //
139                 .changeType("") //
140                 .build();
141     }
142
143     private FileData fileData(int instanceNumber) {
144         return ImmutableFileData.builder() //
145                 .name(PM_FILE_NAME + instanceNumber) //
146                 .fileFormatType("") //
147                 .fileFormatVersion("") //
148                 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
149                 .scheme(Scheme.FTPS) //
150                 .compression("") //
151                 .messageMetaData(messageMetaData()) //
152                 .build();
153     }
154
155     private List<FileData> files(int size, boolean uniqueNames) {
156         List<FileData> list = new LinkedList<FileData>();
157         for (int i = 0; i < size; ++i) {
158             if (uniqueNames) {
159                 ++uniqueValue;
160             }
161             list.add(fileData(uniqueValue));
162         }
163         return list;
164     }
165
166     private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
167         return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
168     }
169
170     private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
171         List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
172         for (int i = 0; i < numberOfEvents; ++i) {
173             list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
174         }
175         return Flux.fromIterable(list);
176     }
177
178     private FilePublishInformation filePublishInformation() {
179         return ImmutableFilePublishInformation //
180                 .builder() //
181                 .productName("") //
182                 .vendorName("") //
183                 .lastEpochMicrosec("") //
184                 .sourceName("") //
185                 .startEpochMicrosec("") //
186                 .timeZoneOffset("") //
187                 .name("") //
188                 .location("") //
189                 .internalLocation(Paths.get("internalLocation")) //
190                 .compression("") //
191                 .fileFormatType("") //
192                 .fileFormatVersion("") //
193                 .changeIdentifier(CHANGE_IDENTIFIER) //
194                 .context(new HashMap<String, String>()).build();
195     }
196
197     @Test
198     public void purgeFileCache() {
199         testedObject.publishedFilesCache.put(Paths.get("file.xml"));
200
201         testedObject.purgeCachedInformation(Instant.MAX);
202
203         assertEquals(0, testedObject.publishedFilesCacheSize());
204     }
205
206     @Test
207     public void nothingToConsume() throws DatafileTaskException {
208         setUpConfiguration();
209
210         doReturn(consumerMock).when(testedObject).createConsumerTask();
211         doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
212
213         testedObject.executeDatafileMainTask();
214
215         assertEquals(0, testedObject.getCurrentNumberOfTasks());
216         verify(consumerMock, times(1)).getMessageRouterResponse();
217         verifyNoMoreInteractions(consumerMock);
218     }
219
220     @Test
221     public void skippingConsumeDueToCurrentNumberOfTasksGreaterThan50() {
222         doReturn(51).when(testedObject).getCurrentNumberOfTasks();
223
224         testedObject.executeDatafileMainTask();
225
226         verifyNoMoreInteractions(consumerMock);
227     }
228
229     @Test
230     public void executeDatafileMainTask_successfulCase() throws DatafileTaskException {
231         setUpConfiguration();
232
233         final int noOfEvents = 1;
234         final int noOfFilesPerEvent = 1;
235
236         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
237         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
238
239         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any(), any());
240
241         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
242         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
243         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
244
245         testedObject.executeDatafileMainTask();
246
247         await().untilAsserted(() -> assertEquals(0, testedObject.getCurrentNumberOfSubscriptions()));
248
249         assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
250
251         verify(appConfig).getDmaapConsumerConfiguration();
252         verify(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
253         verifyNoMoreInteractions(appConfig);
254     }
255
256     @Test
257     public void executeDatafileMainTask_unconfiguredChangeIdentifier() throws DatafileTaskException {
258         final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
259                 .publishUrl(publishUrl) //
260                 .logUrl("") //
261                 .userName("userName") //
262                 .passWord("passWord") //
263                 .trustStorePath("trustStorePath") //
264                 .trustStorePasswordPath("trustStorePasswordPath") //
265                 .keyStorePath("keyStorePath") //
266                 .keyStorePasswordPath("keyStorePasswordPath") //
267                 .enableDmaapCertAuth(true) //
268                 .changeIdentifier("Different changeIdentifier") //
269                 .build(); //
270         final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
271                 .topicUrl("topicUrl").trustStorePath("trustStorePath") //
272                 .trustStorePasswordPath("trustStorePasswordPath") //
273                 .keyStorePath("keyStorePath") //
274                 .keyStorePasswordPath("keyStorePasswordPath") //
275                 .enableDmaapCertAuth(true) //
276                 .build();
277
278         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
279         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
280         doReturn(false).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
281         final int noOfEvents = 1;
282         final int noOfFilesPerEvent = 1;
283
284         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
285         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
286
287         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
288         testedObject.executeDatafileMainTask();
289
290         await().untilAsserted(() -> assertEquals(0, testedObject.getCurrentNumberOfSubscriptions()));
291
292         assertTrue("Error missing in log", logAppender.list.toString().contains(
293                 "[INFO] No feed is configured for: " + CHANGE_IDENTIFIER + ", file ignored: " + PM_FILE_NAME + "1"));
294     }
295
296     @Test
297     public void createMainTask_consumeFail() {
298         MDC.setContextMap(contextMap);
299         doReturn(Flux.error(new Exception("Failed"))).when(consumerMock).getMessageRouterResponse();
300
301         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
302         StepVerifier //
303                 .create(testedObject.createMainTask(contextMap)) //
304                 .expectSubscription() //
305                 .expectNextCount(0) //
306                 .expectComplete() //
307                 .verify(); //
308
309         assertTrue("Error missing in log", logAppender.list.toString().contains(
310                 "[ERROR] Polling for file ready message failed, " + "exception: java.lang.Exception: Failed"));
311     }
312
313     @Test
314     public void consume_successfulCase() throws DatafileTaskException {
315         setUpConfiguration();
316
317         final int noOfEvents = 200;
318         final int noOfFilesPerEvent = 200;
319         final int noOfFiles = noOfEvents * noOfFilesPerEvent;
320
321         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
322         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
323
324         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
325
326         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
327         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
328         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
329
330         StepVerifier //
331                 .create(testedObject.createMainTask(contextMap)) //
332                 .expectSubscription() //
333                 .expectNextCount(noOfFiles) //
334                 .expectComplete() //
335                 .verify(); //
336
337         assertEquals(0, testedObject.getCurrentNumberOfTasks());
338         assertEquals(0, testedObject.getThreadPoolQueueSize());
339         verify(consumerMock, times(1)).getMessageRouterResponse();
340         verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
341         verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
342         verifyNoMoreInteractions(dataRouterMock);
343         verifyNoMoreInteractions(fileCollectorMock);
344         verifyNoMoreInteractions(consumerMock);
345     }
346
347     @Test
348     public void consume_fetchFailedOnce() throws DatafileTaskException {
349         setUpConfiguration();
350
351         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
352         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
353
354         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
355
356         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
357         Mono<Object> error = Mono.error(new Exception("problem"));
358
359         // First file collect will fail, 3 will succeed
360         doReturn(error, collectedFile, collectedFile, collectedFile) //
361                 .when(fileCollectorMock) //
362                 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
363
364         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
365         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
366
367         StepVerifier //
368                 .create(testedObject.createMainTask(contextMap)) //
369                 .expectSubscription() //
370                 .expectNextCount(3) //
371                 .expectComplete() //
372                 .verify(); //
373
374         assertEquals(0, testedObject.getCurrentNumberOfTasks());
375         verify(consumerMock, times(1)).getMessageRouterResponse();
376         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
377         verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
378         verifyNoMoreInteractions(dataRouterMock);
379         verifyNoMoreInteractions(fileCollectorMock);
380         verifyNoMoreInteractions(consumerMock);
381     }
382
383     @Test
384     public void consume_publishFailedOnce() throws DatafileTaskException {
385         setUpConfiguration();
386
387         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
388         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
389
390         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
391
392         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
393         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
394
395         Mono<Object> error = Mono.error(new Exception("problem"));
396         // One publish will fail, the rest will succeed
397         doReturn(collectedFile, error, collectedFile, collectedFile) //
398                 .when(dataRouterMock) //
399                 .publishFile(notNull(), anyLong(), notNull());
400
401         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
402         StepVerifier //
403                 .create(testedObject.createMainTask(contextMap)) //
404                 .expectSubscription() //
405                 .expectNextCount(3) // 3 completed files
406                 .expectComplete() //
407                 .verify(); //
408
409         assertTrue("Error missing in log", logAppender.list.toString().contains("[ERROR] File publishing failed: "));
410
411         assertEquals(0, testedObject.getCurrentNumberOfTasks());
412         verify(consumerMock, times(1)).getMessageRouterResponse();
413         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
414         verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
415         verifyNoMoreInteractions(dataRouterMock);
416         verifyNoMoreInteractions(fileCollectorMock);
417         verifyNoMoreInteractions(consumerMock);
418     }
419
420     @Test
421     public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
422         setUpConfiguration();
423
424         final int noOfEvents = 1;
425         final int noOfFilesPerEvent = 100;
426
427         // 100 files with the same name
428         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
429         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
430
431         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
432
433         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
434         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
435         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
436
437         StepVerifier //
438                 .create(testedObject.createMainTask(contextMap)).expectSubscription() //
439                 .expectNextCount(1) // 99 is skipped
440                 .expectComplete() //
441                 .verify(); //
442
443         assertEquals(0, testedObject.getCurrentNumberOfTasks());
444         verify(consumerMock, times(1)).getMessageRouterResponse();
445         verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
446         verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
447         verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
448         verifyNoMoreInteractions(dataRouterMock);
449         verifyNoMoreInteractions(fileCollectorMock);
450         verifyNoMoreInteractions(consumerMock);
451         verifyNoMoreInteractions(dataRouterMock);
452     }
453 }