Fixed some sonar issues 43/78743/5
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 22 Feb 2019 13:51:34 +0000 (13:51 +0000)
committerPatrikBuhr <patrik.buhr@est.tech>
Fri, 22 Feb 2019 13:51:34 +0000 (13:51 +0000)
Changed some Flux to Mono.

Removed some obfuscating wrappers.

Issue-ID: DCAEGEN2-1118
Change-Id: I76dcaea7c69608cf404389fad93f7539f735aad2
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
17 files changed:
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java [deleted file]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java [deleted file]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java [deleted file]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java [deleted file]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java [deleted file]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java [deleted file]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java

index 3c606de..a8f79ea 100644 (file)
@@ -123,12 +123,11 @@ public class JsonMessageParser {
     }
 
     private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
-        return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
-                ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
-                : transformMessages(monoJsonP));
+        return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
+                : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
     }
 
-    private Flux<FileReadyMessage> transformMessages(JsonObject message) {
+    private Mono<FileReadyMessage> transformMessages(JsonObject message) {
         Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
         if (optionalMessageMetaData.isPresent()) {
             JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
@@ -138,22 +137,22 @@ public class JsonMessageParser {
                 if (!allFileDataFromJson.isEmpty()) {
                     MessageMetaData messageMetaData = optionalMessageMetaData.get();
                     // @formatter:off
-                    return Flux.just(ImmutableFileReadyMessage.builder()
+                    return Mono.just(ImmutableFileReadyMessage.builder()
                             .pnfName(messageMetaData.sourceName())
                             .messageMetaData(messageMetaData)
                             .files(allFileDataFromJson)
                             .build());
                     // @formatter:on
                 } else {
-                    return Flux.empty();
+                    return Mono.empty();
                 }
             }
 
             logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
-            return Flux.empty();
+            return Mono.empty();
         }
         logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
-        return Flux.empty();
+        return Mono.empty();
     }
 
     private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
index 338c832..4c0dcce 100644 (file)
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpStatus;
 
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -50,27 +50,27 @@ public class DataRouterPublisher {
      * @param firstBackoffTimeout the time to delay the first retry
      * @return the HTTP response status as a string
      */
-    public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+    public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
         logger.trace("Method called with arg {}", model);
         DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
 
         //@formatter:off
-        return Flux.just(model)
-                .cache(1)
+        return Mono.just(model)
+                .cache()
                 .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
                 .flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
                 .retryBackoff(numRetries, firstBackoff);
         //@formatter:on
     }
 
-    private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
+    private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
 
         if (HttpUtils.isSuccessfulResponseCode(response.value())) {
             logger.trace("Publish to DR successful!");
-            return Flux.just(model);
+            return Mono.just(model);
         } else {
-            logger.warn("Publish to DR unsuccessful, response code: " + response);
-            return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response));
+            logger.warn("Publish to DR unsuccessful, response code: {}", response);
+            return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response));
         }
     }
 
index f22c7bf..37b7a55 100644 (file)
@@ -98,7 +98,7 @@ public class ScheduledTasks {
             .doOnNext(fileData -> taskCounter.incrementAndGet())
             .flatMap(this::collectFileFromXnf)
             .flatMap(this::publishToDataRouter)
-            .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation())))
+            .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation())))
             .doOnNext(model -> taskCounter.decrementAndGet())
             .sequential()
             .subscribe(this::onSuccess, this::onError, this::onComplete);
@@ -109,8 +109,8 @@ public class ScheduledTasks {
         logger.info("Datafile tasks have been completed");
     }
 
-    private void onSuccess(Path localFile) {
-        logger.info("Datafile consumed tasks." + localFile);
+    private void onSuccess(ConsumerDmaapModel model) {
+        logger.info("Datafile consumed tasks {}", model.getInternalLocation());
     }
 
     private void onError(Throwable throwable) {
@@ -138,18 +138,19 @@ public class ScheduledTasks {
 
         return fileCollect.collectorTask
                 .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
-                .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception));
+                .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData));
     }
 
-    private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) {
-        logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage());
-        deleteFile(fileData.getLocalFileName());
-        alreadyPublishedFiles.remove(fileData.getLocalFileName());
+    private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) {
+        Path localFileName = fileData.getLocalFileName();
+        logger.error("File fetching failed: {}", localFileName);
+        deleteFile(localFileName);
+        alreadyPublishedFiles.remove(localFileName);
         taskCounter.decrementAndGet();
         return Mono.empty();
     }
 
