Bugfix, improved behaviour for large files 44/89244/5
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 14 Jun 2019 06:38:21 +0000 (06:38 +0000)
committerPatrikBuhr <patrik.buhr@est.tech>
Fri, 14 Jun 2019 06:38:21 +0000 (06:38 +0000)
Previously files was read into a buffer for publishing.
This does not work when files are bigger than the available memory.
After the fix , files are streamed instead.

Implemented a new REST primitive for exposing status and
statistics. To be used for test and trouble shooting.

Change-Id: Iab5a1ee9ffcbf6836fcf709d115bf25ab0391732
Issue-ID: DCAEGEN2-1532
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
14 files changed:
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java [moved from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java with 65% similarity]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java [moved from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java with 72% similarity]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java

@@ -19,6 +19,8 @@ package org.onap.dcaegen2.collectors.datafile.controllers;
 import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.ENTRY;
 import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.EXIT;
 
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
+
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -37,28 +39,25 @@ import org.springframework.web.bind.annotation.RestController;
 import reactor.core.publisher.Mono;
 
 /**
- * Controller to check the heartbeat of DFC.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ * REST Controller to check the heart beat and status of the DFC.
  */
 @RestController
-@Api(value = "HeartbeatController")
-public class HeartbeatController {
+@Api(value = "StatusController")
+public class StatusController {
 
-    private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class);
+    private static final Logger logger = LoggerFactory.getLogger(StatusController.class);
 
     private final ScheduledTasks scheduledTasks;
 
     @Autowired
-    public HeartbeatController(ScheduledTasks scheduledTasks) {
+    public StatusController(ScheduledTasks scheduledTasks) {
         this.scheduledTasks = scheduledTasks;
     }
 
     /**
-     * Checks the heartbeat of DFC.
+     * Checks the heart beat of DFC.
      *
-     * @return the heartbeat status of DFC.
+     * @return the heart beat status of DFC.
      */
     @GetMapping("/heartbeat")
     @ApiOperation(value = "Returns liveness of DATAFILE service")
@@ -66,18 +65,41 @@ public class HeartbeatController {
             @ApiResponse(code = 200, message = "DATAFILE service is living"),
             @ApiResponse(code = 401, message = "You are not authorized to view the resource"),
             @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
-            @ApiResponse(code = 404, message = "The resource you were trying to reach is not found") })
+            @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")})
     public Mono<ResponseEntity<String>> heartbeat(@RequestHeader HttpHeaders headers) {
         MappedDiagnosticContext.initializeTraceContext(headers);
         logger.info(ENTRY, "Heartbeat request");
 
-        StringBuilder statusString = new StringBuilder("I'm living! Status: ");
-        statusString.append("numberOfFileCollectionTasks=").append(scheduledTasks.getCurrentNumberOfTasks())
-                .append(",");
-        statusString.append("fileCacheSize=").append(scheduledTasks.publishedFilesCacheSize());
+        String statusString = "I'm living!";
 
-        Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(statusString.toString(), HttpStatus.OK));
+        Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(statusString, HttpStatus.OK));
         logger.info(EXIT, "Heartbeat request");
         return response;
     }
+
+    /**
+     * Returns diagnostics and statistics information. It is intended for testing and trouble
+     * shooting.
+     *
+     * @return information.
+     */
+    @GetMapping("/status")
+    @ApiOperation(value = "Returns status and statistics of DATAFILE service")
+    @ApiResponses(value = { //
+            @ApiResponse(code = 200, message = "DATAFILE service is living"),
+            @ApiResponse(code = 401, message = "You are not authorized to view the resource"),
+            @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
+            @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")})
+    public Mono<ResponseEntity<String>> status(@RequestHeader HttpHeaders headers) {
+        MappedDiagnosticContext.initializeTraceContext(headers);
+        logger.info(ENTRY, "Status request");
+
+        Counters counters = scheduledTasks.getCounters();
+        Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(counters.toString(), HttpStatus.OK));
+        logger.info(EXIT, "Status request");
+        return response;
+    }
+
+
+
 }
