2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * Copyright (C) 2020 Nokia. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.dcaegen2.collectors.datafile.tasks;
24 import static org.awaitility.Awaitility.await;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.ArgumentMatchers.anyLong;
30 import static org.mockito.ArgumentMatchers.anyString;
31 import static org.mockito.ArgumentMatchers.notNull;
32 import static org.mockito.Mockito.doReturn;
33 import static org.mockito.Mockito.mock;
34 import static org.mockito.Mockito.spy;
35 import static org.mockito.Mockito.times;
36 import static org.mockito.Mockito.verify;
37 import static org.mockito.Mockito.verifyNoMoreInteractions;
39 import ch.qos.logback.classic.spi.ILoggingEvent;
40 import ch.qos.logback.core.read.ListAppender;
42 import java.nio.file.Paths;
43 import java.time.Duration;
44 import java.time.Instant;
45 import java.util.HashMap;
46 import java.util.LinkedList;
47 import java.util.List;
50 import org.apache.commons.lang3.StringUtils;
51 import org.junit.jupiter.api.BeforeEach;
52 import org.junit.jupiter.api.Test;
53 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
54 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
55 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
56 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
57 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
58 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
59 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
60 import org.onap.dcaegen2.collectors.datafile.model.FileData;
61 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
62 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
63 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
64 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
65 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
66 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
67 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
68 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
69 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
72 import reactor.core.publisher.Flux;
73 import reactor.core.publisher.Mono;
74 import reactor.test.StepVerifier;
76 public class ScheduledTasksTest {
78 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
79 private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
81 private AppConfig appConfig = mock(AppConfig.class);
82 private ScheduledTasks testedObject;
84 private int uniqueValue = 0;
85 private DMaaPMessageConsumer consumerMock;
86 private PublishedChecker publishedCheckerMock;
87 private FileCollector fileCollectorMock;
88 private DataRouterPublisher dataRouterMock;
89 private Map<String, String> contextMap = new HashMap<String, String>();
91 private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
94 private void setUp() throws DatafileTaskException {
95 testedObject = spy(new ScheduledTasks(appConfig));
97 consumerMock = mock(DMaaPMessageConsumer.class);
98 publishedCheckerMock = mock(PublishedChecker.class);
99 fileCollectorMock = mock(FileCollector.class);
100 dataRouterMock = mock(DataRouterPublisher.class);
102 doReturn(consumerMock).when(testedObject).createConsumerTask();
103 doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
104 doReturn(fileCollectorMock).when(testedObject).createFileCollector();
105 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
108 private void setUpConfiguration() throws DatafileTaskException {
109 final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
110 .publishUrl(publishUrl) //
112 .userName("userName") //
113 .passWord("passWord") //
114 .trustStorePath("trustStorePath") //
115 .trustStorePasswordPath("trustStorePasswordPath") //
116 .keyStorePath("keyStorePath") //
117 .keyStorePasswordPath("keyStorePasswordPath") //
118 .enableDmaapCertAuth(true) //
119 .changeIdentifier(CHANGE_IDENTIFIER) //
121 final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
122 .topicUrl("topicUrl").trustStorePath("trustStorePath") //
123 .trustStorePasswordPath("trustStorePasswordPath") //
124 .keyStorePath("keyStorePath") //
125 .keyStorePasswordPath("keyStorePasswordPath") //
126 .enableDmaapCertAuth(true) //
129 doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
130 doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
131 doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
134 private MessageMetaData messageMetaData() {
135 return ImmutableMessageMetaData.builder() //
136 .productName("productName") //
138 .lastEpochMicrosec("") //
140 .startEpochMicrosec("") //
141 .timeZoneOffset("") //
142 .changeIdentifier(CHANGE_IDENTIFIER) //
147 private FileData fileData(int instanceNumber) {
148 return ImmutableFileData.builder() //
149 .name(PM_FILE_NAME + instanceNumber) //
150 .fileFormatType("") //
151 .fileFormatVersion("") //
152 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
153 .scheme(Scheme.FTPES) //
155 .messageMetaData(messageMetaData()) //
159 private List<FileData> files(int size, boolean uniqueNames) {
160 List<FileData> list = new LinkedList<FileData>();
161 for (int i = 0; i < size; ++i) {
165 list.add(fileData(uniqueValue));
170 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
171 return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
174 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
175 List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
176 for (int i = 0; i < numberOfEvents; ++i) {
177 list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
179 return Flux.fromIterable(list);
182 private FilePublishInformation filePublishInformation() {
183 return ImmutableFilePublishInformation //
187 .lastEpochMicrosec("") //
189 .startEpochMicrosec("") //
190 .timeZoneOffset("") //
193 .internalLocation(Paths.get("internalLocation")) //
195 .fileFormatType("") //
196 .fileFormatVersion("") //
197 .changeIdentifier(CHANGE_IDENTIFIER) //
198 .context(new HashMap<String, String>()).build();
202 public void purgeFileCache() {
203 testedObject.publishedFilesCache.put(Paths.get("file.xml"));
205 testedObject.purgeCachedInformation(Instant.MAX);
207 assertEquals(0, testedObject.publishedFilesCacheSize());
211 public void nothingToConsume() throws DatafileTaskException {
212 setUpConfiguration();
214 doReturn(consumerMock).when(testedObject).createConsumerTask();
215 doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
217 testedObject.executeDatafileMainTask();
219 assertEquals(0, testedObject.getCurrentNumberOfTasks());
220 verify(consumerMock, times(1)).getMessageRouterResponse();
221 verifyNoMoreInteractions(consumerMock);
225 public void skippingConsumeDueToCurrentNumberOfTasksGreaterThan50() {
226 doReturn(51).when(testedObject).getCurrentNumberOfTasks();
228 testedObject.executeDatafileMainTask();
230 verifyNoMoreInteractions(consumerMock);
234 public void executeDatafileMainTask_successfulCase() throws DatafileTaskException {
235 setUpConfiguration();
237 final int noOfEvents = 1;
238 final int noOfFilesPerEvent = 1;
240 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
241 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
243 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any(), any());
245 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
246 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
247 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
249 testedObject.executeDatafileMainTask();
251 await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
252 testedObject.getCurrentNumberOfSubscriptions()));
254 assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
256 verify(appConfig).getDmaapConsumerConfiguration();
257 verify(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
258 verifyNoMoreInteractions(appConfig);
260 assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());
264 public void executeDatafileMainTask_unconfiguredChangeIdentifier() throws DatafileTaskException {
265 final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
266 .publishUrl(publishUrl) //
268 .userName("userName") //
269 .passWord("passWord") //
270 .trustStorePath("trustStorePath") //
271 .trustStorePasswordPath("trustStorePasswordPath") //
272 .keyStorePath("keyStorePath") //
273 .keyStorePasswordPath("keyStorePasswordPath") //
274 .enableDmaapCertAuth(true) //
275 .changeIdentifier("Different changeIdentifier") //
277 final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
278 .topicUrl("topicUrl").trustStorePath("trustStorePath") //
279 .trustStorePasswordPath("trustStorePasswordPath") //
280 .keyStorePath("keyStorePath") //
281 .keyStorePasswordPath("keyStorePasswordPath") //
282 .enableDmaapCertAuth(true) //
285 doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
286 doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
287 doReturn(false).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
288 final int noOfEvents = 1;
289 final int noOfFilesPerEvent = 1;
291 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
292 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
294 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
295 testedObject.executeDatafileMainTask();
297 await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
298 testedObject.getCurrentNumberOfSubscriptions()));
300 assertTrue("Error missing in log", logAppender.list.toString().contains(
301 "[INFO] No feed is configured for: " + CHANGE_IDENTIFIER + ", file ignored: " + PM_FILE_NAME + "1"));
305 public void createMainTask_consumeFail() {
306 MDC.setContextMap(contextMap);
307 doReturn(Flux.error(new Exception("Failed"))).when(consumerMock).getMessageRouterResponse();
309 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
311 .create(testedObject.createMainTask(contextMap)) //
312 .expectSubscription() //
313 .expectNextCount(0) //
317 assertTrue("Error missing in log", logAppender.list.toString()
318 .contains("[ERROR] Polling for file ready message failed, " + "exception: java.lang.Exception: Failed"));
322 public void consume_successfulCase() throws DatafileTaskException {
323 setUpConfiguration();
325 final int noOfEvents = 200;
326 final int noOfFilesPerEvent = 200;
327 final int noOfFiles = noOfEvents * noOfFilesPerEvent;
329 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
330 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
332 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
334 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
335 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
336 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
339 .create(testedObject.createMainTask(contextMap)) //
340 .expectSubscription() //
341 .expectNextCount(noOfFiles) //
345 assertEquals(0, testedObject.getCurrentNumberOfTasks());
346 assertEquals(0, testedObject.getThreadPoolQueueSize());
348 verify(consumerMock, times(1)).getMessageRouterResponse();
349 verifyNoMoreInteractions(consumerMock);
351 verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
352 verifyNoMoreInteractions(fileCollectorMock);
354 verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
355 verifyNoMoreInteractions(dataRouterMock);
357 assertEquals("totalReceivedEvents should have been 200", 200,
358 testedObject.getCounters().getTotalReceivedEvents());
362 public void consume_fetchFailedOnce() throws DatafileTaskException {
363 setUpConfiguration();
365 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
366 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
368 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
370 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
371 Mono<Object> error = Mono.error(new Exception("problem"));
373 // First file collect will fail, 3 will succeed
374 doReturn(error, collectedFile, collectedFile, collectedFile) //
375 .when(fileCollectorMock) //
376 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
378 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
379 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
382 .create(testedObject.createMainTask(contextMap)) //
383 .expectSubscription() //
384 .expectNextCount(3) //
388 assertEquals(0, testedObject.getCurrentNumberOfTasks());
390 verify(consumerMock, times(1)).getMessageRouterResponse();
391 verifyNoMoreInteractions(consumerMock);
393 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
394 verifyNoMoreInteractions(fileCollectorMock);
396 verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
397 verifyNoMoreInteractions(dataRouterMock);
399 assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
400 assertEquals("failedFtp should have been 1", 1, testedObject.getCounters().getNoOfFailedFtp());
404 public void consume_publishFailedOnce() throws DatafileTaskException {
405 setUpConfiguration();
407 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
408 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
410 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
412 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
413 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
415 Mono<Object> error = Mono.error(new Exception("problem"));
416 // One publish will fail, the rest will succeed
417 doReturn(collectedFile, error, collectedFile, collectedFile) //
418 .when(dataRouterMock) //
419 .publishFile(notNull(), anyLong(), notNull());
421 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
423 .create(testedObject.createMainTask(contextMap)) //
424 .expectSubscription() //
425 .expectNextCount(3) // 3 completed files
429 assertTrue("Error missing in log", logAppender.list.toString().contains("[ERROR] File publishing failed: "));
431 assertEquals(0, testedObject.getCurrentNumberOfTasks());
433 verify(consumerMock, times(1)).getMessageRouterResponse();
434 verifyNoMoreInteractions(consumerMock);
436 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
437 verifyNoMoreInteractions(fileCollectorMock);
439 verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
440 verifyNoMoreInteractions(dataRouterMock);
442 assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
443 assertEquals("noOfFailedPublish should have been 1", 1, testedObject.getCounters().getNoOfFailedPublish());
447 public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
448 setUpConfiguration();
450 final int noOfEvents = 1;
451 final int noOfFilesPerEvent = 100;
453 // 100 files with the same name
454 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
455 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
457 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
459 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
460 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
461 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
464 .create(testedObject.createMainTask(contextMap)).expectSubscription() //
465 .expectNextCount(1) // 99 is skipped
469 assertEquals(0, testedObject.getCurrentNumberOfTasks());
471 verify(consumerMock, times(1)).getMessageRouterResponse();
472 verifyNoMoreInteractions(consumerMock);
474 verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
475 verifyNoMoreInteractions(fileCollectorMock);
477 verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
478 verifyNoMoreInteractions(dataRouterMock);
480 verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
481 verifyNoMoreInteractions(publishedCheckerMock);
483 assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());