-    private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+    private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
         final long maxNumberOfRetries = 3;
         final Duration initialRetryTimeout = Duration.ofSeconds(5);
 
@@ -160,13 +161,13 @@ public class ScheduledTasks {
 
     }
 
-    private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+    private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
         logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
         Path internalFileName = Paths.get(model.getInternalLocation());
         deleteFile(internalFileName);
         alreadyPublishedFiles.remove(internalFileName);
         taskCounter.decrementAndGet();
-        return Flux.empty();
+        return Mono.empty();
     }
 
     private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
@@ -179,7 +180,7 @@ public class ScheduledTasks {
         final DMaaPMessageConsumerTask messageConsumerTask =
                 new DMaaPMessageConsumerTask(this.applicationConfiguration);
         return messageConsumerTask.execute()
-                .onErrorResume(exception -> handleConsumeMessageFailure(exception));
+                .onErrorResume(this::handleConsumeMessageFailure);
     }
 
     private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
@@ -187,13 +188,12 @@ public class ScheduledTasks {
         return Flux.empty();
     }
 
-    private Flux<Path> deleteFile(Path localFile) {
+    private void deleteFile(Path localFile) {
         logger.trace("Deleting file: {}", localFile);
         try {
             Files.delete(localFile);
         } catch (Exception e) {
             logger.warn("Could not delete file: {}, {}", localFile, e);
         }
-        return Flux.just(localFile);
     }
 }