index 38f74ed..6aa7615 100644 (file)
@@ -21,29 +21,13 @@ package org.onap.dcaegen2.collectors.datafile.exceptions;
 public class DatafileTaskException extends Exception {
 
     private static final long serialVersionUID = 1L;
-    private final boolean isRetryable;
 
     public DatafileTaskException(String message) {
         super(message);
-        isRetryable = true;
-    }
-
-    public DatafileTaskException(String message, boolean retry) {
-        super(message);
-        isRetryable = retry;
     }
 
     public DatafileTaskException(String message, Exception originalException) {
         super(message, originalException);
-        isRetryable = true;
     }
 
-    public DatafileTaskException(String message, Exception originalException, boolean retry) {
-        super(message, originalException);
-        isRetryable = retry;
-    }
-
-    public boolean isRetryable() {
-        return this.isRetryable;
-    }
 }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java
new file mode 100644 (file)
index 0000000..a4bdd66
--- /dev/null
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.exceptions;
+
+
+public class NonRetryableDatafileTaskException extends DatafileTaskException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NonRetryableDatafileTaskException(String message) {
+        super(message);
+    }
+
+    public NonRetryableDatafileTaskException(String message, Exception originalException) {
+        super(message, originalException);
+    }
+
+}
index bb3016c..1828860 100644 (file)
@@ -38,6 +38,7 @@ import org.apache.commons.net.ftp.FTPReply;
 import org.apache.commons.net.ftp.FTPSClient;
 import org.apache.commons.net.util.KeyManagerUtils;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.io.FileSystemResource;
@@ -119,8 +120,7 @@ public class FtpsClient implements FileCollectClient {
         try (OutputStream output = createOutputStream(localFileName)) {
             logger.trace("begin to retrieve from xNF.");
             if (!realFtpsClient.retrieveFile(remoteFileName, output)) {
-                final boolean retry = false; // Skip retrying for all problems except IOException
-                throw new DatafileTaskException("Could not retrieve file " + remoteFileName, retry);
+                throw new NonRetryableDatafileTaskException("Could not retrieve file. No retry attempts will be done, file :" + remoteFileName);
             }
         } catch (IOException e) {
             throw new DatafileTaskException("Could not fetch file: " + e, e);
index ec52335..f5d10d3 100644 (file)
@@ -25,6 +25,7 @@ import com.jcraft.jsch.SftpException;
 import java.nio.file.Path;
 import java.util.Optional;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +58,12 @@ public class SftpClient implements FileCollectClient {
         } catch (SftpException e) {
             boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED
                     && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED;
-            throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry);
+            if (retry) {
+                throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e);
+            } else {
+                throw new NonRetryableDatafileTaskException(
+                        "Unable to get file from xNF. No retry attempts will be done. Data: " + fileServerData, e);
+            }
         }
 
         logger.trace("collectFile OK");
