Improvement of the parallelism 32/78932/3
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 20 Feb 2019 07:35:16 +0000 (08:35 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Wed, 6 Mar 2019 09:24:33 +0000 (10:24 +0100)
The reactive framework Scedulers uses to few threads.
(the same number as the number of processors).
That is too few for an io-intense application like this
where CPU is not the limiting factor.

Change-Id: Ia5f41e75716d309f47dce5f5273b739f7e6d136a
Issue-ID: DCAEGEN2-1118
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java [new file with mode: 0644]

index 50f5431..783c699 100644 (file)
@@ -40,23 +40,23 @@ import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 /**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
  */
 @Component
 public class ScheduledTasks {
 
     private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
+    private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
 
-    /** Data needed for fetching of files from one PNF */
+    /** Data needed for fetching of one file */
     private class FileCollectionData {
         final FileData fileData;
-        final FileCollector collectorTask; // Same object, ftp session etc. can be used for each
-                                           // file in one VES
-                                           // event
+        final FileCollector collectorTask;
         final MessageMetaData metaData;
 
         FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
@@ -68,16 +68,15 @@ public class ScheduledTasks {
 
     private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
     private final AppConfig applicationConfiguration;
-    private final AtomicInteger taskCounter = new AtomicInteger();
-
+    private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+    private final Scheduler scheduler =
+            Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS);
     PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
 
     /**
      * Constructor for task registration in Datafile Workflow.
      *
      * @param applicationConfiguration - application configuration
-     * @param xnfCollectorTask - second task
-     * @param dmaapPublisherTask - third task
      */
     @Autowired
     public ScheduledTasks(AppConfig applicationConfiguration) {
@@ -90,20 +89,21 @@ public class ScheduledTasks {
     public void scheduleMainDatafileEventTask() {
         logger.trace("Execution of tasks was registered");
         applicationConfiguration.initFileStreamReader();
-        //@formatter:off
-        consumeMessagesFromDmaap()
-            .parallel() // Each FileReadyMessage in a separate thread
-            .runOn(Schedulers.parallel())
-            .flatMap(this::createFileCollectionTask)
-            .filter(this::shouldBePublished)
-            .doOnNext(fileData -> taskCounter.incrementAndGet())
-            .flatMap(this::collectFileFromXnf)
-            .flatMap(this::publishToDataRouter)
-            .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation())))
-            .doOnNext(model -> taskCounter.decrementAndGet())
-            .sequential()
-            .subscribe(this::onSuccess, this::onError, this::onComplete);
-        //@formatter:on
+        createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete);
+    }
+
+    Flux<ConsumerDmaapModel> createMainTask() {
+        return fetchMoreFileReadyMessages() //
+                .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
+                .runOn(scheduler) //
+                .flatMap(this::createFileCollectionTask) //
+                .filter(this::shouldBePublished) //
+                .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
+                .flatMap(this::collectFileFromXnf) //
+                .flatMap(this::publishToDataRouter) //
+                .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) //
+                .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
+                .sequential();
     }
 
     /**
@@ -125,13 +125,20 @@ public class ScheduledTasks {
         logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
     }
 
+    private int getParallelism() {
+        if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) {
+            return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks();
+        } else {
+            return 1; // We need at least one rail/thread
+        }
+    }
+
     private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
         List<FileCollectionData> fileCollects = new ArrayList<>();
 
         for (FileData fileData : availableFiles.files()) {
-            FileCollector task = new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
-                    new SftpClient(fileData.fileServerData()));
-            fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
+            fileCollects.add(
+                    new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData()));
         }
         return Flux.fromIterable(fileCollects);
     }
@@ -154,7 +161,7 @@ public class ScheduledTasks {
         logger.error("File fetching failed: {}", localFileName);
         deleteFile(localFileName);
         alreadyPublishedFiles.remove(localFileName);
-        taskCounter.decrementAndGet();
+        currentNumberOfTasks.decrementAndGet();
         return Mono.empty();
     }
 
@@ -162,7 +169,7 @@ public class ScheduledTasks {
         final long maxNumberOfRetries = 3;
         final Duration initialRetryTimeout = Duration.ofSeconds(5);
 
-        DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration);
+        DataRouterPublisher publisherTask = createDataRouterPublisher();
 
         return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
                 .onErrorResume(exception -> handlePublishFailure(model, exception));
@@ -173,20 +180,21 @@ public class ScheduledTasks {
         Path internalFileName = Paths.get(model.getInternalLocation());
         deleteFile(internalFileName);
         alreadyPublishedFiles.remove(internalFileName);
-        taskCounter.decrementAndGet();
+        currentNumberOfTasks.decrementAndGet();
         return Mono.empty();
     }
 
-    private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
-        final int currentNumberOfTasks = taskCounter.get();
-        logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks);
-        if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+    /**
+     * Fetch more messages from the message router. This is done in a polling/blocking fashion.
+     */
+    private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
+        logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
+        if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) {
             return Flux.empty();
         }
 