index 73511d1..d2240f1 100644 (file)
@@ -37,7 +37,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
 import org.springframework.http.HttpStatus;
 
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
 /**
@@ -96,7 +96,7 @@ class DataRouterPublisherTest {
 
     @Test
     public void whenPassedObjectFits_ReturnsCorrectStatus() {
-        prepareMocksForTests(Flux.just(HttpStatus.OK));
+        prepareMocksForTests(Mono.just(HttpStatus.OK));
 
         StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
                 .expectNext(consumerDmaapModel).verifyComplete();
@@ -107,7 +107,7 @@ class DataRouterPublisherTest {
 
     @Test
     public void whenPassedObjectFits_firstFailsThenSucceeds() {
-        prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK));
+        prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
 
         StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
                 .expectNext(consumerDmaapModel).verifyComplete();
@@ -118,7 +118,7 @@ class DataRouterPublisherTest {
 
     @Test
     public void whenPassedObjectFits_firstFailsThenFails() {
-        prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY));
+        prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
 
         StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
                 .expectErrorMessage("Retries exhausted: 1/1").verify();
@@ -128,7 +128,7 @@ class DataRouterPublisherTest {
     }
 
     @SafeVarargs
-    final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) {
+    final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
         dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
         when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
                 nextHttpResponses);
index 10c5b16..804b46e 100644 (file)
@@ -67,7 +67,12 @@ public class XnfCollectorTaskImplTest {
     private static final String PWD = "pwd";
     private static final String FTPES_LOCATION =
             FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+    private static final String FTPES_LOCATION_NO_PORT =
+            FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
     private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+    private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS +  REMOTE_FILE_LOCATION;
+
     private static final String GZIP_COMPRESSION = "gzip";
     private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
     private static final String FILE_FORMAT_VERSION = "V10";
@@ -100,11 +105,11 @@ public class XnfCollectorTaskImplTest {
         // @formatter:on
     }
 
-    private FileData createFileData() {
+    private FileData createFileData(String location) {
         // @formatter:off
         return  ImmutableFileData.builder()
             .name(PM_FILE_NAME)
-            .location(FTPES_LOCATION)
+            .location(location)
             .compression(GZIP_COMPRESSION)
             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
             .fileFormatVersion(FILE_FORMAT_VERSION)
@@ -113,7 +118,7 @@ public class XnfCollectorTaskImplTest {
         // @formatter:on
     }
 
-    private ConsumerDmaapModel createExpectedConsumerDmaapModel() {
+    private ConsumerDmaapModel createExpectedConsumerDmaapModel(String location) {
         // @formatter:off
         return ImmutableConsumerDmaapModel.builder()
             .productName(PRODUCT_NAME)
@@ -123,7 +128,7 @@ public class XnfCollectorTaskImplTest {
             .startEpochMicrosec(START_EPOCH_MICROSEC)
             .timeZoneOffset(TIME_ZONE_OFFSET)
             .name(PM_FILE_NAME)
-            .location(FTPES_LOCATION)
+            .location(location)
             .internalLocation(LOCAL_FILE_LOCATION.toString())
             .compression(GZIP_COMPRESSION)
             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
@@ -146,9 +151,9 @@ public class XnfCollectorTaskImplTest {
         FileCollector collectorUndetTest =
                 new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
 
-        FileData fileData = createFileData();
+        FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
 
-        ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
+        ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
 
         StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
                 .expectNext(expectedConsumerDmaapModel).verifyComplete();
@@ -168,7 +173,7 @@ public class XnfCollectorTaskImplTest {
         // @formatter:off
         FileData fileData = ImmutableFileData.builder()
                 .name(PM_FILE_NAME)
-                .location(SFTP_LOCATION)
+                .location(SFTP_LOCATION_NO_PORT)
                 .compression(GZIP_COMPRESSION)
                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
                 .fileFormatVersion(FILE_FORMAT_VERSION)
@@ -183,7 +188,7 @@ public class XnfCollectorTaskImplTest {
                 .startEpochMicrosec(START_EPOCH_MICROSEC)
                 .timeZoneOffset(TIME_ZONE_OFFSET)
                 .name(PM_FILE_NAME)
-                .location(SFTP_LOCATION)
+                .location(SFTP_LOCATION_NO_PORT)
                 .internalLocation(LOCAL_FILE_LOCATION.toString())
                 .compression(GZIP_COMPRESSION)
                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
@@ -202,7 +207,7 @@ public class XnfCollectorTaskImplTest {
     public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
         FileCollector collectorUndetTest =
                 new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
-        FileData fileData = createFileData();
+        FileData fileData = createFileData(FTPES_LOCATION);
         doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
                 .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 
@@ -219,9 +224,9 @@ public class XnfCollectorTaskImplTest {
         doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
                 .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 
-        ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
+        ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
 
-        FileData fileData = createFileData();
+        FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
         StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
                 .expectNext(expectedConsumerDmaapModel).verifyComplete();
 
index ae1435c..442b766 100644 (file)
@@ -32,4 +32,8 @@ public class DatafileTaskException extends Exception {
     public DatafileTaskException(String message) {
         super(message);
     }
+
+    public DatafileTaskException(String message, Exception e) {
+        super(message + e);
+    }
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
deleted file mode 100644 (file)
index 29160c9..0000000
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.TrustManager;
-import org.apache.commons.net.ftp.FTPSClient;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-
-public class FTPSClientWrapper implements IFTPSClient {
-    private FTPSClient ftpsClient = new FTPSClient();
-
-    @Override
-    public void setNeedClientAuth(boolean isNeedClientAuth) {
-        ftpsClient.setNeedClientAuth(isNeedClientAuth);
-    }
-
-    @Override
-    public void setKeyManager(KeyManager keyManager) {
-        ftpsClient.setKeyManager(keyManager);
-    }
-
-    @Override
-    public void setTrustManager(TrustManager trustManager) {
-        ftpsClient.setTrustManager(trustManager);
-    }
-
-    @Override
-    public void connect(String hostName, int port) throws IOException {
-        ftpsClient.connect(hostName, port);
-    }
-
-    @Override
-    public boolean login(String username, String password) throws IOException {
-        return ftpsClient.login(username, password);
-    }
-
-    @Override
-    public boolean logout() throws IOException {
-        return ftpsClient.logout();
-    }
-
-    @Override
-    public int getReplyCode() {
-        return ftpsClient.getReplyCode();
-    }
-
-    @Override
-    public void disconnect() throws IOException {
-        ftpsClient.disconnect();
-    }
-
-    @Override
-    public void enterLocalPassiveMode() {
-        ftpsClient.enterLocalPassiveMode();
-    }
-
-    @Override
-    public void setFileType(int fileType) throws IOException {
-        ftpsClient.setFileType(fileType);
-    }
-
-    @Override
-    public void execPBSZ(int psbz) throws IOException {
-        ftpsClient.execPBSZ(psbz);
-    }
-
-    @Override
-    public void execPROT(String prot) throws IOException {
-        ftpsClient.execPROT(prot);
-    }
-
-    @Override
-    public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException {
-        try {
-            if (!ftpsClient.retrieveFile(remote, local)) {
-                throw new DatafileTaskException("could not retrieve file");
-            }
-        } catch (IOException e) {
-            throw new DatafileTaskException(e);
-        }
-    }
-
-    @Override
-    public void setTimeout(Integer t) {
-        this.ftpsClient.setDefaultTimeout(t);
-    }
-
-    @Override
-    public boolean isConnected() {
-        return ftpsClient.isConnected();
-    }
-
-    @Override
-    public void setBufferSize(int bufSize) {
-        ftpsClient.setBufferSize(bufSize);
-    }
-}
index f330b67..bedae43 100644 (file)
 package org.onap.dcaegen2.collectors.datafile.ftp;
 
 import java.nio.file.Path;
+
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 
 /**
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
+@FunctionalInterface
 public interface FileCollectClient {
     public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
 }
index 461b220..c3b7990 100644 (file)
@@ -16,6 +16,8 @@
 
 package org.onap.dcaegen2.collectors.datafile.ftp;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -27,12 +29,10 @@ import java.util.Optional;
 
 import org.apache.commons.net.ftp.FTP;
 import org.apache.commons.net.ftp.FTPReply;
+import org.apache.commons.net.ftp.FTPSClient;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
-import org.onap.dcaegen2.collectors.datafile.io.FileWrapper;
-import org.onap.dcaegen2.collectors.datafile.io.IFile;
 import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
 import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils;
 import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils.KeyManagerException;
 import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore;
@@ -55,13 +55,11 @@ public class FtpsClient implements FileCollectClient {
     private Path trustedCAPath;
     private String trustedCAPassword;
 
-    private IFTPSClient realFtpsClient = new FTPSClientWrapper();
+    private FTPSClient realFtpsClient = new FTPSClient();
     private IKeyManagerUtils keyManagerUtils = new KeyManagerUtilsWrapper();
     private IKeyStore keyStore;
     private ITrustManagerFactory trustManagerFactory;
-    private IFile localFile = new FileWrapper();
     private IFileSystemResource fileSystemResource = new FileSystemResourceWrapper();
-    private IOutputStream outputStream;
     private boolean keyManagerSet = false;
     private boolean trustManagerSet = false;
     private final FileServerData fileServerData;
@@ -83,7 +81,7 @@ public class FtpsClient implements FileCollectClient {
             getFileFromxNF(realFtpsClient, remoteFile, localFile);
         } catch (IOException e) {
             logger.trace("", e);
-            throw new DatafileTaskException("Could not open connection: " + e);
+            throw new DatafileTaskException("Could not open connection: ", e);
         } catch (KeyManagerException e) {
             logger.trace("", e);
             throw new DatafileTaskException(e);
@@ -93,7 +91,7 @@ public class FtpsClient implements FileCollectClient {
         logger.trace("collectFile fetched: {}", localFile);
     }
 
-    private void setUpKeyManager(IFTPSClient ftps) throws KeyManagerException {
+    private void setUpKeyManager(FTPSClient ftps) throws KeyManagerException {
         if (keyManagerSet) {
             logger.trace("keyManager already set!");
         } else {
@@ -104,7 +102,7 @@ public class FtpsClient implements FileCollectClient {
         logger.trace("complete setUpKeyManager");
     }
 
-    private void setUpTrustedCA(IFTPSClient ftps) throws DatafileTaskException {
+    private void setUpTrustedCA(FTPSClient ftps) throws DatafileTaskException {
         if (trustManagerSet) {
             logger.trace("trustManager already set!");
         } else {
@@ -130,7 +128,7 @@ public class FtpsClient implements FileCollectClient {
         return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
     }
 
-    private void setUpConnection(IFTPSClient ftps) throws DatafileTaskException, IOException {
+    private void setUpConnection(FTPSClient ftps) throws DatafileTaskException, IOException {
         if (!ftps.isConnected()) {
             ftps.connect(fileServerData.serverAddress(), getPort(fileServerData.port()));
             logger.trace("after ftp connect");
@@ -155,23 +153,26 @@ public class FtpsClient implements FileCollectClient {
         logger.trace("setUpConnection successfully!");
     }
 
-    private void getFileFromxNF(IFTPSClient ftps, String remoteFileName, Path localFileName)
-            throws IOException, DatafileTaskException {
+    private void getFileFromxNF(FTPSClient ftps, String remoteFileName, Path localFileName)
+            throws IOException {
         logger.trace("starting to getFile");
 
-        this.localFile.setPath(localFileName);
-        this.localFile.createNewFile();
-
-        OutputStream output = this.outputStream.getOutputStream(this.localFile.getFile());
+        File localFile = localFileName.toFile();
+        if (localFile.createNewFile()) {
+            logger.warn("Local file {} already created", localFileName);
+        }
+        OutputStream output = new FileOutputStream(localFile);
         logger.trace("begin to retrieve from xNF.");
-        ftps.retrieveFile(remoteFileName, output);
+        if (!ftps.retrieveFile(remoteFileName, output)) {
+            throw new IOException("Could not retrieve file");
+        }
         logger.trace("end retrieve from xNF.");
         output.close();
         logger.debug("File {} Download Successfull from xNF", localFileName);
     }
 
 
-    private void closeDownConnection(IFTPSClient ftps) {
+    private void closeDownConnection(FTPSClient ftps) {
         logger.trace("starting to closeDownConnection");
         if (ftps != null && ftps.isConnected()) {
             try {
@@ -220,7 +221,7 @@ public class FtpsClient implements FileCollectClient {
         return keyStore;
     }
 
-    void setFtpsClient(IFTPSClient ftpsClient) {
+    void setFtpsClient(FTPSClient ftpsClient) {
         this.realFtpsClient = ftpsClient;
     }
 
@@ -236,14 +237,6 @@ public class FtpsClient implements FileCollectClient {
         trustManagerFactory = tmf;
     }
 
-    void setFile(IFile file) {
-        localFile = file;
-    }
-
-    void setOutputStream(IOutputStream outputStream) {
-        this.outputStream = outputStream;
-    }
-
     void setFileSystemResource(IFileSystemResource fileSystemResource) {
         this.fileSystemResource = fileSystemResource;
     }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
deleted file mode 100644 (file)
index 3dcaa65..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.TrustManager;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-
-public interface IFTPSClient {
-    public void setNeedClientAuth(boolean isNeedClientAuth);
-
-    public void setKeyManager(KeyManager keyManager);
-
-    public void setTrustManager(TrustManager trustManager);
-
-    public void connect(String hostname, int port) throws IOException;
-
-    public boolean login(String username, String password) throws IOException;
-
-    public boolean logout() throws IOException;
-
-    public int getReplyCode();
-
-    public void setBufferSize(int bufSize);
-
-    public boolean isConnected();
-
-    public void disconnect() throws IOException;
-
-    public void enterLocalPassiveMode();
-
-    public void setFileType(int fileType) throws IOException;
-
-    public void execPBSZ(int newParam) throws IOException;
-
-    public void execPROT(String prot) throws IOException;
-
-    public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException;
-
-    void setTimeout(Integer t);
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
deleted file mode 100644 (file)
index 203a598..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-
-public class FileWrapper implements IFile {
-    private File file;
-
-    @Override
-    public void setPath(Path path) {
-        file = path.toFile();
-    }
-
-    @Override
-    public boolean createNewFile() throws IOException {
-        if (file == null) {
-            throw new IOException("Path to file not set.");
-        }
-        return file.createNewFile();
-    }
-
-    @Override
-    public File getFile() {
-        return file;
-    }
-
-    @Override
-    public boolean delete() {
-        return file.delete();
-    }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
deleted file mode 100644 (file)
index 2b95842..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-
-public interface IFile {
-    public void setPath(Path path);
-
-    public boolean createNewFile() throws IOException;
-
-    public File getFile();
-
-    public boolean delete();
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
deleted file mode 100644 (file)
index 8015ea7..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.OutputStream;
-
-@FunctionalInterface
-public interface IOutputStream {
-    public OutputStream getOutputStream(File file) throws FileNotFoundException;
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
deleted file mode 100644 (file)
index 8878782..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.OutputStream;
-
-public class OutputStreamWrapper implements IOutputStream {
-
-    @Override
-    public OutputStream getOutputStream(File file) throws FileNotFoundException {
-        return new FileOutputStream(file);
-    }
-
-}
index bced3d8..4869e4c 100644 (file)
@@ -53,7 +53,7 @@ import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -101,7 +101,7 @@ public class DmaapProducerReactiveHttpClient {
      * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
      * @return status code of operation
      */