@@ -85,7 +91,12 @@ public class SftpClient implements FileCollectClient {
             }
         } catch (JSchException e) {
             boolean retry = !e.getMessage().contains("Auth fail");
-            throw new DatafileTaskException("Could not open Sftp client. " + e, e, retry);
+            if (retry) {
+                throw new DatafileTaskException("Could not open Sftp client. " + e);
+            } else {
+                throw new NonRetryableDatafileTaskException(
+                        "Could not open Sftp client, no retry attempts will be done " + e);
+            }
         }
     }
 
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
new file mode 100644 (file)
index 0000000..5efbe37
--- /dev/null
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ *
+ * Various counters that can be shown via a REST API.
+ *
+ */
+public class Counters {
+
+    private final AtomicInteger numberOfTasks = new AtomicInteger();
+    private final AtomicInteger numberOfSubscriptions = new AtomicInteger();
+    private int noOfCollectedFiles = 0;
+    private int noOfFailedFtpAttempts = 0;
+    private int noOfFailedFtp = 0;
+    private int noOfFailedPublishAttempts = 0;
+    private int totalPublishedFiles = 0;
+    private int noOfFailedPublish = 0;
+    private Instant lastPublishedTime = Instant.MIN;
+    private int totalReceivedEvents = 0;
+    private Instant lastEventTime = Instant.MIN;
+
+    public AtomicInteger getCurrentNumberOfTasks() {
+        return numberOfTasks;
+    }
+
+    public AtomicInteger getCurrentNumberOfSubscriptions() {
+        return numberOfSubscriptions;
+    }
+
+    public synchronized void incNoOfReceivedEvents() {
+        totalReceivedEvents++;
+        lastEventTime = Instant.now();
+    }
+
+    public synchronized void incNoOfCollectedFiles() {
+        noOfCollectedFiles++;
+    }
+
+    public synchronized void incNoOfFailedFtpAttempts() {
+        noOfFailedFtpAttempts++;
+    }
+
+    public synchronized void incNoOfFailedFtp() {
+        noOfFailedFtp++;
+    }
+
+    public synchronized void incNoOfFailedPublishAttempts() {
+        noOfFailedPublishAttempts++;
+    }
+
+    public synchronized void incTotalPublishedFiles() {
+        totalPublishedFiles++;
+        lastPublishedTime = Instant.now();
+    }
+
+    public synchronized void incNoOfFailedPublish() {
+        noOfFailedPublish++;
+    }
+
+    public synchronized String toString() {
+        StringBuilder str = new StringBuilder();
+        str.append(format("totalReceivedEvents", totalReceivedEvents));
+        str.append(format("lastEventTime", lastEventTime));
+        str.append(format("numberOfTasks", numberOfTasks));
+        str.append(format("numberOfSubscriptions", numberOfSubscriptions));
+        str.append("\n");
+        str.append(format("collectedFiles", noOfCollectedFiles));
+        str.append(format("failedFtpAttempts", noOfFailedFtpAttempts));
+        str.append(format("failedFtp", noOfFailedFtp));
+        str.append("\n");
+        str.append(format("totalPublishedFiles", totalPublishedFiles));
+        str.append(format("lastPublishedTime", lastPublishedTime));
+
+        str.append(format("failedPublishAttempts", noOfFailedPublishAttempts));
+        str.append(format("noOfFailedPublish", noOfFailedPublish));
+
+        return str.toString();
+    }
+
+    private String format(String name, Object value) {
+        String header = name + ":";
+        return String.format("%-24s%-22s\n", header, value);
+    }
+}
index d9efe80..02e153c 100644 (file)
@@ -22,19 +22,20 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.nio.file.Path;
 import java.time.Duration;
-import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.FileEntity;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
 import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
