Fix, skip FTP retry in certain cases 55/87955/4 1.1.3 4.0.0-ONAP
authorPatrikBuhr <patrik.buhr@est.tech>
Mon, 20 May 2019 09:19:31 +0000 (09:19 +0000)
committerPatrikBuhr <patrik.buhr@est.tech>
Mon, 20 May 2019 09:19:31 +0000 (09:19 +0000)
In certain conditions there is no reason to retry fetching files.
For instance when the file is removed in the PNF or when
the password/certificate is wrong.

When the DFC is started there are sometimes
queued VES events that referes to removed files which in turn results
in that the DFC will retry fetching these files in vain.
The DFC house keeps its number of concurrents tasks to not exeed quotas
for memory,open file descriptors etc.
As more threads are occupied with retrying, the fewer threads can do
their intended work, which decreases the throughput.

Testing has showed that already when the number of PNFs are 10, the throughput
is radically decreased (and the problem is then escalating).

Change-Id: I9fd57b6b5209bae1ffb4191f5274b591c346b79a
Issue-ID: DCAEGEN2-1508
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
datafile-app-server/dpo/blueprints/k8s-datafile.yaml
datafile-app-server/pom.xml
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
pom.xml
version.properties

index 9ce0559..534bc1f 100644 (file)
@@ -70,7 +70,7 @@ inputs:
     default: "dradmin"
   tag_version:
     type: string
-    default: "nexus3.onap.org:10001/onap/org.onap.dcaegen2.collectors.datafile.datafile-app-server:1.1.2"
+    default: "nexus3.onap.org:10001/onap/org.onap.dcaegen2.collectors.datafile.datafile-app-server:1.1.3"
   replicas:
     type: integer
     description: number of instances
index c0c1da8..4da6d2d 100644 (file)
@@ -24,7 +24,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.collectors</groupId>
     <artifactId>datafile</artifactId>
-    <version>1.1.2-SNAPSHOT</version>
+    <version>1.1.3-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
index 4230800..38f74ed 100644 (file)
@@ -21,12 +21,29 @@ package org.onap.dcaegen2.collectors.datafile.exceptions;
 public class DatafileTaskException extends Exception {
 
     private static final long serialVersionUID = 1L;
+    private final boolean isRetryable;
 
     public DatafileTaskException(String message) {
         super(message);
+        isRetryable = true;
+    }
+
+    public DatafileTaskException(String message, boolean retry) {
+        super(message);
+        isRetryable = retry;
     }
 
     public DatafileTaskException(String message, Exception originalException) {
         super(message, originalException);
+        isRetryable = true;
+    }
+
+    public DatafileTaskException(String message, Exception originalException, boolean retry) {
+        super(message, originalException);
+        isRetryable = retry;
+    }
+
+    public boolean isRetryable() {
+        return this.isRetryable;
     }
 }
index b8488f3..c78ae3a 100644 (file)
@@ -28,9 +28,11 @@ import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.CertificateException;
 import java.util.Optional;
+
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
+
 import org.apache.commons.net.ftp.FTP;
 import org.apache.commons.net.ftp.FTPReply;
 import org.apache.commons.net.ftp.FTPSClient;