-    public Flux<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+    public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
         logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
         try {
             logger.trace("Starting to publish to DR {}",  consumerDmaapModel.getInternalLocation());
@@ -116,12 +116,12 @@ public class DmaapProducerReactiveHttpClient {
 
             Future<HttpResponse> future = webClient.execute(put, null);
             HttpResponse response = future.get();
-            logger.trace(response.toString());
+            logger.trace("{}", response);
             webClient.close();
-            return Flux.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+            return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
         } catch (Exception e) {
             logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
-            return Flux.error(e);
+            return Mono.error(e);
         }
     }
 
index c457726..670b1bd 100644 (file)
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -39,12 +38,11 @@ import javax.net.ssl.TrustManager;
 
 import org.apache.commons.net.ftp.FTP;
 import org.apache.commons.net.ftp.FTPReply;
+import org.apache.commons.net.ftp.FTPSClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.io.IFile;
+import org.mockito.ArgumentMatchers;
 import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
 import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils;
 import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore;
 import org.onap.dcaegen2.collectors.datafile.ssl.ITrustManagerFactory;
@@ -64,36 +62,31 @@ public class FtpsClientTest {
     private static final String USERNAME = "bob";
     private static final String PASSWORD = "123";
 
-    private IFTPSClient ftpsClientMock = mock(IFTPSClient.class);
+    private FTPSClient ftpsClientMock = mock(FTPSClient.class);
     private IKeyManagerUtils keyManagerUtilsMock = mock(IKeyManagerUtils.class);
     private KeyManager keyManagerMock = mock(KeyManager.class);
     private IKeyStore keyStoreWrapperMock = mock(IKeyStore.class);
     private KeyStore keyStoreMock = mock(KeyStore.class);
     private ITrustManagerFactory trustManagerFactoryMock = mock(ITrustManagerFactory.class);
     private TrustManager trustManagerMock = mock(TrustManager.class);
-    private IFile localFileMock = mock(IFile.class);
     private IFileSystemResource fileResourceMock = mock(IFileSystemResource.class);
-    private IOutputStream outputStreamMock = mock(IOutputStream.class);
     private InputStream inputStreamMock = mock(InputStream.class);
 
     FtpsClient clientUnderTest = new FtpsClient(createFileServerData());
 
 
     private ImmutableFileServerData createFileServerData() {
-        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD).port(PORT).build();
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password(PASSWORD)
+                .port(PORT).build();
     }
 
