Add HTTP as new protocol to collect files from xNFs 59/116459/6
authorKrzysztof Gajewski <krzysztof.gajewski@nokia.com>
Tue, 15 Dec 2020 10:19:51 +0000 (11:19 +0100)
committerKrzysztof Gajewski <krzysztof.gajewski@nokia.com>
Wed, 30 Dec 2020 10:51:40 +0000 (11:51 +0100)
- HTTP basic auth included
- small code refactoring related to the task

Issue-ID: DCAEGEN2-2527

Signed-off-by: Krzysztof Gajewski <krzysztof.gajewski@nokia.com>
Change-Id: I13ec80e996861e14d2c561087c4af3b34d861030

22 files changed:
datafile-app-server/pom.xml
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileCollectClient.java [moved from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java with 96% similarity]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileServerData.java [moved from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java with 96% similarity]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java [moved from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java with 90% similarity]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java [new file with mode: 0644]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java [new file with mode: 0644]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java [moved from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java with 88% similarity]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
pom.xml

index 62fdf48..950a3b9 100644 (file)
     </dependency>
   </dependencies>
 
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>io.projectreactor</groupId>
+        <artifactId>reactor-bom</artifactId>
+        <version>${projectreactor.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <build>
     <resources>
       <resource>
@@ -17,7 +17,7 @@
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.onap.dcaegen2.collectors.datafile.commons;
 
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 
@@ -28,10 +28,10 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
  *
  */
 public enum Scheme {
-    FTPES, SFTP;
+    FTPES, SFTP, HTTP;
 
     public static final String DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG = "DFC does not support protocol ";
-    public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE = ". Supported protocols are FTPeS and sFTP";
+    public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE = ". Supported protocols are FTPeS, sFTP and HTTP";
 
     /**
      * Get a <code>Scheme</code> from a string.
@@ -46,6 +46,8 @@ public enum Scheme {
             result = Scheme.FTPES;
         } else if ("SFTP".equalsIgnoreCase(schemeString)) {
             result = Scheme.SFTP;
+        } else if ("HTTP".equalsIgnoreCase(schemeString)) {
+            result = Scheme.HTTP;
         } else {
             throw new DatafileTaskException(
                 DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + schemeString + SUPPORTED_PROTOCOLS_ERROR_MESSAGE);
index a91d46a..9bacec8 100644 (file)
@@ -41,6 +41,8 @@ import javax.net.ssl.TrustManagerFactory;
 import org.apache.commons.net.ftp.FTP;
 import org.apache.commons.net.ftp.FTPReply;
 import org.apache.commons.net.ftp.FTPSClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 import org.slf4j.Logger;
index d168520..e972aa3 100644 (file)
@@ -27,6 +27,8 @@ import java.nio.file.Path;
 import java.util.Optional;
 
 import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 import org.slf4j.Logger;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java
new file mode 100644 (file)
index 0000000..86bfc21
--- /dev/null
@@ -0,0 +1,151 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClientResponse;
+import reactor.netty.resources.ConnectionProvider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+public class DfcHttpClient implements FileCollectClient {
+
+    //Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
+    private static final int MAX_NUMBER_OF_CONNECTIONS = 200;
+    private static final Logger logger = LoggerFactory.getLogger(DfcHttpClient.class);
+    private static final ConnectionProvider pool = ConnectionProvider.create("default", MAX_NUMBER_OF_CONNECTIONS);
+
+    private final FileServerData fileServerData;
+    private Disposable disposableClient;
+
+    protected HttpClient client;
+
+    public DfcHttpClient(FileServerData fileServerData) {
+        this.fileServerData = fileServerData;
+    }
+
+    @Override public void open() throws DatafileTaskException {
+        logger.trace("Setting httpClient for file download.");
+
+        basicAuthDataPresentOrThrow();
+        this.client = HttpClient.create(pool).keepAlive(true).headers(
+            h -> h.add("Authorization", HttpUtils.basicAuth(this.fileServerData.userId(), this.fileServerData.password())));
+
+        logger.trace("httpClient, auth header was set.");
+    }
+
+    private void basicAuthDataPresentOrThrow() throws DatafileTaskException {
+        if ((this.fileServerData.userId().isEmpty()) || (this.fileServerData.password().isEmpty())) {
+            throw new DatafileTaskException("Not sufficient basic auth data for file.");
+        }
+    }
+
+    @Override public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+        logger.trace("Prepare to collectFile {}", localFile);
+        CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<Exception> errorMessage = new AtomicReference<>();
+
+        Consumer<Throwable> onError = processFailedConnectionWithServer(latch, errorMessage);
+        Consumer<InputStream> onSuccess = processDataFromServer(localFile, latch, errorMessage);
+
+        Flux<InputStream> responseContent = getServerResponse(remoteFile);
+        disposableClient = responseContent.subscribe(onSuccess, onError);
+
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            throw new DatafileTaskException("Interrupted exception after datafile download - ", e);
+        }
+
+        if (isDownloadFailed(errorMessage)) {
+            throw new DatafileTaskException("Error occured during datafile download: ", errorMessage.get());
+        }
+
+        logger.trace("HTTP collectFile OK");
+    }
+
+    protected boolean isDownloadFailed(AtomicReference<Exception> errorMessage) {
+        return (errorMessage.get() != null);
+    }
+
+    @NotNull protected Consumer<Throwable> processFailedConnectionWithServer(CountDownLatch latch, AtomicReference<Exception> errorMessages) {
+        return (Throwable response) -> {
+            errorMessages.set(new Exception("Error in connection has occurred during file download", response));
+            latch.countDown();
+        };
+    }
+
+    @NotNull protected Consumer<InputStream> processDataFromServer(Path localFile, CountDownLatch latch,
+            AtomicReference<Exception> errorMessages) {
+        return (InputStream response) -> {
+            logger.trace("Starting to process response.");
+            try {
+                long numBytes = Files.copy(response, localFile);
+                logger.trace("Transmission was successful - {} bytes downloaded.", numBytes);
+                logger.trace("CollectFile fetched: {}", localFile.toString());
+                response.close();
+            } catch (IOException e) {
+                errorMessages.set(new Exception("Error fetching file with", e));
+            } finally {
+                latch.countDown();
+            }
+        };
+    }
+
+    protected Flux<InputStream> getServerResponse(String remoteFile) {
+        return client.get()
+            .uri(prepareUri(remoteFile))
+            .response((responseReceiver, byteBufFlux) -> {
+                logger.trace("HTTP response status - {}", responseReceiver.status());
+                if(isResponseOk(responseReceiver)){
+                    return byteBufFlux.aggregate().asInputStream();
+                }
+                return Mono.error(new Throwable("Unexpected server response code - "
+                    + responseReceiver.status().toString()));
+            });
+    }
+
+    protected boolean isResponseOk(HttpClientResponse httpClientResponse) {
+        return httpClientResponse.status().code() == 200;
+    }
+
+    @NotNull protected String prepareUri(String remoteFile) {
+        int port = fileServerData.port().isPresent() ? fileServerData.port().get() : HttpUtils.HTTP_DEFAULT_PORT;
+        return "http://" + fileServerData.serverAddress() + ":" + port + remoteFile;
+    }
+
+    @Override public void close() {
+        logger.trace("Starting http client disposal.");
+        disposableClient.dispose();
+        logger.trace("Http client disposed.");
+    }
+}
index 8721f61..1c8f57d 100644 (file)
@@ -27,9 +27,9 @@ import java.util.Optional;
 
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 
 /**
  * Contains data, from the fileReady event, about the file to collect from the xNF.
index 5371d48..1dca005 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Modifications Copyright (C) 2020 Nokia. All rights reserved
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,12 +21,20 @@ package org.onap.dcaegen2.collectors.datafile.service;
 
 import org.apache.http.HttpStatus;
 
+import java.util.Base64;
+
 public final class HttpUtils implements HttpStatus {
 
+    public static final int HTTP_DEFAULT_PORT = 80;
+
     private HttpUtils() {
     }
 
     public static boolean isSuccessfulResponseCode(Integer statusCode) {
         return statusCode >= 200 && statusCode < 300;
     }
+
+    public static String basicAuth(String username, String password) {
+        return "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
+    }
 }
index 708865f..42a0df1 100644 (file)
@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.StreamSupport;
 
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
index 4b89f16..e76d415 100644 (file)
@@ -27,10 +27,11 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
 import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClientSettings;
+import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient;
 import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
@@ -111,12 +112,11 @@ public class FileCollector {
             counters.incNoOfFailedFtpAttempts();
             return Mono.just(Optional.empty()); // Give up
         } catch (DatafileTaskException e) {
-            logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
-                e.toString());
+            logger.warn("Failed to download file: {} {}, reason: ", fileData.sourceName(), fileData.name(), e);
             counters.incNoOfFailedFtpAttempts();
             return Mono.error(e);
         } catch (Exception throwable) {
-            logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
+            logger.warn("Failed to close client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
                 throwable.toString(), throwable);
             return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
         }
@@ -128,6 +128,8 @@ public class FileCollector {
                 return createSftpClient(fileData);
             case FTPES:
                 return createFtpesClient(fileData);
+            case HTTP:
+                return createHttpClient(fileData);
             default:
                 throw new DatafileTaskException("Unhandled protocol: " + fileData.scheme());
         }
@@ -165,4 +167,8 @@ public class FileCollector {
         return new FtpesClient(fileData.fileServerData(), Paths.get(config.keyCert()), config.keyPasswordPath(),
             Paths.get(config.trustedCa()), config.trustedCaPasswordPath());
     }
+
+    protected FileCollectClient createHttpClient(FileData fileData) {
+        return new DfcHttpClient(fileData.fileServerData());
+    }
 }
index 8e6ff94..d8daa56 100644 (file)
@@ -42,6 +42,7 @@ import org.apache.commons.net.ftp.FTPSClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatchers;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
 import org.springframework.http.HttpStatus;
 
 public class FtpesClientTest {
index d50bfc8..49fd365 100644 (file)
@@ -41,6 +41,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableSftpConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.SftpConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java
new file mode 100644 (file)
index 0000000..f49cd39
--- /dev/null
@@ -0,0 +1,145 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import reactor.core.publisher.Flux;
+import reactor.netty.http.client.HttpClientConfig;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.file.Path;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class DfcHttpClientTest {
+
+    private static final String USERNAME = "bob";
+    private static final String PASSWORD = "123";
+    private static final String XNF_ADDRESS = "127.0.0.1";
+    private static final int PORT = 80;
+
+    @Mock
+    private Path pathMock;
+
+    DfcHttpClient dfcHttpClientSpy;
+
+    private ImmutableFileServerData createFileServerData() {
+        return ImmutableFileServerData.builder()
+            .serverAddress(XNF_ADDRESS)
+            .userId(USERNAME).password(PASSWORD)
+            .port(PORT)
+            .build();
+    }
+
+    @BeforeEach
+    public void setup() {
+        dfcHttpClientSpy = spy(new DfcHttpClient(createFileServerData()));
+    }
+
+    @Test
+    public void openConnection_successAuthSetup() throws DatafileTaskException {
+        dfcHttpClientSpy.open();
+        HttpClientConfig config = dfcHttpClientSpy.client.configuration();
+        assertEquals(HttpUtils.basicAuth(USERNAME, PASSWORD), config.headers().get("Authorization"));
+    }
+
+    @Test
+    public void openConnection_failedBasicAuthSetupThrowException() {
+        ImmutableFileServerData serverData = ImmutableFileServerData.builder()
+            .serverAddress(XNF_ADDRESS)
+            .userId(USERNAME).password("")
+            .port(PORT)
+            .build();
+
+        DfcHttpClient dfcHttpClientSpy = spy(new DfcHttpClient(serverData));
+
+        assertThatThrownBy(() -> dfcHttpClientSpy.open())
+            .hasMessageContaining("Not sufficient basic auth data for file.");
+    }
+
+    @Test
+    public void prepareUri_UriWithoutPort() {
+        ImmutableFileServerData serverData = ImmutableFileServerData.builder()
+            .serverAddress(XNF_ADDRESS)
+            .userId(USERNAME).password(PASSWORD)
+            .build();
+        DfcHttpClient clientNoPortSpy = spy(new DfcHttpClient(serverData));
+        String REMOTE_FILE = "any";
+
+        String retrievedUri = clientNoPortSpy.prepareUri(REMOTE_FILE);
+        assertTrue(retrievedUri.startsWith("http://" + XNF_ADDRESS + ":80"));
+    }
+
+    @Test
+    public void collectFile_AllOk() throws Exception {
+        String REMOTE_FILE = "any";
+        Flux<InputStream> fis = Flux.just(new ByteArrayInputStream("ReturnedString".getBytes()));
+
+        dfcHttpClientSpy.open();
+
+        when(dfcHttpClientSpy.getServerResponse(any())).thenReturn(fis);
+        doReturn(false).when(dfcHttpClientSpy).isDownloadFailed(any());
+
+        dfcHttpClientSpy.collectFile(REMOTE_FILE, pathMock);
+        dfcHttpClientSpy.close();
+
+        verify(dfcHttpClientSpy, times(1)).getServerResponse(ArgumentMatchers.eq(REMOTE_FILE));
+        verify(dfcHttpClientSpy, times(1)).processDataFromServer(any(), any(), any());
+        verify(dfcHttpClientSpy, times(1)).isDownloadFailed(any());
+    }
+
+    @Test
+    public void collectFile_No200ResponseWriteToErrorMessage() throws DatafileTaskException {
+        String ERROR_RESPONSE = "This is unexpected message";
+        String REMOTE_FILE = "any";
+        Flux<Throwable> fis = Flux.error(new Throwable(ERROR_RESPONSE));
+
+        dfcHttpClientSpy.open();
+
+        doReturn(fis).when(dfcHttpClientSpy).getServerResponse(any());
+
+        assertThatThrownBy(() -> dfcHttpClientSpy.collectFile(REMOTE_FILE, pathMock))
+            .hasMessageContaining("Error occured during datafile download: ");
+        verify(dfcHttpClientSpy, times(1)).getServerResponse(REMOTE_FILE);
+        verify(dfcHttpClientSpy, times(1)).processFailedConnectionWithServer(any(), any());
+        dfcHttpClientSpy.close();
+    }
+
+    @Test
+    public void isResponseOk_validateResponse() {
+        assertTrue(dfcHttpClientSpy.isResponseOk(HttpClientResponseHelper.RESPONSE_OK));
+        assertFalse(dfcHttpClientSpy.isResponseOk(HttpClientResponseHelper.RESPONSE_ANY_NO_OK));
+    }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java
new file mode 100644 (file)
index 0000000..42ab4b3
--- /dev/null
@@ -0,0 +1,170 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.cookie.Cookie;
+import reactor.netty.http.client.HttpClientResponse;
+import reactor.util.context.Context;
+import reactor.util.context.ContextView;
+
+import java.util.Map;
+import java.util.Set;
+
+public class HttpClientResponseHelper {
+
+    public static final HttpClientResponse RESPONSE_OK = new HttpClientResponse() {
+
+        @Override
+        public Map<CharSequence, Set<Cookie>> cookies() {
+            return null;
+        }
+
+        @Override
+        public boolean isKeepAlive() {
+            return false;
+        }
+
+        @Override
+        public boolean isWebsocket() {
+            return false;
+        }
+
+        @Override
+        public HttpMethod method() {
+            return null;
+        }
+
+        @Override
+        public String fullPath() {
+            return null;
+        }
+
+        @Override
+        public String uri() {
+            return null;
+        }
+
+        @Override
+        public HttpVersion version() {
+            return null;
+        }
+
+        @Override
+        public Context currentContext() {
+            return null;
+        }
+
+        @Override
+        public ContextView currentContextView() {
+            return null;
+        }
+
+        @Override
+        public String[] redirectedFrom() {
+            return new String[0];
+        }
+
+        @Override
+        public HttpHeaders requestHeaders() {
+            return null;
+        }
+
+        @Override
+        public String resourceUrl() {
+            return null;
+        }
+
+        @Override
+        public HttpHeaders responseHeaders() {
+            return null;
+        }
+
+        @Override
+        public HttpResponseStatus status() {
+            return HttpResponseStatus.OK;
+        }
+    };
+
+    public static final HttpClientResponse RESPONSE_ANY_NO_OK = new HttpClientResponse() {
+
+        @Override
+        public Map<CharSequence, Set<Cookie>> cookies() {
+            return null;
+        }
+
+        @Override
+        public boolean isKeepAlive() {
+            return false;
+        }
+
+        @Override
+        public boolean isWebsocket() {
+            return false;
+        }
+
+        @Override
+        public HttpMethod method() {
+            return null;
+        }
+
+        @Override
+        public String fullPath() {
+            return null;
+        }
+
+        @Override
+        public String uri() {
+            return null;
+        }
+
+        @Override
+        public HttpVersion version() {
+            return null;
+        }
+
+        @Override public Context currentContext() {
+            return null;
+        }
+
+        @Override public ContextView currentContextView() {
+            return null;
+        }
+
+        @Override public String[] redirectedFrom() {
+            return new String[0];
+        }
+
+        @Override public HttpHeaders requestHeaders() {
+            return null;
+        }
+
+        @Override public String resourceUrl() {
+            return null;
+        }
+
+        @Override public HttpHeaders responseHeaders() {
+            return null;
+        }
+
+        @Override public HttpResponseStatus status() {
+            return HttpResponseStatus.NOT_IMPLEMENTED;
+        }
+    };
+}
index feeeb47..a446050 100644 (file)
@@ -24,9 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 
 public class FileDataTest {
     private static final String FTPES_SCHEME = "ftpes://";
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.onap.dcaegen2.collectors.datafile.scheme;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 
 public class SchemeTest {
 
@@ -31,6 +33,7 @@ public class SchemeTest {
     public void shouldReturnSchemeForSupportedProtocol() throws DatafileTaskException {
         assertEquals(Scheme.FTPES, Scheme.getSchemeFromString("FTPES"));
         assertEquals(Scheme.SFTP, Scheme.getSchemeFromString("SFTP"));
+        assertEquals(Scheme.HTTP, Scheme.getSchemeFromString("HTTP"));
     }
 
     @Test
index 9e642b7..c7ef8da 100644 (file)
@@ -35,7 +35,7 @@ import java.util.Optional;
 
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
@@ -217,7 +217,7 @@ class JsonMessageParserTest {
     void whenPassingCorrectJsonWrongScheme_noMessage() {
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
             .name(PM_FILE_NAME) //
-            .location("http://location.xml") //
+            .location("unreal://location.xml") //
             .compression(GZIP_COMPRESSION) //
             .fileFormatType(FILE_FORMAT_TYPE) //
             .fileFormatVersion(FILE_FORMAT_VERSION) //
@@ -242,8 +242,8 @@ class JsonMessageParserTest {
 
         assertTrue(logAppender.list.toString()
                 .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
-                    + Scheme.DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + "http" + Scheme.SUPPORTED_PROTOCOLS_ERROR_MESSAGE
-                    + ". Location: http://location.xml"),"Error missing in log");
+                    + Scheme.DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + "unreal" + Scheme.SUPPORTED_PROTOCOLS_ERROR_MESSAGE
+                    + ". Location: unreal://location.xml"),"Error missing in log");
         assertTrue(logAppender.list.toString().contains("sourceName=5GRAN_DU"),"Missing sourceName in log");
     }
 
index f0c8e3b..e68913f 100644 (file)
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
index a98e2ba..1146cf2 100644 (file)
@@ -42,7 +42,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
 import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
index d5a9a92..09d7627 100644 (file)
@@ -55,7 +55,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration
 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -74,7 +74,6 @@ import org.slf4j.MDC;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
-import reactor.test.scheduler.VirtualTimeScheduler;
 
 public class ScheduledTasksTest {
 
diff --git a/pom.xml b/pom.xml
index 023f467..8fb4443 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -53,6 +53,7 @@
     <spring-boot.version>2.4.0</spring-boot.version>
     <commons-io.version>1.3.2</commons-io.version>
     <commons-net.version>3.3</commons-net.version>
+    <projectreactor.version>2020.0.2</projectreactor.version>
 
     <!-- LOGGING SETTINGS -->
     <slf4j.version>1.7.25</slf4j.version>
@@ -71,7 +72,7 @@
     <!-- Plugin versions -->
     <maven-resources-plugin.version>3.1.0</maven-resources-plugin.version>
     <maven-surefire-plugin.version>2.22.0</maven-surefire-plugin.version>
-    <docker-maven-plugin.version>1.1.1</docker-maven-plugin.version>
+    <docker-maven-plugin.version>1.2.1</docker-maven-plugin.version>
     <git-commit-id-plugin.version>2.2.4</git-commit-id-plugin.version>
     <sonar.coverage.jacoco.xmlReportPaths>
       ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml
         <artifactId>springfox-swagger-ui</artifactId>
         <version>${springfox.version}</version>
       </dependency>
+      <dependency>
+        <groupId>io.projectreactor</groupId>
+        <artifactId>reactor-bom</artifactId>
+        <version>${projectreactor.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>