Bugfixes, Generalizing Data File Collection to handle any type of file 26/89126/1
authorPatrikBuhr <patrik.buhr@est.tech>
Mon, 3 Jun 2019 08:38:10 +0000 (08:38 +0000)
committerPatrikBuhr <patrik.buhr@est.tech>
Mon, 3 Jun 2019 08:38:10 +0000 (08:38 +0000)
- When a change ID was not configured, the task counter was not decreased.
The result was that the DFC stopped polling,

- When the check if a file is a already published fails
(most likely to a problem is the DR), the DFC will try
to publish it (instead of just ingnoring it).

Change-Id: If9f5b962210f809d5d2ae0aa60d3a7f99099c058
Issue-ID: DCAEGEN2-1532
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java

index bac5265..0f220fd 100644 (file)
@@ -117,10 +117,10 @@ public class ScheduledTasks {
                 .runOn(scheduler) //
                 .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
                 .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
-                .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
                 .flatMap(fileData -> createMdcContext(fileData, context)) //
                 .filter(this::isFeedConfigured) //
                 .filter(this::shouldBePublished) //
+                .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
                 .flatMap(this::fetchFile, false, 1, 1) //
                 .flatMap(this::publishToDataRouter, false, 1, 1) //
                 .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
@@ -210,21 +210,19 @@ public class ScheduledTasks {
     }
 
     private boolean shouldBePublished(FileDataWithContext fileData) {
-        boolean result = false;
         Path localFilePath = fileData.fileData.getLocalFilePath();
         if (publishedFilesCache.put(localFilePath) == null) {
             try {
-                result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
+                boolean result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
                         fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
+                return result;
             } catch (DatafileTaskException e) {
                 logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
+                return true; // Publish it then
             }
+        } else {
+            return false;
         }
-        if (!result) {
-            currentNumberOfTasks.decrementAndGet();
-        }
-
-        return result;
     }
 
     private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) {
@@ -269,7 +267,7 @@ public class ScheduledTasks {
     private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
         logger.info(
                 "Consuming new file ready messages, current number of tasks: {}, published files: {}, "
-                + "number of subscriptions: {}",
+                        + "number of subscriptions: {}",
                 getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
 
         Map<String, String> context = MDC.getCopyOfContextMap();