In certain conditions there is no reason to retry fetching files.
For instance when the file is removed in the PNF or when
the password/certificate is wrong.
When the DFC is started there are sometimes
queued VES events that referes to removed files which in turn results
in that the DFC will retry fetching these files in vain.
The DFC house keeps its number of concurrents tasks to not exeed quotas
for memory,open file descriptors etc.
As more threads are occupied with retrying, the fewer threads can do
their intended work, which decreases the throughput.
Testing has showed that already when the number of PNFs are 10, the throughput
is radically decreased (and the problem is then escalating).
Change-Id: I9fd57b6b5209bae1ffb4191f5274b591c346b79a
Issue-ID: DCAEGEN2-1508
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
default: "dradmin"
tag_version:
type: string
- default: "nexus3.onap.org:10001/onap/org.onap.dcaegen2.collectors.datafile.datafile-app-server:1.1.2"
+ default: "nexus3.onap.org:10001/onap/org.onap.dcaegen2.collectors.datafile.datafile-app-server:1.1.3"
replicas:
type: integer
description: number of instances
<parent>
<groupId>org.onap.dcaegen2.collectors</groupId>
<artifactId>datafile</artifactId>
- <version>1.1.2-SNAPSHOT</version>
+ <version>1.1.3-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.collectors.datafile</groupId>
public class DatafileTaskException extends Exception {
private static final long serialVersionUID = 1L;
+ private final boolean isRetryable;
public DatafileTaskException(String message) {
super(message);
+ isRetryable = true;
+ }
+
+ public DatafileTaskException(String message, boolean retry) {
+ super(message);
+ isRetryable = retry;
}
public DatafileTaskException(String message, Exception originalException) {
super(message, originalException);
+ isRetryable = true;
+ }
+
+ public DatafileTaskException(String message, Exception originalException, boolean retry) {
+ super(message, originalException);
+ isRetryable = retry;
+ }
+
+ public boolean isRetryable() {
+ return this.isRetryable;
}
}
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.Optional;
+
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
+
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.net.ftp.FTPSClient;
try (OutputStream output = createOutputStream(localFileName)) {
logger.trace("begin to retrieve from xNF.");
if (!realFtpsClient.retrieveFile(remoteFileName, output)) {
- throw new DatafileTaskException("Could not retrieve file " + remoteFileName);
+ final boolean retry = false; // Skip retrying for all problems except IOException
+ throw new DatafileTaskException("Could not retrieve file " + remoteFileName, retry);
}
} catch (IOException e) {
throw new DatafileTaskException("Could not fetch file: " + e, e);
protected OutputStream createOutputStream(Path localFileName) throws IOException {
File localFile = localFileName.toFile();
- if (localFile.createNewFile()) {
+ if (!localFile.createNewFile()) {
logger.warn("Local file {} already created", localFileName);
}
OutputStream output = new FileOutputStream(localFile);
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
+import com.jcraft.jsch.SftpException;
+
import java.nio.file.Path;
import java.util.Optional;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
try {
sftpChannel.get(remoteFile, localFile.toString());
logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
- } catch (Exception e) {
- throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e);
+ } catch (SftpException e) {
+ boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED;
+ throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry);
}
logger.trace("collectFile OK");
sftpChannel = getChannel(session);
}
} catch (JSchException e) {
- throw new DatafileTaskException("Could not open Sftp client" + e, e);
+ boolean retry = !e.getMessage().contains("Auth fail");
+ throw new DatafileTaskException("Could not open Sftp client" + e, e, retry);
}
}
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
+import java.util.Optional;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
return Mono.just(fileData) //
.cache() //
.flatMap(fd -> collectFile(fileData, contextMap)) //
- .retryBackoff(numRetries, firstBackoff);
+ .retryBackoff(numRetries, firstBackoff) //
+ .flatMap(this::checkCollectedFile);
}
- private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> context) {
+ private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
+ if (info.isPresent()) {
+ return Mono.just(info.get());
+ } else {
+ // If there is no info, the file is not retrievable
+ return Mono.error(new DatafileTaskException("Non retryable file transfer failure"));
+ }
+ }
+
+ private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("starting to collectFile {}", fileData.name());
currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getFilePublishInformation(fileData, localFile, context));
- } catch (Exception throwable) {
+ return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
+ } catch (DatafileTaskException e) {
logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
+ e.toString());
+ if (e.isRetryable()) {
+ return Mono.error(e);
+ } else {
+ return Mono.just(Optional.empty()); // Give up
+ }
+ } catch (Exception throwable) {
+ logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
throwable.toString());
- return Mono.error(throwable);
+ return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
}
}
}
}
- private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map<String, String> context) {
+ private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
+ Map<String, String> context) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
return ImmutableFilePublishInformation.builder() //
private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) {
MDC.setContextMap(fileData.context);
- return createFileCollector().collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES,
- FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, fileData.context)
+ return createFileCollector() //
+ .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT,
+ fileData.context) //
.onErrorResume(exception -> handleFetchFileFailure(fileData));
}
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
+ @Test
+ public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception {
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
+
+ final boolean retry = false;
+ FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS);
+ doThrow(new DatafileTaskException("Unable to collect file.", retry)).when(ftpsClientMock)
+ .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+
+ StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
+ .expectErrorMessage("Non retryable file transfer failure") //
+ .verify();
+
+ verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ }
+
@Test
public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception {
FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
<groupId>org.onap.dcaegen2.collectors</groupId>
<artifactId>datafile</artifactId>
- <version>1.1.2-SNAPSHOT</version>
+ <version>1.1.3-SNAPSHOT</version>
<name>dcaegen2-collectors.datafile</name>
<description>datafile collector</description>
major=1\r
minor=1\r
-patch=2\r
+patch=3\r
base_version=${major}.${minor}.${patch}\r
release_version=${base_version}\r
snapshot_version=${base_version}-SNAPSHOT\r