-
     @BeforeEach
     protected void setUp() throws Exception {
         clientUnderTest.setFtpsClient(ftpsClientMock);
         clientUnderTest.setKeyManagerUtils(keyManagerUtilsMock);
         clientUnderTest.setKeyStore(keyStoreWrapperMock);
         clientUnderTest.setTrustManagerFactory(trustManagerFactoryMock);
-        clientUnderTest.setFile(localFileMock);
         clientUnderTest.setFileSystemResource(fileResourceMock);
-        clientUnderTest.setOutputStream(outputStreamMock);
 
         clientUnderTest.setKeyCertPath(FTP_KEY_PATH);
         clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD);
@@ -109,11 +102,10 @@ public class FtpsClientTest {
         when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
         when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
         when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value());
-        File fileMock = mock(File.class);
-        when(localFileMock.getFile()).thenReturn(fileMock);
-        OutputStream osMock = mock(OutputStream.class);
-        when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
+
         when(ftpsClientMock.isConnected()).thenReturn(false, true);
+        when(ftpsClientMock.retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH),
+                ArgumentMatchers.any(OutputStream.class))).thenReturn(true);
 
         clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
@@ -133,10 +125,8 @@ public class FtpsClientTest {
         verify(ftpsClientMock).execPROT("P");
         verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
         verify(ftpsClientMock).setBufferSize(1024 * 1024);
-        verify(localFileMock).setPath(LOCAL_FILE_PATH);
-        verify(localFileMock, times(1)).createNewFile();
-        verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
-        verify(osMock, times(1)).close();
+        verify(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH),
+                ArgumentMatchers.any(OutputStream.class));
         verify(ftpsClientMock, times(1)).logout();
         verify(ftpsClientMock, times(1)).disconnect();
         verify(ftpsClientMock, times(2)).isConnected();