@@ -117,7 +119,8 @@ public class FtpsClient implements FileCollectClient {
         try (OutputStream output = createOutputStream(localFileName)) {
             logger.trace("begin to retrieve from xNF.");
             if (!realFtpsClient.retrieveFile(remoteFileName, output)) {
-                throw new DatafileTaskException("Could not retrieve file " + remoteFileName);
+                final boolean retry = false; // Skip retrying for all problems except IOException
+                throw new DatafileTaskException("Could not retrieve file " + remoteFileName, retry);
             }
         } catch (IOException e) {
             throw new DatafileTaskException("Could not fetch file: " + e, e);
@@ -173,7 +176,7 @@ public class FtpsClient implements FileCollectClient {
 
     protected OutputStream createOutputStream(Path localFileName) throws IOException {
         File localFile = localFileName.toFile();
-        if (localFile.createNewFile()) {
+        if (!localFile.createNewFile()) {
             logger.warn("Local file {} already created", localFileName);
         }
         OutputStream output = new FileOutputStream(localFile);
index 4006859..333be92 100644 (file)
@@ -21,8 +21,11 @@ import com.jcraft.jsch.ChannelSftp;
 import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
+import com.jcraft.jsch.SftpException;
+
 import java.nio.file.Path;
 import java.util.Optional;
+
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,8 +56,9 @@ public class SftpClient implements FileCollectClient {
         try {
             sftpChannel.get(remoteFile, localFile.toString());
             logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
-        } catch (Exception e) {
-            throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e);
+        } catch (SftpException e) {
+            boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE &&  e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED;
+            throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry);
         }
 
         logger.trace("collectFile OK");
@@ -81,7 +85,8 @@ public class SftpClient implements FileCollectClient {
                 sftpChannel = getChannel(session);
             }
         } catch (JSchException e) {
-            throw new DatafileTaskException("Could not open Sftp client" + e, e);
+            boolean retry = !e.getMessage().contains("Auth fail");
+            throw new DatafileTaskException("Could not open Sftp client" + e, e, retry);
         }
     }
 
index 96237e4..0a6b669 100644 (file)
@@ -24,6 +24,7 @@ import java.net.URI;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Optional;
+
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
index 6f3f6b7..0c62795 100644 (file)
@@ -20,6 +20,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.Map;
+import java.util.Optional;
 
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
@@ -74,10 +75,20 @@ public class FileCollector {
         return Mono.just(fileData) //
                 .cache() //
                 .flatMap(fd -> collectFile(fileData, contextMap)) //
-                .retryBackoff(numRetries, firstBackoff);
+                .retryBackoff(numRetries, firstBackoff) //
+                .flatMap(this::checkCollectedFile);
     }
 
-    private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> context) {
+    private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
+        if (info.isPresent()) {
+            return Mono.just(info.get());
+        } else {
+            // If there is no info, the file is not retrievable
+            return Mono.error(new DatafileTaskException("Non retryable file transfer failure"));
+        }
+    }
+
+    private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) {
         MDC.setContextMap(context);
         logger.trace("starting to collectFile {}", fileData.name());
 
@@ -88,11 +99,19 @@ public class FileCollector {
             currentClient.open();
             localFile.getParent().toFile().mkdir(); // Create parent directories
             currentClient.collectFile(remoteFile, localFile);
-            return Mono.just(getFilePublishInformation(fileData, localFile, context));
-        } catch (Exception throwable) {
+            return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
+        } catch (DatafileTaskException e) {
             logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
+                    e.toString());
+            if (e.isRetryable()) {
+                return Mono.error(e);
+            } else {
+                return Mono.just(Optional.empty()); // Give up
+            }
+        } catch (Exception throwable) {
+            logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
                     throwable.toString());
-            return Mono.error(throwable);
+            return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
         }
     }
 
@@ -107,7 +126,8 @@ public class FileCollector {
         }
     }
 
-    private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map<String, String> context) {
+    private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
+            Map<String, String> context) {
         String location = fileData.location();
         MessageMetaData metaData = fileData.messageMetaData();
         return ImmutableFilePublishInformation.builder() //
index 5cc894c..26d47b5 100644 (file)
@@ -202,8 +202,9 @@ public class ScheduledTasks {
 
     private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) {
         MDC.setContextMap(fileData.context);
-        return createFileCollector().collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES,
-                FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, fileData.context)
+        return createFileCollector() //
+                .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT,
+                        fileData.context) //
                 .onErrorResume(exception -> handleFetchFileFailure(fileData));
     }
 
index 1a9d669..cad3486 100644 (file)
@@ -213,6 +213,23 @@ public class FileCollectorTest {
         verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
     }
 
+    @Test
+    public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception {
+        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+        doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
+
+        final boolean retry = false;
+        FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS);
+        doThrow(new DatafileTaskException("Unable to collect file.", retry)).when(ftpsClientMock)
+                .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+
+        StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
+                .expectErrorMessage("Non retryable file transfer failure") //
+                .verify();
+
+        verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+    }
+
     @Test
     public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception {
         FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
diff --git a/pom.xml b/pom.xml
index 20b7b6e..9caf5ad 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
 
   <groupId>org.onap.dcaegen2.collectors</groupId>
   <artifactId>datafile</artifactId>
-  <version>1.1.2-SNAPSHOT</version>
+  <version>1.1.3-SNAPSHOT</version>
 
   <name>dcaegen2-collectors.datafile</name>
   <description>datafile collector</description>
index 952a72c..bcf3556 100644 (file)
@@ -1,6 +1,6 @@
 major=1\r
 minor=1\r
-patch=2\r
+patch=3\r
 base_version=${major}.${minor}.${patch}\r
 release_version=${base_version}\r
 snapshot_version=${base_version}-SNAPSHOT\r