2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.dcaegen2.collectors.datafile.tasks;
23 import static org.awaitility.Awaitility.await;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.ArgumentMatchers.anyLong;
29 import static org.mockito.ArgumentMatchers.anyString;
30 import static org.mockito.ArgumentMatchers.notNull;
31 import static org.mockito.Mockito.doReturn;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.spy;
34 import static org.mockito.Mockito.times;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.verifyNoMoreInteractions;
38 import ch.qos.logback.classic.spi.ILoggingEvent;
39 import ch.qos.logback.core.read.ListAppender;
40 import java.nio.file.Paths;
41 import java.time.Duration;
42 import java.time.Instant;
43 import java.util.HashMap;
44 import java.util.LinkedList;
45 import java.util.List;
47 import org.apache.commons.lang3.StringUtils;
48 import org.junit.jupiter.api.BeforeEach;
49 import org.junit.jupiter.api.Test;
50 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
51 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
52 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
53 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
54 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
55 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
56 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
57 import org.onap.dcaegen2.collectors.datafile.model.FileData;
58 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
59 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
60 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
61 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
62 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
63 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
64 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
65 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
66 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
68 import reactor.core.publisher.Flux;
69 import reactor.core.publisher.Mono;
70 import reactor.test.StepVerifier;
72 public class ScheduledTasksTest {
74 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
75 private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
77 private AppConfig appConfig = mock(AppConfig.class);
78 private ScheduledTasks testedObject;
80 private int uniqueValue = 0;
81 private DMaaPMessageConsumer consumerMock;
82 private PublishedChecker publishedCheckerMock;
83 private FileCollector fileCollectorMock;
84 private DataRouterPublisher dataRouterMock;
85 private Map<String, String> contextMap = new HashMap<String, String>();
87 private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
90 private void setUp() throws DatafileTaskException {
91 testedObject = spy(new ScheduledTasks(appConfig));
93 consumerMock = mock(DMaaPMessageConsumer.class);
94 publishedCheckerMock = mock(PublishedChecker.class);
95 fileCollectorMock = mock(FileCollector.class);
96 dataRouterMock = mock(DataRouterPublisher.class);
98 doReturn(consumerMock).when(testedObject).createConsumerTask();
99 doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
100 doReturn(fileCollectorMock).when(testedObject).createFileCollector();
101 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
104 private void setUpConfiguration() throws DatafileTaskException {
105 final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
106 .publishUrl(publishUrl) //
108 .userName("userName") //
109 .passWord("passWord") //
110 .trustStorePath("trustStorePath") //
111 .trustStorePasswordPath("trustStorePasswordPath") //
112 .keyStorePath("keyStorePath") //
113 .keyStorePasswordPath("keyStorePasswordPath") //
114 .enableDmaapCertAuth(true) //
115 .changeIdentifier(CHANGE_IDENTIFIER) //
117 final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
118 .topicUrl("topicUrl").trustStorePath("trustStorePath") //
119 .trustStorePasswordPath("trustStorePasswordPath") //
120 .keyStorePath("keyStorePath") //
121 .keyStorePasswordPath("keyStorePasswordPath") //
122 .enableDmaapCertAuth(true) //
125 doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
126 doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
127 doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
130 private MessageMetaData messageMetaData() {
131 return ImmutableMessageMetaData.builder() //
132 .productName("productName") //
134 .lastEpochMicrosec("") //
136 .startEpochMicrosec("") //
137 .timeZoneOffset("") //
138 .changeIdentifier(CHANGE_IDENTIFIER) //
143 private FileData fileData(int instanceNumber) {
144 return ImmutableFileData.builder() //
145 .name(PM_FILE_NAME + instanceNumber) //
146 .fileFormatType("") //
147 .fileFormatVersion("") //
148 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
149 .scheme(Scheme.FTPS) //
151 .messageMetaData(messageMetaData()) //
155 private List<FileData> files(int size, boolean uniqueNames) {
156 List<FileData> list = new LinkedList<FileData>();
157 for (int i = 0; i < size; ++i) {
161 list.add(fileData(uniqueValue));
166 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
167 return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
170 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
171 List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
172 for (int i = 0; i < numberOfEvents; ++i) {
173 list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
175 return Flux.fromIterable(list);
178 private FilePublishInformation filePublishInformation() {
179 return ImmutableFilePublishInformation //
183 .lastEpochMicrosec("") //
185 .startEpochMicrosec("") //
186 .timeZoneOffset("") //
189 .internalLocation(Paths.get("internalLocation")) //
191 .fileFormatType("") //
192 .fileFormatVersion("") //
193 .changeIdentifier(CHANGE_IDENTIFIER) //
194 .context(new HashMap<String, String>()).build();
198 public void purgeFileCache() {
199 testedObject.publishedFilesCache.put(Paths.get("file.xml"));
201 testedObject.purgeCachedInformation(Instant.MAX);
203 assertEquals(0, testedObject.publishedFilesCacheSize());
207 public void nothingToConsume() throws DatafileTaskException {
208 setUpConfiguration();
210 doReturn(consumerMock).when(testedObject).createConsumerTask();
211 doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
213 testedObject.executeDatafileMainTask();
215 assertEquals(0, testedObject.getCurrentNumberOfTasks());
216 verify(consumerMock, times(1)).getMessageRouterResponse();
217 verifyNoMoreInteractions(consumerMock);
221 public void skippingConsumeDueToCurrentNumberOfTasksGreaterThan50() {
222 doReturn(51).when(testedObject).getCurrentNumberOfTasks();
224 testedObject.executeDatafileMainTask();
226 verifyNoMoreInteractions(consumerMock);
230 public void executeDatafileMainTask_successfulCase() throws DatafileTaskException {
231 setUpConfiguration();
233 final int noOfEvents = 1;
234 final int noOfFilesPerEvent = 1;
236 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
237 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
239 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any(), any());
241 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
242 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
243 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
245 testedObject.executeDatafileMainTask();
247 await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
248 testedObject.getCurrentNumberOfSubscriptions()));
250 assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
252 verify(appConfig).getDmaapConsumerConfiguration();
253 verify(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
254 verifyNoMoreInteractions(appConfig);
256 assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());
260 public void executeDatafileMainTask_unconfiguredChangeIdentifier() throws DatafileTaskException {
261 final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
262 .publishUrl(publishUrl) //
264 .userName("userName") //
265 .passWord("passWord") //
266 .trustStorePath("trustStorePath") //
267 .trustStorePasswordPath("trustStorePasswordPath") //
268 .keyStorePath("keyStorePath") //
269 .keyStorePasswordPath("keyStorePasswordPath") //
270 .enableDmaapCertAuth(true) //
271 .changeIdentifier("Different changeIdentifier") //
273 final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
274 .topicUrl("topicUrl").trustStorePath("trustStorePath") //
275 .trustStorePasswordPath("trustStorePasswordPath") //
276 .keyStorePath("keyStorePath") //
277 .keyStorePasswordPath("keyStorePasswordPath") //
278 .enableDmaapCertAuth(true) //
281 doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
282 doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
283 doReturn(false).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
284 final int noOfEvents = 1;
285 final int noOfFilesPerEvent = 1;
287 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
288 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
290 ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
291 testedObject.executeDatafileMainTask();
293 await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
294 testedObject.getCurrentNumberOfSubscriptions()));
296 assertTrue("Error missing in log", logAppender.list.toString().contains(
297 "[INFO] No feed is configured for: " + CHANGE_IDENTIFIER + ", file ignored: " + PM_FILE_NAME + "1"));
301 public void createMainTask_consumeFail() {
302 MDC.setContextMap(contextMap);
303 doReturn(Flux.error(new Exception("Failed"))).when(consumerMock).getMessageRouterResponse();
305 ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
307 .create(testedObject.createMainTask(contextMap)) //
308 .expectSubscription() //
309 .expectNextCount(0) //
313 assertTrue("Error missing in log", logAppender.list.toString()
314 .contains("[ERROR] Polling for file ready message failed, " + "exception: java.lang.Exception: Failed"));
318 public void consume_successfulCase() throws DatafileTaskException {
319 setUpConfiguration();
321 final int noOfEvents = 200;
322 final int noOfFilesPerEvent = 200;
323 final int noOfFiles = noOfEvents * noOfFilesPerEvent;
325 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
326 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
328 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
330 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
331 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
332 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
335 .create(testedObject.createMainTask(contextMap)) //
336 .expectSubscription() //
337 .expectNextCount(noOfFiles) //
341 assertEquals(0, testedObject.getCurrentNumberOfTasks());
342 assertEquals(0, testedObject.getThreadPoolQueueSize());
344 verify(consumerMock, times(1)).getMessageRouterResponse();
345 verifyNoMoreInteractions(consumerMock);
347 verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
348 verifyNoMoreInteractions(fileCollectorMock);
350 verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
351 verifyNoMoreInteractions(dataRouterMock);
353 assertEquals("totalReceivedEvents should have been 200", 200,
354 testedObject.getCounters().getTotalReceivedEvents());
358 public void consume_fetchFailedOnce() throws DatafileTaskException {
359 setUpConfiguration();
361 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
362 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
364 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
366 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
367 Mono<Object> error = Mono.error(new Exception("problem"));
369 // First file collect will fail, 3 will succeed
370 doReturn(error, collectedFile, collectedFile, collectedFile) //
371 .when(fileCollectorMock) //
372 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
374 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
375 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
378 .create(testedObject.createMainTask(contextMap)) //
379 .expectSubscription() //
380 .expectNextCount(3) //
384 assertEquals(0, testedObject.getCurrentNumberOfTasks());
386 verify(consumerMock, times(1)).getMessageRouterResponse();
387 verifyNoMoreInteractions(consumerMock);
389 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
390 verifyNoMoreInteractions(fileCollectorMock);
392 verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
393 verifyNoMoreInteractions(dataRouterMock);
395 assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
396 assertEquals("failedFtp should have been 1", 1, testedObject.getCounters().getNoOfFailedFtp());
400 public void consume_publishFailedOnce() throws DatafileTaskException {
401 setUpConfiguration();
403 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
404 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
406 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
408 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
409 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
411 Mono<Object> error = Mono.error(new Exception("problem"));
412 // One publish will fail, the rest will succeed
413 doReturn(collectedFile, error, collectedFile, collectedFile) //
414 .when(dataRouterMock) //
415 .publishFile(notNull(), anyLong(), notNull());
417 ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
419 .create(testedObject.createMainTask(contextMap)) //
420 .expectSubscription() //
421 .expectNextCount(3) // 3 completed files
425 assertTrue("Error missing in log", logAppender.list.toString().contains("[ERROR] File publishing failed: "));
427 assertEquals(0, testedObject.getCurrentNumberOfTasks());
429 verify(consumerMock, times(1)).getMessageRouterResponse();
430 verifyNoMoreInteractions(consumerMock);
432 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
433 verifyNoMoreInteractions(fileCollectorMock);
435 verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
436 verifyNoMoreInteractions(dataRouterMock);
438 assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
439 assertEquals("noOfFailedPublish should have been 1", 1, testedObject.getCounters().getNoOfFailedPublish());
443 public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
444 setUpConfiguration();
446 final int noOfEvents = 1;
447 final int noOfFilesPerEvent = 100;
449 // 100 files with the same name
450 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
451 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
453 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
455 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
456 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
457 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
460 .create(testedObject.createMainTask(contextMap)).expectSubscription() //
461 .expectNextCount(1) // 99 is skipped
465 assertEquals(0, testedObject.getCurrentNumberOfTasks());
467 verify(consumerMock, times(1)).getMessageRouterResponse();
468 verifyNoMoreInteractions(consumerMock);
470 verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
471 verifyNoMoreInteractions(fileCollectorMock);
473 verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
474 verifyNoMoreInteractions(dataRouterMock);
476 verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
477 verifyNoMoreInteractions(publishedCheckerMock);
479 assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());