@@ -62,9 +63,11 @@ public class DataRouterPublisher {
 
     private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
     private final AppConfig datafileAppConfig;
+    private final Counters counters;
 
-    public DataRouterPublisher(AppConfig datafileAppConfig) {
+    public DataRouterPublisher(AppConfig datafileAppConfig, Counters counters) {
         this.datafileAppConfig = datafileAppConfig;
+        this.counters = counters;
     }
 
     /**
@@ -98,8 +101,10 @@ public class DataRouterPublisher {
             HttpResponse response =
                     dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
             logger.trace("{}", response);
+            counters.incTotalPublishedFiles();
             return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
         } catch (Exception e) {
+            counters.incNoOfFailedPublishAttempts();
             logger.warn("Publishing file {} to DR unsuccessful.", publishInfo.getName(), e);
             return Mono.error(e);
         }
@@ -121,10 +126,9 @@ public class DataRouterPublisher {
     }
 
     private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException {
-        Path fileLocation = publishInfo.getInternalLocation();
-        try (InputStream fileInputStream = createInputStream(fileLocation)) {
-            put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
-        }
+        File file = createInputFile(publishInfo.getInternalLocation());
+        FileEntity entity = new FileEntity(file, ContentType.DEFAULT_BINARY);
+        put.setEntity(entity);
     }
 
     private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response,
@@ -140,9 +144,9 @@ public class DataRouterPublisher {
         }
     }
 
-    InputStream createInputStream(Path filePath) throws IOException {
+    File createInputFile(Path filePath) throws IOException {
         FileSystemResource realResource = new FileSystemResource(filePath);
-        return realResource.getInputStream();
+        return realResource.getFile();
     }
 
     PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException {
index aeacaff..311f752 100644 (file)
@@ -24,9 +24,11 @@ import java.util.Optional;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
 import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
@@ -45,14 +47,16 @@ public class FileCollector {
 
     private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
     private final AppConfig datafileAppConfig;
+    private final Counters counters;
 
     /**
      * Constructor.
      *
      * @param datafileAppConfig application configuration
      */
-    public FileCollector(AppConfig datafileAppConfig) {
+    public FileCollector(AppConfig datafileAppConfig, Counters counters) {
         this.datafileAppConfig = datafileAppConfig;
+        this.counters = counters;
     }
 
     /**
@@ -97,14 +101,16 @@ public class FileCollector {
             currentClient.open();
             localFile.getParent().toFile().mkdir(); // Create parent directories
             currentClient.collectFile(remoteFile, localFile);
+            counters.incNoOfCollectedFiles();
             return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
         } catch (DatafileTaskException e) {
             logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
                     e.toString());
-            if (e.isRetryable()) {
-                return Mono.error(e);
-            } else {
+            counters.incNoOfFailedFtpAttempts();
+            if (e instanceof NonRetryableDatafileTaskException) {
                 return Mono.just(Optional.empty()); // Give up
+            } else {
+                return Mono.error(e);
             }
         } catch (Exception throwable) {
             logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
index 99b2d91..300ca60 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -41,8 +42,8 @@ import reactor.core.scheduler.Schedulers;
 
 
 /**
- * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
- * files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
  */
 @Component
 public class ScheduledTasks {
@@ -57,11 +58,12 @@ public class ScheduledTasks {
     private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
 
     private final AppConfig applicationConfiguration;
-    private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+    private final AtomicInteger currentNumberOfTasks;
     private final AtomicInteger threadPoolQueueSize = new AtomicInteger();
-    private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger();
+    private final AtomicInteger currentNumberOfSubscriptions;
     private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
     PublishedFileCache publishedFilesCache = new PublishedFileCache();
+    private Counters counters = new Counters();
 
     /**
      * Constructor for task registration in Datafile Workflow.
@@ -71,6 +73,8 @@ public class ScheduledTasks {
     @Autowired
     public ScheduledTasks(AppConfig applicationConfiguration) {
         this.applicationConfiguration = applicationConfiguration;
+        this.currentNumberOfTasks = counters.getCurrentNumberOfTasks();
+        this.currentNumberOfSubscriptions = counters.getCurrentNumberOfSubscriptions();
     }
 
     /**
@@ -112,6 +116,7 @@ public class ScheduledTasks {
     Flux<FilePublishInformation> createMainTask(Map<String, String> context) {
         return fetchMoreFileReadyMessages() //
                 .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) //
+                .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
                 .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
                 .runOn(scheduler) //
                 .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
@@ -148,32 +153,21 @@ public class ScheduledTasks {
         return new PublishedChecker(applicationConfiguration);
     }
 
-    public int getCurrentNumberOfTasks() {
-        return currentNumberOfTasks.get();
+    public Counters getCounters() {
+        return this.counters;
     }
 
-    public int publishedFilesCacheSize() {
-        return publishedFilesCache.size();
-    }
-
-    public int getCurrentNumberOfSubscriptions() {
-        return currentNumberOfSubscriptions.get();
-    }
-
-    public int getThreadPoolQueueSize() {
-        return this.threadPoolQueueSize.get();
-    }
 
     protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
         return new DMaaPMessageConsumer(this.applicationConfiguration);
     }
 
     protected FileCollector createFileCollector() {
-        return new FileCollector(applicationConfiguration);
+        return new FileCollector(applicationConfiguration, counters);
     }
 
     protected DataRouterPublisher createDataRouterPublisher() {
-        return new DataRouterPublisher(applicationConfiguration);
+        return new DataRouterPublisher(applicationConfiguration, counters);
     }
 
     private static void onComplete(Map<String, String> contextMap) {
@@ -181,6 +175,22 @@ public class ScheduledTasks {
         logger.trace("Datafile tasks have been completed");
     }
 
+    int publishedFilesCacheSize() {
+        return publishedFilesCache.size();
+    }
+
+    int getCurrentNumberOfTasks() {
+        return this.currentNumberOfTasks.get();
+    }
+
+    int getCurrentNumberOfSubscriptions() {
+        return this.currentNumberOfSubscriptions.get();
+    }
+
+    int getThreadPoolQueueSize() {
+        return this.threadPoolQueueSize.get();
+    }
+
     private static synchronized void onSuccess(FilePublishInformation publishInfo) {
         MDC.setContextMap(publishInfo.getContext());
         logger.info("Datafile file published {}", publishInfo.getInternalLocation());
@@ -239,6 +249,7 @@ public class ScheduledTasks {
         deleteFile(localFilePath, fileData.context);
         publishedFilesCache.remove(localFilePath);
         currentNumberOfTasks.decrementAndGet();
+        counters.incNoOfFailedFtp();
         return Mono.empty();
     }
 
@@ -257,6 +268,7 @@ public class ScheduledTasks {
         deleteFile(internalFileName, publishInfo.getContext());
         publishedFilesCache.remove(internalFileName);
         currentNumberOfTasks.decrementAndGet();
+        counters.incNoOfFailedPublish();
         return Mono.empty();
     }
 
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -31,7 +32,8 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.controllers.HeartbeatController;
+import org.onap.dcaegen2.collectors.datafile.controllers.StatusController;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
@@ -40,31 +42,48 @@ import org.springframework.http.HttpHeaders;
 import org.springframework.http.ResponseEntity;
 import reactor.core.publisher.Mono;
 
-public class HeartbeatControllerTest {
+public class StatusControllerTest {
     @Test
     public void heartbeat_success() {
         ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
-        when(scheduledTasksMock.getCurrentNumberOfTasks()).thenReturn(10);
-        when(scheduledTasksMock.publishedFilesCacheSize()).thenReturn(20);
 
         HttpHeaders httpHeaders = new HttpHeaders();
 
-        HeartbeatController controllerUnderTest = new HeartbeatController(scheduledTasksMock);
+        StatusController controllerUnderTest = new StatusController(scheduledTasksMock);
 
-        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(HeartbeatController.class);
+        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(StatusController.class);
         Mono<ResponseEntity<String>> result = controllerUnderTest.heartbeat(httpHeaders);
 
         validateLogging(logAppender);
 
         String body = result.block().getBody();
-        assertTrue(body.startsWith("I'm living! Status: "));
-        assertTrue(body.contains("numberOfFileCollectionTasks=10"));
-        assertTrue(body.contains("fileCacheSize=20"));
+        assertTrue(body.startsWith("I'm living!"));
 
         assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
         assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID)));
     }
 
+
+    @Test
+    public void status() {
+        ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
+        Counters counters = new Counters();
+        doReturn(counters).when(scheduledTasksMock).getCounters();
+
+        HttpHeaders httpHeaders = new HttpHeaders();
+
+        StatusController controllerUnderTest = new StatusController(scheduledTasksMock);
+
+        Mono<ResponseEntity<String>> result = controllerUnderTest.status(httpHeaders);
+
+        String body = result.block().getBody();
+        System.out.println(body);
+
+        assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
+        assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID)));
+    }
+
+
     private void validateLogging(ListAppender<ILoggingEvent> logAppender) {
         assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY");
         assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID"));
index e018256..f4e814f 100644 (file)
@@ -205,7 +205,8 @@ public class FtpsClientTest {
         doReturn(false).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
 
         assertThatThrownBy(() -> clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
-            .hasMessage("Could not retrieve file /dir/sample.txt");
+            .hasMessageContaining(REMOTE_FILE_PATH)
+            .hasMessageContaining("No retry");
 
         verifyFtpsClientMock_openOk();
         verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any());
index cb3735b..693806c 100644 (file)
@@ -121,8 +121,7 @@ public class SftpClientTest {
         doReturn(jschMock).when(sftpClientSpy).createJsch();
         when(jschMock.getSession(anyString(), anyString(), anyInt())).thenThrow(new JSchException("Failed"));
 
-        assertThatThrownBy(() -> sftpClientSpy.open())
-                .hasMessageStartingWith("Could not open Sftp client. com.jcraft.jsch.JSchException: Failed");
+        assertThatThrownBy(() -> sftpClientSpy.open()).hasMessageStartingWith("Could not open Sftp client.");
     }
 
     @SuppressWarnings("resource")
@@ -161,8 +160,9 @@ public class SftpClientTest {
 
             assertThatThrownBy(() -> sftpClient.collectFile("remoteFile", Paths.get("localFile")))
                     .isInstanceOf(DatafileTaskException.class)
-                    .hasMessageStartingWith("Unable to get file from xNF. Data: FileServerData{serverAddress=" + HOST
-                            + ", " + "userId=" + USERNAME + ", password=####, port=" + SFTP_PORT);
+                    .hasMessageStartingWith("Unable to get file from xNF. No retry attempts will be done")
+                    .hasMessageContaining("Data: FileServerData{serverAddress=" + HOST + ", " + "userId=" + USERNAME
+                            + ", password=####, port=" + SFTP_PORT);
         }
     }
 
index 4da22cb..6a9dccd 100644 (file)
@@ -27,10 +27,10 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 import java.net.URI;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -50,6 +50,7 @@ import org.mockito.ArgumentCaptor;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
@@ -86,7 +87,6 @@ class DataRouterPublisherTest {
     private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
     private static final String PUBLISH_TOPIC = "publish";
     private static final String FEED_ID = "1";
-    private static final String FILE_CONTENT = "Just a string.";
 
     private static FilePublishInformation filePublishInformation;
     private static DmaapProducerHttpClient httpClientMock;
@@ -120,7 +120,7 @@ class DataRouterPublisherTest {
                 .changeIdentifier(CHANGE_IDENTIFIER) //
                 .build(); //
         appConfig = mock(AppConfig.class);
-        publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
+        publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig, new Counters()));
     }
 
     @Test
@@ -236,8 +236,8 @@ class DataRouterPublisherTest {
         when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
         when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses);
 
-        InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
-        doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", PM_FILE_NAME));
+        File file = File.createTempFile("DFC", "tmp");
+        doReturn(file).when(publisherTaskUnderTestSpy).createInputFile(Paths.get("target", PM_FILE_NAME));
     }
 
     private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) {
index 299a023..99e92bd 100644 (file)
@@ -37,9 +37,11 @@ import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
@@ -93,6 +95,7 @@ public class FileCollectorTest {
     private SftpClient sftpClientMock = mock(SftpClient.class);
     private final Map<String, String> contextMap = new HashMap<>();
 
+    private final Counters counters = new Counters();
 
     private MessageMetaData createMessageMetaData() {
         return ImmutableMessageMetaData.builder() //
@@ -133,7 +136,7 @@ public class FileCollectorTest {
                 .compression(GZIP_COMPRESSION) //
                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
                 .fileFormatVersion(FILE_FORMAT_VERSION) //
-                .context(new HashMap<String,String>()) //
+                .context(new HashMap<String, String>()) //
                 .changeIdentifier(CHANGE_IDENTIFIER) //
                 .build();
     }
@@ -152,7 +155,7 @@ public class FileCollectorTest {
 
     @Test
     public void whenFtpesFile_returnCorrectResponse() throws Exception {
-        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters));
         doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
 
         FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
@@ -173,7 +176,7 @@ public class FileCollectorTest {
 
     @Test
     public void whenSftpFile_returnCorrectResponse() throws Exception {
-        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters));
         doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any());
 
 
@@ -201,7 +204,7 @@ public class FileCollectorTest {
 
     @Test
     public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
-        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters));
         doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
 
         FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS);
@@ -217,12 +220,11 @@ public class FileCollectorTest {
 
     @Test
     public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception {
-        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, new Counters()));
         doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
 
-        final boolean retry = false;
         FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS);
-        doThrow(new DatafileTaskException("Unable to collect file.", retry)).when(ftpsClientMock)
+        doThrow(new NonRetryableDatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
                 .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 
         StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
@@ -234,7 +236,7 @@ public class FileCollectorTest {
 
     @Test
     public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception {
-        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters));
         doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
         doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
                 .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);