private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final AtomicInteger threadPoolQueueSize = new AtomicInteger();
private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger();
private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
PublishedFileCache publishedFilesCache = new PublishedFileCache();
*/
public void executeDatafileMainTask() {
try {
- if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
+ if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING || this.threadPoolQueueSize.get() > 0) {
logger.info(
- "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}",
- getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size());
+ "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, published files: {}, number of queued VES events: {}",
+ getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(),
+ threadPoolQueueSize.get());
return;
}
Flux<FilePublishInformation> createMainTask(Map<String, String> context) {
return fetchMoreFileReadyMessages() //
+ .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) //
.parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
+ .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> createMdcContext(fileData, context)) //
.filter(this::shouldBePublished) //
.flatMap(this::fetchFile, false, 1, 1) //
- .flatMap(this::publishToDataRouter,false, 1, 1) //
+ .flatMap(this::publishToDataRouter, false, 1, 1) //
.doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
.doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
-
}
private class FileDataWithContext {
return currentNumberOfSubscriptions.get();
}
+ public int getThreadPoolQueueSize() {
+ return this.threadPoolQueueSize.get();
+ }
+
protected DMaaPMessageConsumer createConsumerTask() {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}