@@ -149,8 +139,8 @@ public class FtpsClientTest {
                 .setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
         when(ftpsClientMock.isConnected()).thenReturn(false);
 
-        assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
-            .hasMessage("org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils$KeyManagerException: java.security.GeneralSecurityException");
+        assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)).hasMessage(
+                "org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils$KeyManagerException: java.security.GeneralSecurityException");
 
         verify(ftpsClientMock).setNeedClientAuth(true);
         verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
@@ -167,7 +157,7 @@ public class FtpsClientTest {
         doThrow(new KeyStoreException()).when(trustManagerFactoryMock).init(keyStoreMock);
 
         assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
-            .hasMessage("Unable to trust xNF's CA, trustedCAPath java.security.KeyStoreException");
+                .hasMessage("Unable to trust xNF's CA, trustedCAPath java.security.KeyStoreException");
     }
 
     @Test
@@ -203,7 +193,7 @@ public class FtpsClientTest {
         when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE);
 
         assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
-            .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
+                .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
 
         verify(ftpsClientMock).setNeedClientAuth(true);
         verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
@@ -230,7 +220,7 @@ public class FtpsClientTest {
         doThrow(new IOException()).when(ftpsClientMock).connect(XNF_ADDRESS, PORT);
 
         assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
-            .hasMessage("Could not open connection: java.io.IOException");
+                .hasMessage("Could not open connection: java.io.IOException");
 
         verify(ftpsClientMock).setNeedClientAuth(true);
         verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
@@ -254,10 +244,8 @@ public class FtpsClientTest {
         when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
         when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value());
 
-        doThrow(new IOException()).when(localFileMock).createNewFile();
-
-        assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
-            .hasMessage("Could not open connection: java.io.IOException");
+        assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, Paths.get("")))
+                .hasMessage("Could not open connection: java.io.IOException: No such file or directory");
 
     }
 