-        final DMaaPMessageConsumerTask messageConsumerTask =
-                new DMaaPMessageConsumerTask(this.applicationConfiguration);
-        return messageConsumerTask.execute() //
+        return createConsumerTask() //
+                .execute() //
                 .onErrorResume(this::handleConsumeMessageFailure);
     }
 
@@ -200,7 +208,25 @@ public class ScheduledTasks {
         try {
             Files.delete(localFile);
         } catch (Exception e) {
-            logger.warn("Could not delete file: {}, {}", localFile, e);
+            logger.trace("Could not delete file: {}", localFile);
         }
     }
+
+    int getCurrentNumberOfTasks() {
+        return currentNumberOfTasks.get();
+    }
+
+    DMaaPMessageConsumerTask createConsumerTask() {
+        return new DMaaPMessageConsumerTask(this.applicationConfiguration);
+    }
+
+    FileCollector createFileCollector(FileData fileData) {
+        return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
+                new SftpClient(fileData.fileServerData()));
+    }
+
+    DataRouterPublisher createDataRouterPublisher() {
+        return new DataRouterPublisher(applicationConfiguration);
+    }
+
 }
index d2240f1..24b82fe 100644 (file)
@@ -61,37 +61,38 @@ class DataRouterPublisherTest {
 
     @BeforeAll
     public static void setUp() {
-        //@formatter:off
+
         dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
-                .dmaapContentType("application/json")
-                .dmaapHostName("54.45.33.2")
-                .dmaapPortNumber(1234)
-                .dmaapProtocol("https")
-                .dmaapUserName("DFC")
-                .dmaapUserPassword("DFC")
-                .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT")
-                .trustStorePath("trustStorePath")
-                .trustStorePasswordPath("trustStorePasswordPath")
-                .keyStorePath("keyStorePath")
-                .keyStorePasswordPath("keyStorePasswordPath")
-                .enableDmaapCertAuth(true)
-                .build();
+                .dmaapContentType("application/json") //
+                .dmaapHostName("54.45.33.2") //
+                .dmaapPortNumber(1234) //
+                .dmaapProtocol("https") //
+                .dmaapUserName("DFC") //
+                .dmaapUserPassword("DFC") //
+                .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+                .trustStorePath("trustStorePath") //
+                .trustStorePasswordPath("trustStorePasswordPath") //
+                .keyStorePath("keyStorePath") //
+                .keyStorePasswordPath("keyStorePasswordPath") //
+                .enableDmaapCertAuth(true) //
+                .build(); //
         consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
-                .productName(PRODUCT_NAME)
-                .vendorName(VENDOR_NAME)
-                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
-                .sourceName(SOURCE_NAME)
-                .startEpochMicrosec(START_EPOCH_MICROSEC)
-                .timeZoneOffset(TIME_ZONE_OFFSET)
-                .name(PM_FILE_NAME)
-                .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME)
-                .internalLocation("target/" + PM_FILE_NAME)
-                .compression("gzip")
-                .fileFormatType("org.3GPP.32.435#measCollec")
-                .fileFormatVersion("V10")
-                .build();
+                .productName(PRODUCT_NAME) //
+                .vendorName(VENDOR_NAME) //
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+                .sourceName(SOURCE_NAME) //
+                .startEpochMicrosec(START_EPOCH_MICROSEC) //
+                .timeZoneOffset(TIME_ZONE_OFFSET) //
+                .name(PM_FILE_NAME) //
+                .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) //
+                .internalLocation("target/" + PM_FILE_NAME) //
+                .compression("gzip") //
+                .fileFormatType("org.3GPP.32.435#measCollec") //
+                .fileFormatVersion("V10") //
+                .build(); //
         appConfig = mock(AppConfig.class);
