2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * Copyright (C) 2020 Nokia. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.dcaegen2.collectors.datafile.tasks;
24 import static org.awaitility.Awaitility.await;
25 import static org.junit.jupiter.api.Assertions.assertEquals;
26 import static org.junit.jupiter.api.Assertions.assertFalse;
27 import static org.junit.jupiter.api.Assertions.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.ArgumentMatchers.anyLong;
30 import static org.mockito.ArgumentMatchers.anyString;
31 import static org.mockito.ArgumentMatchers.notNull;
32 import static org.mockito.Mockito.doReturn;
33 import static org.mockito.Mockito.mock;
34 import static org.mockito.Mockito.spy;
35 import static org.mockito.Mockito.times;
36 import static org.mockito.Mockito.verify;
37 import static org.mockito.Mockito.verifyNoMoreInteractions;
39 import ch.qos.logback.classic.spi.ILoggingEvent;
40 import ch.qos.logback.core.read.ListAppender;
42 import java.nio.file.Paths;
43 import java.time.Duration;
44 import java.time.Instant;
45 import java.util.HashMap;
46 import java.util.LinkedList;
47 import java.util.List;
50 import org.apache.commons.lang3.StringUtils;
51 import org.junit.jupiter.api.BeforeEach;
52 import org.junit.jupiter.api.Test;
53 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
54 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
55 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
56 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
57 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
58 import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
59 import org.onap.dcaegen2.collectors.datafile.model.FileData;
60 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
61 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
62 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
63 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
64 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
65 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
66 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
67 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
68 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
69 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
70 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
71 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
74 import reactor.core.publisher.Flux;
75 import reactor.core.publisher.Mono;
76 import reactor.test.StepVerifier;
/**
 * Tests for {@code ScheduledTasks}. The object under test is wrapped in a Mockito spy and its
 * collaborators (message consumer, published checker, file collector, data router publisher) are
 * replaced by mocks.
 */