@@ -269,14 +257,13 @@ public class FtpsClientTest {
         when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
         when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
         when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value());
-        File fileMock = mock(File.class);
-        when(localFileMock.getFile()).thenReturn(fileMock);
-        OutputStream osMock = mock(OutputStream.class);
-        when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
-        doThrow(new DatafileTaskException("problemas")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
+        when(ftpsClientMock.isConnected()).thenReturn(false);
+
+        doThrow(new IOException("problemas")).when(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH),
+                ArgumentMatchers.any(OutputStream.class));
 
         assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
-            .hasMessage("problemas");
+                .hasMessage("Could not open connection: java.io.IOException: problemas");
 
         verify(ftpsClientMock).setNeedClientAuth(true);
         verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
@@ -294,9 +281,8 @@ public class FtpsClientTest {
         verify(ftpsClientMock).execPROT("P");
         verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
         verify(ftpsClientMock).setBufferSize(1024 * 1024);
-        verify(localFileMock).setPath(LOCAL_FILE_PATH);
-        verify(localFileMock, times(1)).createNewFile();
-        verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
+        verify(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH),
+                ArgumentMatchers.any(OutputStream.class));
         verify(ftpsClientMock, times(2)).isConnected();
         verifyNoMoreInteractions(ftpsClientMock);
     }
index 7f32e8c..102388e 100644 (file)
@@ -17,7 +17,6 @@
 package org.onap.dcaegen2.collectors.datafile.ftp;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-
 import static org.apache.commons.io.IOUtils.toByteArray;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -52,7 +51,8 @@ public class SftpClientTest {
     public final FakeSftpServerRule sftpServer = new FakeSftpServerRule().addUser(USERNAME, PASSWORD);
 
     @Test
-    public void collectFile_withOKresponse() throws DatafileTaskException, IOException, JSchException, SftpException, Exception {
+    public void collectFile_withOKresponse()
+            throws DatafileTaskException, IOException, JSchException, SftpException, Exception {
         FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
                 .userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build();
         SftpClient sftpClient = new SftpClient(expectedFileServerData);
@@ -67,8 +67,8 @@ public class SftpClientTest {
 
     @Test
     public void collectFile_withWrongUserName_shouldFail() throws IOException, JSchException, SftpException {
-        FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
-                .userId("Wrong").password(PASSWORD).port(sftpServer.getPort()).build();
+        FileServerData expectedFileServerData =
+                ImmutableFileServerData.builder().serverAddress("127.0.0.1").userId("Wrong").password(PASSWORD).build();
         SftpClient sftpClient = new SftpClient(expectedFileServerData);
         sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);