-        //@formatter:on
+
+        doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
     }
 
     @Test
@@ -132,7 +133,7 @@ class DataRouterPublisherTest {
         dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
         when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
                 nextHttpResponses);
-        when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
+
         dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
         when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
         doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
new file mode 100644 (file)
index 0000000..0662216
--- /dev/null
@@ -0,0 +1,285 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+public class ScheduledTasksTest {
+
+    private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+
+    private AppConfig appConfig = mock(AppConfig.class);
+    private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
+
+    private int uniqueValue = 0;
+    private DMaaPMessageConsumerTask consumerMock;
+    private FileCollector fileCollectorMock;
+    private DataRouterPublisher dataRouterMock;
+
+    @BeforeEach
+    private void setUp() {
+        DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
+                .dmaapContentType("application/json") //
+                .dmaapHostName("54.45.33.2") //
+                .dmaapPortNumber(1234) //
+                .dmaapProtocol("https") //
+                .dmaapUserName("DFC") //
+                .dmaapUserPassword("DFC") //
+                .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+                .trustStorePath("trustStorePath") //
+                .trustStorePasswordPath("trustStorePasswordPath") //
+                .keyStorePath("keyStorePath") //
+                .keyStorePasswordPath("keyStorePasswordPath") //
+                .enableDmaapCertAuth(true) //
+                .build(); //
+
+        consumerMock = mock(DMaaPMessageConsumerTask.class);
+        fileCollectorMock = mock(FileCollector.class);
+        dataRouterMock = mock(DataRouterPublisher.class);
+
+        doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+        doReturn(consumerMock).when(testedObject).createConsumerTask();
+        doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull());
+        doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
+    }
+
+    private MessageMetaData messageMetaData() {
+        return ImmutableMessageMetaData.builder() //
+                .productName("productName") //
+                .vendorName("") //
+                .lastEpochMicrosec("") //
+                .sourceName("") //
+                .startEpochMicrosec("") //
+                .timeZoneOffset("") //
+                .changeIdentifier("") //
+                .changeType("") //
+                .build();
+    }
+
+    private FileData fileData(int instanceNumber) {
+        return ImmutableFileData.builder() //
+                .name("name" + instanceNumber) //
+                .fileFormatType("") //
+                .fileFormatVersion("") //
+                .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
+                .scheme(Scheme.FTPS) //
+                .compression("") //
+                .build();
+    }
+
+    private List<FileData> files(int size, boolean uniqueNames) {
+        List<FileData> list = new LinkedList<FileData>();
+        for (int i = 0; i < size; ++i) {
+            if (uniqueNames) {
+                ++uniqueValue;
+            }
+            list.add(fileData(uniqueValue));
+        }
+        return list;
+    }
+
+    private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
+        MessageMetaData md = messageMetaData();
+        return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
+                .files(files(numberOfFiles, uniqueNames)).build();
+    }
+
+    private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
+        List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
+        for (int i = 0; i < numberOfEvents; ++i) {
+            list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
+        }
+        return Flux.fromIterable(list);
+    }
+
+    private ConsumerDmaapModel consumerData() {
+        return ImmutableConsumerDmaapModel //
+                .builder() //
+                .productName("") //
+                .vendorName("") //
+                .lastEpochMicrosec("") //
+                .sourceName("") //
+                .startEpochMicrosec("") //
+                .timeZoneOffset("") //
+                .name("") //
+                .location("") //
+                .internalLocation("internalLocation") //
+                .compression("") //
+                .fileFormatType("") //
+                .fileFormatVersion("") //
+                .build();
+    }
+
+    @Test
+    public void notingToConsume() {
+        doReturn(consumerMock).when(testedObject).createConsumerTask();
+        doReturn(Flux.empty()).when(consumerMock).execute();
+
+        testedObject.scheduleMainDatafileEventTask();
+
+        assertEquals(0, testedObject.getCurrentNumberOfTasks());
+        verify(consumerMock, times(1)).execute();
+        verifyNoMoreInteractions(consumerMock);
+    }
+
+    @Test
+    public void consume_successfulCase() {
+        final int noOfEvents = 200;
+        final int noOfFilesPerEvent = 200;
+        final int noOfFiles = noOfEvents * noOfFilesPerEvent;
+
+        Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
+        doReturn(fileReadyMessages).when(consumerMock).execute();
+
+        Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+        doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+        StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+                .expectNextCount(noOfFiles) //
+                .expectComplete() //
+                .verify(); //
+
+        assertEquals(0, testedObject.getCurrentNumberOfTasks());
+        verify(consumerMock, times(1)).execute();
+        verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull());
+        verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull());
+        verifyNoMoreInteractions(dataRouterMock);
+        verifyNoMoreInteractions(fileCollectorMock);
+        verifyNoMoreInteractions(consumerMock);
+    }
+
+    @Test
+    public void consume_fetchFailedOnce() {
+        Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
+        doReturn(fileReadyMessages).when(consumerMock).execute();
+
+        Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+        Mono<Object> error = Mono.error(new Exception("problem"));
+
+        // First file collect will fail, 3 will succeed
+        doReturn(error, collectedFile, collectedFile, collectedFile) //
+                .when(fileCollectorMock) //
+                .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class));
+
+        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+        StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+                .expectNextCount(3) //
+                .expectComplete() //
+                .verify(); //
+
+        assertEquals(0, testedObject.getCurrentNumberOfTasks());
+        verify(consumerMock, times(1)).execute();
+        verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
+        verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull());
+        verifyNoMoreInteractions(dataRouterMock);
+        verifyNoMoreInteractions(fileCollectorMock);
+        verifyNoMoreInteractions(consumerMock);
+    }
+
+    @Test
+    public void consume_publishFailedOnce() {
+
+        Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
+        doReturn(fileReadyMessages).when(consumerMock).execute();
+
+        Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+        doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+
+        Mono<Object> error = Mono.error(new Exception("problem"));
+        // One publish will fail, the rest will succeed
+        doReturn(collectedFile, error, collectedFile, collectedFile) //
+                .when(dataRouterMock) //
+                .execute(notNull(), anyLong(), notNull());
+
+        StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+                .expectNextCount(3) // 3 completed files
+                .expectComplete() //
+                .verify(); //
+
+        assertEquals(0, testedObject.getCurrentNumberOfTasks());
+        verify(consumerMock, times(1)).execute();
+        verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
+        verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull());
+        verifyNoMoreInteractions(dataRouterMock);
+        verifyNoMoreInteractions(fileCollectorMock);
+        verifyNoMoreInteractions(consumerMock);
+    }
+
+    @Test
+    public void consume_successfulCase_sameFileNames() {
+        final int noOfEvents = 1;
+        final int noOfFilesPerEvent = 100;
+
+        // 100 files with the same name
+        Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
+        doReturn(fileReadyMessages).when(consumerMock).execute();
+
+        Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+        doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+        StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+                .expectNextCount(1) // 99 is skipped
+                .expectComplete() //
+                .verify(); //
+
+        assertEquals(0, testedObject.getCurrentNumberOfTasks());
+        verify(consumerMock, times(1)).execute();
+        verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull());
+        verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull());
+        verifyNoMoreInteractions(dataRouterMock);
+        verifyNoMoreInteractions(fileCollectorMock);
+        verifyNoMoreInteractions(consumerMock);
+    }
+
+
+}