78 public class ScheduledTasksTest {
// File name stem; helper methods append an integer to it.
80 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
// Change identifier shared by the generated test data and the stubbed configuration lookups.
81 private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
83 private AppConfig appConfig = mock(AppConfig.class);
84 private ScheduledTasks testedObject;
// Integer passed to fileData(...) by files(...) and appended to the generated file names.
86 private int uniqueValue = 0;
87 private DMaaPMessageConsumer consumerMock;
88 private PublishedChecker publishedCheckerMock;
89 private FileCollector fileCollectorMock;
90 private DataRouterPublisher dataRouterMock;
// Empty context map handed to MDC.setContextMap(...) and createMainTask(...) in the tests.
91 private Map<String, String> contextMap = new HashMap<String, String>();
// Publish URL used when building the publisher configurations.
93 private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
/**
 * Creates the object under test as a Mockito spy around {@code new ScheduledTasks(appConfig)},
 * creates fresh mocks for its collaborators, and stubs the spy's {@code create...} factory methods
 * so that they return those mocks.
 * NOTE(review): presumably run before every test (BeforeEach is imported), but the annotation is
 * not visible in this view - confirm.
 */
96 private void setUp() throws DatafileTaskException {
97 testedObject = spy(new ScheduledTasks(appConfig));
// Collaborator mocks, recreated on every call.
99 consumerMock = mock(DMaaPMessageConsumer.class);
100 publishedCheckerMock = mock(PublishedChecker.class);
101 fileCollectorMock = mock(FileCollector.class);
102 dataRouterMock = mock(DataRouterPublisher.class);
// Route the spy's factory methods to the mocks above.
104 doReturn(consumerMock).when(testedObject).createConsumerTask();
105 doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
106 doReturn(fileCollectorMock).when(testedObject).createFileCollector();
107 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
/**
 * Stubs {@code appConfig} so that it returns a publisher configuration for
 * {@code CHANGE_IDENTIFIER}, a consumer configuration assembled from mocks, and {@code true} for
 * {@code isFeedConfigured(CHANGE_IDENTIFIER)}.
 */
110 private void setUpConfiguration() throws DatafileTaskException {
// Publisher configuration with placeholder credentials and key/trust store paths.
111 final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
112 .publishUrl(publishUrl) //
114 .userName("userName") //
115 .password("passWord") //
116 .trustStorePath("trustStorePath") //
117 .trustStorePasswordPath("trustStorePasswordPath") //
118 .keyStorePath("keyStorePath") //
119 .keyStorePasswordPath("keyStorePasswordPath") //
120 .enableDmaapCertAuth(true) //
121 .changeIdentifier(CHANGE_IDENTIFIER) //
// Consumer configuration whose three constructor arguments are all mocks.
123 final ConsumerConfiguration dmaapConsumerConfiguration =
124 new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class),
125 mock(MessageRouterSubscribeRequest.class));
// Make appConfig hand out the configurations built above and report the feed as configured.
128 doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
129 doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
130 doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
/**
 * Starts building a {@code MessageMetaData} with a fixed product name, empty time fields and
 * {@code CHANGE_IDENTIFIER} as change identifier.
 * NOTE(review): the end of the builder chain is not visible in this view.
 *
 * @return the message meta data used by {@code fileData(int)}
 */
133 private MessageMetaData messageMetaData() {
134 return ImmutableMessageMetaData.builder() //
135 .productName("productName") //
137 .lastEpochMicrosec("") //
139 .startEpochMicrosec("") //
140 .timeZoneOffset("") //
141 .changeIdentifier(CHANGE_IDENTIFIER) //
/**
 * Starts building a {@code FileData} whose name and FTPES location both end in
 * {@code PM_FILE_NAME + instanceNumber}.
 * NOTE(review): the end of the builder chain is not visible in this view.
 *
 * @param instanceNumber integer appended to the file name and to the location
 * @return the file data carrying the meta data from {@code messageMetaData()}
 */
146 private FileData fileData(int instanceNumber) {
147 return ImmutableFileData.builder() //
148 .name(PM_FILE_NAME + instanceNumber) //
149 .fileFormatType("") //
150 .fileFormatVersion("") //
151 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
152 .scheme(Scheme.FTPES) //
154 .messageMetaData(messageMetaData()) //
/**
 * Collects {@code size} entries created by {@code fileData(uniqueValue)} into a list.
 * NOTE(review): the visible lines never read {@code uniqueNames} nor modify {@code uniqueValue};
 * presumably the flag controls whether the counter is advanced per file - confirm against the
 * complete source.
 *
 * @param size number of loop iterations, i.e. number of entries added
 * @param uniqueNames see the review note above
 */
158 private List<FileData> files(int size, boolean uniqueNames) {
159 List<FileData> list = new LinkedList<FileData>();
160 for (int i = 0; i < size; ++i) {
164 list.add(fileData(uniqueValue));
/**
 * Builds a {@code FileReadyMessage} whose files come from {@code files(numberOfFiles, uniqueNames)}.
 *
 * @param numberOfFiles forwarded to {@code files(...)} as the list size
 * @param uniqueNames forwarded unchanged to {@code files(...)}
 * @return the built message
 */
169 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
170 return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
/**
 * Creates a {@code Flux} over {@code numberOfEvents} file-ready messages, each built by
 * {@code createFileReadyMessage(filesPerEvent, uniqueNames)}.
 *
 * @param numberOfEvents number of messages in the flux
 * @param filesPerEvent number of files requested for each message
 * @param uniqueNames forwarded unchanged to {@code createFileReadyMessage(...)}
 * @return a flux backed by the list of created messages
 */
173 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
174 List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
175 for (int i = 0; i < numberOfEvents; ++i) {
176 list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
178 return Flux.fromIterable(list);
/**
 * Builds the {@code FilePublishInformation} that the tests use as the result of the stubbed
 * collect and publish calls: empty time and format fields, internal location
 * {@code "internalLocation"}, {@code CHANGE_IDENTIFIER} and an empty context map.
 * NOTE(review): some builder steps are not visible in this view.
 *
 * @return the built publish information
 */
181 private FilePublishInformation filePublishInformation() {
182 return ImmutableFilePublishInformation //
186 .lastEpochMicrosec("") //
188 .startEpochMicrosec("") //
189 .timeZoneOffset("") //
192 .internalLocation(Paths.get("internalLocation")) //
194 .fileFormatType("") //
195 .fileFormatVersion("") //
196 .changeIdentifier(CHANGE_IDENTIFIER) //
197 .context(new HashMap<String, String>()).build();
/**
 * Puts one path into the published-files cache, calls
 * {@code purgeCachedInformation(Instant.MAX)} and expects the cache size to be 0 afterwards.
 */
201 public void purgeFileCache() {
202 testedObject.publishedFilesCache.put(Paths.get("file.xml"));
204 testedObject.purgeCachedInformation(Instant.MAX);
206 assertEquals(0, testedObject.publishedFilesCacheSize());
/**
 * Runs the main task with a consumer that returns an empty flux and expects no remaining tasks
 * and exactly one call to {@code getMessageRouterResponse()} on the consumer.
 */
210 public void nothingToConsume() throws DatafileTaskException {
211 setUpConfiguration();
213 doReturn(consumerMock).when(testedObject).createConsumerTask();
// The consumer delivers no messages at all.
214 doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
216 testedObject.executeDatafileMainTask();
218 assertEquals(0, testedObject.getCurrentNumberOfTasks());
219 verify(consumerMock, times(1)).getMessageRouterResponse();
220 verifyNoMoreInteractions(consumerMock);
/**
 * Stubs the current number of tasks to 51, runs the main task and expects that the consumer mock
 * is not used at all.
 */
224 public void skippingConsumeDueToCurrentNumberOfTasksGreaterThan50() {
225 doReturn(51).when(testedObject).getCurrentNumberOfTasks();
227 testedObject.executeDatafileMainTask();
229 verifyNoMoreInteractions(consumerMock);
/**
 * Runs the main task for one event containing one file, with the file reported as not yet
 * published and both collect and publish stubbed to succeed. Afterwards it checks the MDC request
 * id, the interactions with {@code appConfig} and the received-events counter.
 */
233 public void executeDatafileMainTask_successfulCase() throws DatafileTaskException {
234 setUpConfiguration();
236 final int noOfEvents = 1;
237 final int noOfFilesPerEvent = 1;
239 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
240 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
// The file is reported as not published.
242 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any(), any());
// Collect and publish both return the same successful Mono.
244 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
245 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
246 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
248 testedObject.executeDatafileMainTask();
// Poll until the tested object reports zero subscriptions.
250 await().untilAsserted(() -> assertEquals(0,
251 testedObject.getCurrentNumberOfSubscriptions()));
// A request id is expected to be present in the MDC after the run.
253 assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
// Only these two appConfig calls are expected.
255 verify(appConfig).getDmaapConsumerConfiguration();
256 verify(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
257 verifyNoMoreInteractions(appConfig);
259 assertEquals(1, testedObject.getCounters().getTotalReceivedEvents(),"totalReceivedEvents should have been 1");
/**
 * Runs the main task while {@code appConfig.isFeedConfigured(CHANGE_IDENTIFIER)} is stubbed to
 * return false, and expects an INFO log entry saying that no feed is configured and the file is
 * ignored.
 */
263 public void executeDatafileMainTask_unconfiguredChangeIdentifier() throws DatafileTaskException {
// Publisher configuration built with a change identifier other than CHANGE_IDENTIFIER.
264 final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
265 .publishUrl(publishUrl) //
267 .userName("userName") //
268 .password("passWord") //
269 .trustStorePath("trustStorePath") //
270 .trustStorePasswordPath("trustStorePasswordPath") //
271 .keyStorePath("keyStorePath") //
272 .keyStorePasswordPath("keyStorePasswordPath") //
273 .enableDmaapCertAuth(true) //
274 .changeIdentifier("Different changeIdentifier") //
276 final ConsumerConfiguration dmaapConsumerConfiguration =
277 new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class),
278 mock(MessageRouterSubscribeRequest.class));
280 doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
281 doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
// The feed for CHANGE_IDENTIFIER is reported as not configured.
282 doReturn(false).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
283 final int noOfEvents = 1;
284 final int noOfFilesPerEvent = 1;
286 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
287 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
// Obtain the log list appender for ScheduledTasks before running the task.
289 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
290 testedObject.executeDatafileMainTask();
292 await().untilAsserted(() -> assertEquals(0, testedObject.getCurrentNumberOfSubscriptions(),"currentNumberOfSubscriptions should have been 0"));
// The expected log entry names the file with suffix "1".
294 assertTrue(logAppender.list.toString().contains(
295 "[INFO] No feed is configured for: " + CHANGE_IDENTIFIER + ", file ignored: " + PM_FILE_NAME + "1"),"Error missing in log");
/**
 * Makes the consumer return a failing flux, subscribes to {@code createMainTask(contextMap)},
 * expects no emitted items and an ERROR log entry about the failed polling.
 * NOTE(review): the start and end of the verifier chain are not visible in this view.
 */
299 public void createMainTask_consumeFail() {
300 MDC.setContextMap(contextMap);
// Polling fails with an exception whose message is "Failed".
301 doReturn(Flux.error(new Exception("Failed"))).when(consumerMock).getMessageRouterResponse();
303 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
305 .create(testedObject.createMainTask(contextMap)) //
306 .expectSubscription() //
307 .expectNextCount(0) //
311 assertTrue(logAppender.list.toString()
312 .contains("[ERROR] Polling for file ready message failed, " + "exception: java.lang.Exception: Failed"),"Error missing in log");
/**
 * Feeds 200 events with 200 files each (40000 files) through {@code createMainTask(contextMap)}
 * with collect and publish stubbed to succeed. Expects one emitted item per file, no remaining
 * tasks or queued work, and one collect and one publish call per file.
 * NOTE(review): the start and end of the verifier chain are not visible in this view.
 */
316 public void consume_successfulCase() throws DatafileTaskException {
317 setUpConfiguration();
319 final int noOfEvents = 200;
320 final int noOfFilesPerEvent = 200;
321 final int noOfFiles = noOfEvents * noOfFilesPerEvent;
323 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
324 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
// Every file is reported as not published.
326 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
328 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
329 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
330 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
333 .create(testedObject.createMainTask(contextMap)) //
334 .expectSubscription() //
335 .expectNextCount(noOfFiles) //
339 assertEquals(0, testedObject.getCurrentNumberOfTasks());
340 assertEquals(0, testedObject.getThreadPoolQueueSize());
342 verify(consumerMock, times(1)).getMessageRouterResponse();
343 verifyNoMoreInteractions(consumerMock);
// One collect and one publish call per file, and nothing else on these mocks.
345 verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
346 verifyNoMoreInteractions(fileCollectorMock);
348 verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
349 verifyNoMoreInteractions(dataRouterMock);
352 testedObject.getCounters().getTotalReceivedEvents(),"totalReceivedEvents should have been 200");
/**
 * Processes 2 events with 2 files each where the first stubbed collect result is an error and the
 * following three succeed. Expects 3 emitted items, 4 collect calls, 3 publish calls and a
 * failed-FTP counter of 1.
 * NOTE(review): the start and end of the verifier chain are not visible in this view.
 */
356 public void consume_fetchFailedOnce() throws DatafileTaskException {
357 setUpConfiguration();
359 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
360 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
362 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
364 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
365 Mono<Object> error = Mono.error(new Exception("problem"));
367 // First file collect will fail, 3 will succeed
368 doReturn(error, collectedFile, collectedFile, collectedFile) //
369 .when(fileCollectorMock) //
370 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
// NOTE(review): the next two stubbings are identical; the second one looks redundant.
372 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
373 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
376 .create(testedObject.createMainTask(contextMap)) //
377 .expectSubscription() //
378 .expectNextCount(3) //
382 assertEquals(0, testedObject.getCurrentNumberOfTasks());
384 verify(consumerMock, times(1)).getMessageRouterResponse();
385 verifyNoMoreInteractions(consumerMock);
// Four collect calls are expected for the four files although one collect result is an error.
387 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
388 verifyNoMoreInteractions(fileCollectorMock);
390 verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
391 verifyNoMoreInteractions(dataRouterMock);
393 assertEquals(2, testedObject.getCounters().getTotalReceivedEvents(),"totalReceivedEvents should have been 2");
394 assertEquals(1, testedObject.getCounters().getNoOfFailedFtp(),"failedFtp should have been 1");
/**
 * Processes 2 events with 2 files each where the second stubbed publish result is an error and
 * the others succeed. Expects 3 emitted items, an ERROR log entry about the failed publishing,
 * 4 collect calls, 4 publish calls and a failed-publish counter of 1.
 * NOTE(review): the start and end of the verifier chain are not visible in this view.
 */
398 public void consume_publishFailedOnce() throws DatafileTaskException {
399 setUpConfiguration();
401 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
402 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
404 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
406 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
407 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
409 Mono<Object> error = Mono.error(new Exception("problem"));
410 // One publish will fail, the rest will succeed
411 doReturn(collectedFile, error, collectedFile, collectedFile) //
412 .when(dataRouterMock) //
413 .publishFile(notNull(), anyLong(), notNull());
// Obtain the log list appender for ScheduledTasks before subscribing to the main task.
415 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
417 .create(testedObject.createMainTask(contextMap)) //
418 .expectSubscription() //
419 .expectNextCount(3) // 3 completed files
423 assertTrue(logAppender.list.toString().contains("[ERROR] File publishing failed: "));
425 assertEquals(0, testedObject.getCurrentNumberOfTasks());
427 verify(consumerMock, times(1)).getMessageRouterResponse();
428 verifyNoMoreInteractions(consumerMock);
430 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
431 verifyNoMoreInteractions(fileCollectorMock);
// Four publish calls are expected, including the one whose result is the error.
433 verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
434 verifyNoMoreInteractions(dataRouterMock);
436 assertEquals(2, testedObject.getCounters().getTotalReceivedEvents(),"totalReceivedEvents should have been 2");
437 assertEquals(1, testedObject.getCounters().getNoOfFailedPublish(),"noOfFailedPublish should have been 1");
/**
 * Processes a single event whose 100 files are requested with {@code uniqueNames == false}.
 * Expects exactly one emitted item and exactly one published-check, collect and publish call.
 * NOTE(review): the start and end of the verifier chain are not visible in this view.
 */
441 public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
442 setUpConfiguration();
444 final int noOfEvents = 1;
445 final int noOfFilesPerEvent = 100;
447 // 100 files with the same name
448 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
449 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
451 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
453 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
454 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
455 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
458 .create(testedObject.createMainTask(contextMap)).expectSubscription() //
459 .expectNextCount(1) // 99 is skipped
463 assertEquals(0, testedObject.getCurrentNumberOfTasks());
465 verify(consumerMock, times(1)).getMessageRouterResponse();
466 verifyNoMoreInteractions(consumerMock);
// Only one file is expected to reach the collector, the publisher and the published check.
468 verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
469 verifyNoMoreInteractions(fileCollectorMock);
471 verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
472 verifyNoMoreInteractions(dataRouterMock);
474 verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
475 verifyNoMoreInteractions(publishedCheckerMock);
477 assertEquals(1, testedObject.getCounters().getTotalReceivedEvents(),"totalReceivedEvents should have been 1");