fix bugs in jira DCAEGEN2-940 and DCAEGEN2-941 02/74102/2
authorChengkai Yan <martin.c.yan@est.tech>
Fri, 23 Nov 2018 15:23:56 +0000 (16:23 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 3 Dec 2018 12:13:20 +0000 (13:13 +0100)
Change-Id: Id0b3e295cab0e085746b034caccbf82aca2e0d7b
Signed-off-by: Chengkai Yan <martin.c.yan@est.tech>
Issue-ID: DCAEGEN2-940
Issue-ID: DCAEGEN2-941

16 files changed:
datafile-app-server/config/application.yaml
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-dmaap-client/pom.xml
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java [deleted file]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java [deleted file]
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java [new file with mode: 0644]
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java [deleted file]
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java [deleted file]
pom.xml

index cef185c..b66f7b6 100644 (file)
@@ -14,6 +14,7 @@ logging:
     ROOT: ERROR
     org.springframework: ERROR
     org.springframework.data: ERROR
+    org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
     org.onap.dcaegen2.collectors.datafile: ERROR
   file: opt/log/application.log
 app:
index 171dd02..c465fe9 100644 (file)
@@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -59,11 +60,18 @@ public class ScheduledTasks {
      */
     public void scheduleMainDatafileEventTask() {
         logger.trace("Execution of tasks was registered");
-
+        //@formatter:off
         consumeFromDmaapMessage()
+                .publishOn(Schedulers.parallel())
+                .cache()
                 .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
-                .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration)
+                .flatMap(this::collectFilesFromXnf)
+                .retry(3)
+                .cache()
+                .flatMap(this::publishToDmaapConfiguration)
+                .retry(3)
                 .subscribe(this::onSuccess, this::onError, this::onComplete);
+        //@formatter:on
     }
 
     private void onComplete() {
index 52394ad..c863f6a 100644 (file)
   </properties>
 
   <dependencies>
-
     <!-- DEVELOPMENT DEPENDENCIES -->
+    <dependency>
+      <groupId>org.asynchttpclient</groupId>
+      <artifactId>async-http-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpasyncclient</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
       <artifactId>datafile-commons</artifactId>
index 2ccf1ba..4b7cc01 100644 (file)
@@ -2,17 +2,15 @@
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
@@ -74,6 +72,11 @@ public class FTPSClientWrapper implements IFTPSClient {
         ftpsClient.enterLocalPassiveMode();
     }
 
+    @Override
+    public void setFileType(int fileType) throws IOException {
+        ftpsClient.setFileType(fileType);
+    }
+
     @Override
     public void execPBSZ(int psbz) throws IOException {
         ftpsClient.execPBSZ(psbz);
@@ -93,4 +96,14 @@ public class FTPSClientWrapper implements IFTPSClient {
     public void setTimeout(Integer t) {
         this.ftpsClient.setDefaultTimeout(t);
     }
+
+    @Override
+    public boolean isConnected() {
+        return ftpsClient.isConnected();
+    }
+
+    @Override
+    public void setBufferSize(int bufSize) {
+        ftpsClient.setBufferSize(bufSize);
+    }
 }
index c596217..fa1d431 100644 (file)
@@ -2,17 +2,15 @@
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
@@ -44,6 +42,7 @@ public class FileCollectResult {
 
     @Override
     public String toString() {
-        return "Download successful: " + result + " Error data: " + getErrorData();
+        return "FileCollectResult: "
+                + (downloadSuccessful() ? "successful!" : "unsuccessful! Error data: " + getErrorData());
     }
 }
index 120868c..49b7b66 100644 (file)
@@ -23,6 +23,7 @@ import java.security.GeneralSecurityException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 
+import org.apache.commons.net.ftp.FTP;
 import org.apache.commons.net.ftp.FTPReply;
 import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
 import org.onap.dcaegen2.collectors.datafile.io.FileWrapper;
@@ -72,7 +73,6 @@ public class FtpsClient extends FileCollectClient {
 
         if (setUpKeyManager(ftps) && setUpTrustedCA(ftps) && setUpConnection(ftps)) {
             if (getFileFromxNF(ftps)) {
-                closeDownConnection(ftps);
                 fileCollectResult = new FileCollectResult();
             } else {
                 fileCollectResult = new FileCollectResult(errorData);
@@ -80,6 +80,7 @@ public class FtpsClient extends FileCollectClient {
         } else {
             fileCollectResult = new FileCollectResult(errorData);
         }
+        closeDownConnection(ftps);
         logger.trace("retryCollectFile left with result: {}", fileCollectResult);
         return fileCollectResult;
     }
@@ -87,6 +88,7 @@ public class FtpsClient extends FileCollectClient {
     private boolean setUpKeyManager(IFTPSClient ftps) {
         boolean result = true;
         if (keyManagerSet) {
+            logger.trace("keyManager already set!");
             return result;
         }
         try {
@@ -105,6 +107,7 @@ public class FtpsClient extends FileCollectClient {
     private boolean setUpTrustedCA(IFTPSClient ftps) {
         boolean result = true;
         if (trustManagerSet) {
+            logger.trace("trustManager already set!");
             return result;
         }
         try {
@@ -130,32 +133,42 @@ public class FtpsClient extends FileCollectClient {
     private boolean setUpConnection(IFTPSClient ftps) {
         boolean result = true;
         try {
+            if (ftps.isConnected()) {
+                addError(
+                        "Looks like previous ftp connection is still in use, will retry in 1 minute. " + fileServerData,
+                        null);
+                return false;
+            }
             ftps.connect(fileServerData.serverAddress(), fileServerData.port());
             logger.trace("after ftp connect");
             boolean loginSuccesful = ftps.login(fileServerData.userId(), fileServerData.password());
             if (!loginSuccesful) {
-                ftps.logout();
+                closeDownConnection(ftps);
                 addError("Unable to log in to xNF. " + fileServerData, null);
-                result = false;
+                return false;
             }
 
             if (loginSuccesful && FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
                 ftps.enterLocalPassiveMode();
+                ftps.setFileType(FTP.BINARY_FILE_TYPE);
                 // Set protection buffer size
                 ftps.execPBSZ(0);
                 // Set data channel protection to private
                 ftps.execPROT("P");
+                ftps.setBufferSize(1024 * 1024);
             } else {
-                ftps.disconnect();
+                closeDownConnection(ftps);
                 addError("Unable to connect to xNF. " + fileServerData + " xNF reply code: " + ftps.getReplyCode(),
                         null);
-                result = false;
+                return false;
             }
-        } catch (Exception ex) {
-            addError("Unable to connect to xNF. Data: " + fileServerData, ex);
-            result = false;
+        } catch (Exception e) {
+            logger.trace("connect to ftp server failed.", e);
+            addError("Unable to connect to xNF. Data: " + fileServerData, e);
+            closeDownConnection(ftps);
+            return false;
         }
-        logger.trace("setUpConnection return value: {}", result);
+        logger.trace("setUpConnection successfully!");
         return result;
     }
 
@@ -169,8 +182,9 @@ public class FtpsClient extends FileCollectClient {
 
             IOutputStream outputStream = getOutputStream();
             OutputStream output = outputStream.getOutputStream(outfile.getFile());
-
+            logger.trace("begin to retrieve from xNF.");
             result = ftps.retrieveFile(remoteFile, output);
+            logger.trace("end retrieve from xNF.");
             if (!result) {
                 output.close();
                 logger.debug("Unable to retrieve file from xNF. Cause unknown!");
@@ -184,22 +198,28 @@ public class FtpsClient extends FileCollectClient {
             try {
                 outfile.delete();
             } catch (Exception e) {
-                // Nothing
+                logger.error("Unable to close file. {}", e);
             }
-            result = false;
+            return false;
         }
         return result;
     }
 
     private void closeDownConnection(IFTPSClient ftps) {
         logger.trace("starting to closeDownConnection");
-        try {
-            if (ftps != null) {
-                ftps.logout();
+        if (ftps != null && ftps.isConnected()) {
+            try {
+                boolean logOut = ftps.logout();
+                logger.trace("logOut: {}", logOut);
+            } catch (Exception e) {
+                logger.trace("Unable to logout connection.", e);
+            }
+            try {
                 ftps.disconnect();
+                logger.trace("disconnected!");
+            } catch (Exception e) {
+                logger.trace("Unable to disconnect connection.", e);
             }
-        } catch (Exception e) {
-            // Do nothing, file has been collected.
         }
     }
 
index b147202..1a58163 100644 (file)
@@ -2,17 +2,15 @@
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
@@ -39,10 +37,16 @@ public interface IFTPSClient {
 
     public int getReplyCode();
 
+    public void setBufferSize(int bufSize);
+
+    public boolean isConnected();
+
     public void disconnect() throws IOException;
 
     public void enterLocalPassiveMode();
 
+    public void setFileType(int fileType) throws IOException;
+
     public void execPBSZ(int newParam) throws IOException;
 
     public void execPROT(String prot) throws IOException;
index b4c5269..6a2c663 100644 (file)
@@ -23,26 +23,29 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.Future;
+
+import javax.net.ssl.SSLContext;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.ssl.SSLContextBuilder;
 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
 import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.web.IRestTemplate;
-import org.onap.dcaegen2.collectors.datafile.web.RestTemplateWrapper;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
 import reactor.core.publisher.Flux;
@@ -70,7 +73,7 @@ public class DmaapProducerReactiveHttpClient {
     private final String pwd;
 
     private IFileSystemResource fileResource;
-    private IRestTemplate restTemplate;
+    private CloseableHttpAsyncClient webClient;
 
     /**
      * Constructor DmaapProducerReactiveHttpClient.
@@ -78,7 +81,6 @@ public class DmaapProducerReactiveHttpClient {
      * @param dmaapPublisherConfiguration - DMaaP producer configuration object
      */
     public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
-
         this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
         this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
         this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
@@ -97,54 +99,70 @@ public class DmaapProducerReactiveHttpClient {
     public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
         logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
         try {
-            HttpHeaders headers = new HttpHeaders();
-            headers.setContentType(MediaType.parseMediaType(dmaapContentType));
-            addMetaDataToHead(consumerDmaapModel, headers);
-
-            addUserCredentialsToHead(headers);
-
-            IFileSystemResource fileSystemResource = getFileSystemResource();
-            fileSystemResource.setPath(consumerDmaapModel.getLocation());
-            InputStream fileInputStream = fileSystemResource.getInputStream();
-            HttpEntity<byte[]> request = addFileToRequest(fileInputStream, headers);
+            logger.trace("Starting to publish to DR");
 
+            webClient = getWebClient();
+            webClient.start();
 
-            logger.trace("Starting to publish to DR");
-            ResponseEntity<String> responseEntity = getRestTemplate().exchange(getUri(consumerDmaapModel.getName()),
-                    HttpMethod.PUT, request, String.class);
+            HttpPut put = new HttpPut();
+            prepareHead(consumerDmaapModel, put);
+            prepareBody(consumerDmaapModel, put);
+            addUserCredentialsToHead(put);
 
-            return Flux.just(responseEntity.getStatusCode().toString());
+            Future<HttpResponse> future = webClient.execute(put, null);
+            HttpResponse response = future.get();
+            logger.trace(response.toString());
+            webClient.close();
+            handleHttpResponse(response);
+            return Flux.just(response.toString());
         } catch (Exception e) {
             logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e);
             return Flux.empty();
         }
     }
 
-    private void addUserCredentialsToHead(HttpHeaders headers) {
+    private void handleHttpResponse(HttpResponse response) {
+        int statusCode = response.getStatusLine().getStatusCode();
+        if (HttpUtils.isSuccessfulResponseCode(statusCode)) {
+            logger.trace("Publish to DR successful!");
+        } else {
+            logger.error("Publish to DR unsuccessful, response code: " + statusCode);
+        }
+    }
+
+    private void addUserCredentialsToHead(HttpPut put) {
         String plainCreds = user + ":" + pwd;
         byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
         byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
         String base64Creds = new String(base64CredsBytes);
         logger.trace("base64Creds...: {}", base64Creds);
-        headers.add("Authorization", "Basic " + base64Creds);
+        put.addHeader("Authorization", "Basic " + base64Creds);
     }
 
-    private void addMetaDataToHead(ConsumerDmaapModel consumerDmaapModel, HttpHeaders headers) {
-        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
-        metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+    private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+        put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+        String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
         metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
-        headers.set(X_ATT_DR_META, metaData.toString());
-    }
-    private HttpEntity<byte[]> addFileToRequest(InputStream inputStream, HttpHeaders headers)
-            throws IOException {
-        return new HttpEntity<>(IOUtils.toByteArray(inputStream), headers);
+        put.addHeader(X_ATT_DR_META, metaData.toString());
+        put.setURI(getUri(name));
     }
 
-    private IRestTemplate getRestTemplate() throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException {
-        if (restTemplate == null) {
-            restTemplate = new RestTemplateWrapper();
+    private void prepareBody(ConsumerDmaapModel model, HttpPut put) {
+        String fileLocation = model.getLocation();
+        IFileSystemResource fileSystemResource = getFileSystemResource();
+        fileSystemResource.setPath(fileLocation);
+        InputStream fileInputStream = null;
+        try {
+            fileInputStream = fileSystemResource.getInputStream();
+        } catch (IOException e) {
+            logger.error("Unable to get stream from filesystem.", e);
+        }
+        try {
+            put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+        } catch (IOException e) {
+            logger.error("Unable to set put request body from ByteArray.", e);
         }
-        return restTemplate;
     }
 
     private URI getUri(String fileName) {
@@ -164,7 +182,26 @@ public class DmaapProducerReactiveHttpClient {
         fileResource = fileSystemResource;
     }
 
-    protected void setRestTemplate(IRestTemplate restTemplate) {
-        this.restTemplate = restTemplate;
+    protected CloseableHttpAsyncClient getWebClient() {
+        if (webClient != null) {
+            return webClient;
+        }
+        SSLContext sslContext = null;
+        try {
+            sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+        } catch (Exception e) {
+            logger.trace("Unable to get sslContext.", e);
+        }
+        //@formatter:off
+        return HttpAsyncClients.custom()
+                .setSSLContext(sslContext)
+                .setSSLHostnameVerifier(new NoopHostnameVerifier())
+                .setRedirectStrategy(PublishRedirectStrategy.INSTANCE)
+                .build();
+        //@formatter:on
+    }
+
+    protected void setWebClient(CloseableHttpAsyncClient client) {
+        this.webClient = client;
     }
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java
deleted file mode 100644 (file)
index 15d459f..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-package org.onap.dcaegen2.collectors.datafile.web;
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpRequest;
-import org.springframework.http.client.ClientHttpRequestExecution;
-import org.springframework.http.client.ClientHttpRequestInterceptor;
-import org.springframework.http.client.ClientHttpResponse;
-
-public class RequestResponseLoggingInterceptor implements ClientHttpRequestInterceptor {
-
-    private final Logger log = LoggerFactory.getLogger(this.getClass());
-
-    @Override
-    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
-        logRequest(request, body);
-        ClientHttpResponse response = execution.execute(request, body);
-        logResponse(response);
-        return response;
-    }
-
-    private void logRequest(HttpRequest request, byte[] body) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("===========================request begin================================================");
-            log.debug("URI         : {}", request.getURI());
-            log.debug("Method      : {}", request.getMethod());
-            log.debug("Headers     : {}", request.getHeaders());
-            log.debug("Request body: {}", new String(body, "UTF-8"));
-            log.debug("==========================request end================================================");
-        }
-    }
-
-    private void logResponse(ClientHttpResponse response) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("============================response begin==========================================");
-            log.debug("Status code  : {}", response.getStatusCode());
-            log.debug("Status text  : {}", response.getStatusText());
-            log.debug("Headers      : {}", response.getHeaders());
-            log.debug("=======================response end=================================================");
-        }
-    }
-}
\ No newline at end of file
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java
deleted file mode 100644 (file)
index a1b4284..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.web;
-
-import java.net.URI;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.util.Collections;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.ssl.SSLContextBuilder;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
-import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
-import org.springframework.web.client.RestTemplate;
-
-/**
- * @author
- *
- */
-public class RestTemplateWrapper implements IRestTemplate {
-    private RestTemplate restTemplate;
-
-    public RestTemplateWrapper() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
-        SSLContext sslContext =
-                new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
-        CloseableHttpClient httpClient =
-                HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier())
-                        .setRedirectStrategy(new PublishRedirectStrategy()).build();
-
-        HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
-        requestFactory.setHttpClient(httpClient);
-
-        restTemplate = new RestTemplate(requestFactory);
-        restTemplate.setInterceptors(Collections.singletonList(new RequestResponseLoggingInterceptor()));
-
-    }
-
-    @Override
-    public ResponseEntity<String> exchange(URI url, HttpMethod method, HttpEntity<byte[]> requestEntity,
-            Class<String> responseType) {
-        return restTemplate.exchange(url,  method, requestEntity, responseType);
-    }
-
-}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
new file mode 100644 (file)
index 0000000..0511ad2
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import org.junit.jupiter.api.Test;
+
+public class FileCollectResultTest {
+
+    @Test
+    public void successfulResult() {
+        FileCollectResult resultUnderTest = new FileCollectResult();
+        assertTrue(resultUnderTest.downloadSuccessful());
+        assertEquals("FileCollectResult: successful!", resultUnderTest.toString());
+    }
+
+    @Test
+    public void unSuccessfulResult() {
+        ErrorData errorData = new ErrorData();
+        errorData.addError("Error", null);
+        errorData.addError("Null", new NullPointerException());
+        FileCollectResult resultUnderTest = new FileCollectResult(errorData);
+        assertFalse(resultUnderTest.downloadSuccessful());
+        assertEquals("FileCollectResult: unsuccessful! Error data: " + errorData.toString(), resultUnderTest.toString());
+    }
+}
index e5693d5..2d59ad5 100644 (file)
@@ -36,6 +36,8 @@ import java.security.KeyStoreException;
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.TrustManager;
 
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPReply;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.io.IFile;
@@ -88,7 +90,7 @@ public class FtpsClientTest {
         clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD);
         clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH);
         clientUnderTest.setTrustedCAPassword(TRUSTED_CA_PASSWORD);
-}
+    }
 
     @Test
     public void collectFile_allOk() throws Exception {
@@ -103,6 +105,7 @@ public class FtpsClientTest {
         OutputStream osMock = mock(OutputStream.class);
         when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
         when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(true);
+        when(ftpsClientMock.isConnected()).thenReturn(false, true);
 
         ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
                 .userId(USERNAME).password(PASSWORD).port(PORT).build();
@@ -124,19 +127,22 @@ public class FtpsClientTest {
         verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
         verify(ftpsClientMock).execPBSZ(0);
         verify(ftpsClientMock).execPROT("P");
+        verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
+        verify(ftpsClientMock).setBufferSize(1024 * 1024);
         verify(localFileMock).setPath(LOCAL_FILE_PATH);
         verify(localFileMock, times(1)).createNewFile();
         verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
         verify(osMock, times(1)).close();
         verify(ftpsClientMock, times(1)).logout();
         verify(ftpsClientMock, times(1)).disconnect();
+        verify(ftpsClientMock, times(2)).isConnected();
         verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
     public void collectFileFaultyOwnKey_shouldFail() throws Exception {
-        doThrow(new GeneralSecurityException())
-                .when(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        doThrow(new GeneralSecurityException()).when(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH,
+                FTP_KEY_PASSWORD);
 
         ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
                 .userId(USERNAME).password(PASSWORD).port(PORT).build();
@@ -144,6 +150,10 @@ public class FtpsClientTest {
         FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
         assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock, times(1)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -160,6 +170,15 @@ public class FtpsClientTest {
         FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
         assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock, times(1)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -175,8 +194,19 @@ public class FtpsClientTest {
 
         FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
-        verify(ftpsClientMock, times(1)).logout();
         assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock).setTrustManager(trustManagerMock);
+        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+        verify(ftpsClientMock).login(USERNAME, PASSWORD);
+        verify(ftpsClientMock, times(3)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -186,15 +216,27 @@ public class FtpsClientTest {
         when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock);
         when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
         when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
-        when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.BAD_REQUEST.value());
+        when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE);
 
         ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
                 .userId(USERNAME).password(PASSWORD).port(PORT).build();
 
         FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
-        verify(ftpsClientMock, times(1)).disconnect();
         assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock).setTrustManager(trustManagerMock);
+        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+        verify(ftpsClientMock).login(USERNAME, PASSWORD);
+        verify(ftpsClientMock, times(2)).getReplyCode();
+        verify(ftpsClientMock, times(3)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -212,6 +254,17 @@ public class FtpsClientTest {
         FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
         assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock).setTrustManager(trustManagerMock);
+        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+        verify(ftpsClientMock, times(3)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -232,5 +285,68 @@ public class FtpsClientTest {
 
         assertFalse(result.downloadSuccessful());
         verify(localFileMock, times(1)).delete();
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock).setTrustManager(trustManagerMock);
+        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+        verify(ftpsClientMock).login(USERNAME, PASSWORD);
+        verify(ftpsClientMock).getReplyCode();
+        verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
+        verify(ftpsClientMock).execPBSZ(0);
+        verify(ftpsClientMock).execPROT("P");
+        verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
+        verify(ftpsClientMock).setBufferSize(1024 * 1024);
+        verify(localFileMock).setPath(LOCAL_FILE_PATH);
+        verify(localFileMock, times(1)).createNewFile();
+        verify(ftpsClientMock, times(2)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
-}
\ No newline at end of file
+
+    @Test
+    public void collectFileFailingFileRetrieve_shouldFail() throws Exception {
+        when(keyManagerUtilsMock.getClientKeyManager()).thenReturn(keyManagerMock);
+        when(fileResourceMock.getInputStream()).thenReturn(inputStreamMock);
+        when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock);
+        when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
+        when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
+        when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value());
+        File fileMock = mock(File.class);
+        when(localFileMock.getFile()).thenReturn(fileMock);
+        OutputStream osMock = mock(OutputStream.class);
+        when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
+        when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(false);
+
+        ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
+                .userId(USERNAME).password(PASSWORD).port(PORT).build();
+
+        FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+
+        assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock).setTrustManager(trustManagerMock);
+        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+        verify(ftpsClientMock).login(USERNAME, PASSWORD);
+        verify(ftpsClientMock).getReplyCode();
+        verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
+        verify(ftpsClientMock).execPBSZ(0);
+        verify(ftpsClientMock).execPROT("P");
+        verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
+        verify(ftpsClientMock).setBufferSize(1024 * 1024);
+        verify(localFileMock).setPath(LOCAL_FILE_PATH);
+        verify(localFileMock, times(1)).createNewFile();
+        verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
+        verify(ftpsClientMock, times(2)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
+    }
+}
index ba42462..6cf7190 100644 (file)
@@ -19,7 +19,6 @@ package org.onap.dcaegen2.collectors.datafile.service.producer;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import com.google.gson.JsonElement;
@@ -30,9 +29,16 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
@@ -40,13 +46,8 @@ import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModelForUnitTest;
-import org.onap.dcaegen2.collectors.datafile.web.IRestTemplate;
-import org.springframework.http.HttpEntity;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
 import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
 import reactor.test.StepVerifier;
@@ -77,11 +78,12 @@ class DmaapProducerReactiveHttpClientTest {
     private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
 
     private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class);
-    private IRestTemplate restTemplateMock = mock(IRestTemplate.class);
-    private ResponseEntity<String> responseEntityMock = mock(ResponseEntity.class);
+    private CloseableHttpAsyncClient clientMock;
+    private HttpResponse responseMock = mock(HttpResponse.class);
+    private Future<HttpResponse> futureMock = mock(Future.class);
+    private StatusLine statusLine = mock(StatusLine.class);
     private InputStream fileStream;
 
-
     @BeforeEach
     void setUp() {
         when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
@@ -94,47 +96,60 @@ class DmaapProducerReactiveHttpClientTest {
 
         dmaapProducerReactiveHttpClient = new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
         dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock);
-        dmaapProducerReactiveHttpClient.setRestTemplate(restTemplateMock);
+        clientMock = mock(CloseableHttpAsyncClient.class);
+        dmaapProducerReactiveHttpClient.setWebClient(clientMock);
     }
 
     @Test
     void getHttpResponse_Success() throws Exception {
-        mockWebClientDependantObject();
-
-        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
-                .expectNext(HttpStatus.OK.toString()).verifyComplete();
-
+        mockWebClientDependantObject(true);
         URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
                 .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
 
-        HttpHeaders headers = new HttpHeaders();
-
-        headers.setContentType(MediaType.parseMediaType(APPLICATION_OCTET_STREAM_CONTENT_TYPE));
+        HttpPut httpPut = new HttpPut();
+        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
 
         JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
         metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+
         metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
-        headers.set(X_ATT_DR_META, metaData.toString());
+        httpPut.addHeader(X_ATT_DR_META, metaData.toString());
+        httpPut.setURI(expectedUri);
 
         String plainCreds = "dradmin" + ":" + "dradmin";
         byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
         byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
         String base64Creds = new String(base64CredsBytes);
-        headers.add("Authorization", "Basic " + base64Creds);
+        httpPut.addHeader("Authorization", "Basic " + base64Creds);
 
         fileStream.reset();
+        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+                .expectNext(responseMock.toString()).verifyComplete();
 
-        HttpEntity<byte[]> requestEntity = new HttpEntity<>(IOUtils.toByteArray(fileStream), headers);
         verify(fileSystemResourceMock).setPath("target/" + FILE_NAME);
-        verify(restTemplateMock).exchange(expectedUri, HttpMethod.PUT, requestEntity, String.class);
-        verifyNoMoreInteractions(restTemplateMock);
+        InputStream fileInputStream = fileSystemResourceMock.getInputStream();
+        httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+    }
+
+    @Test
+    void getHttpResponse_Fail() throws Exception {
+        mockWebClientDependantObject(false);
+        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+                .verifyComplete();
     }
 
-    private void mockWebClientDependantObject() throws IOException {
+    private void mockWebClientDependantObject(boolean success)
+            throws IOException, InterruptedException, ExecutionException {
         fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
         when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream);
-
-        when(restTemplateMock.exchange(any(), any(), any(), any())).thenReturn(responseEntityMock);
-        when(responseEntityMock.getStatusCode()).thenReturn(HttpStatus.OK);
+        if (success) {
+            when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+            when(futureMock.get()).thenReturn(responseMock);
+            when(responseMock.getStatusLine()).thenReturn(statusLine);
+            when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK);
+        } else {
+            when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+            when(futureMock.get()).thenThrow(new InterruptedException());
+        }
     }
 }
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java
deleted file mode 100644 (file)
index b0f5c93..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.web;
-
-
-import org.junit.jupiter.api.Test;
-import org.springframework.http.HttpRequest;
-import org.springframework.http.client.ClientHttpRequestExecution;
-import org.springframework.http.client.ClientHttpResponse;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-class RequestResponseLoggingInterceptorTest {
-
-    @Test
-    void intercept_shouldReturnObject() throws URISyntaxException, IOException {
-
-        //given
-        RequestResponseLoggingInterceptor requestResponseLoggingInterceptor = new RequestResponseLoggingInterceptor();
-
-        ClientHttpRequestExecution execution = mock(ClientHttpRequestExecution.class);
-        HttpRequest request = mock(HttpRequest.class);
-        ClientHttpResponse response = mock(ClientHttpResponse.class);
-
-        byte[] BODY = new byte[] { (byte)0xe0, 0x4f, (byte)0xd0, 0x20, (byte)0xa2 };
-        URI uri = new URI("www.someuri.com");
-
-        //when
-        when(execution.execute(request, BODY)).thenReturn(response);
-
-        //then
-        assertNotNull(requestResponseLoggingInterceptor.intercept(request, BODY, execution));
-    }
-}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java
deleted file mode 100644 (file)
index 3a0701f..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.web;
-
-import org.junit.jupiter.api.Test;
-
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-class RestTemplateWrapperTest {
-
-    @Test
-    void constructor_shouldReturnNotNullObject() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
-        RestTemplateWrapper restTemplateWrapper = new RestTemplateWrapper();
-        assertNotNull(restTemplateWrapper);
-    }
-}
diff --git a/pom.xml b/pom.xml
index 5bd941f..03bd074 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -46,7 +46,7 @@
   <properties>
     <java.version>8</java.version>
     <immutable.version>2.7.1</immutable.version>
-    <spring.version>5.1.0.RELEASE</spring.version>
+    <spring.version>5.1.2.RELEASE</spring.version>
     <spring-boot.version>2.1.0.M4</spring-boot.version>
     <tomcat.version>8.5.32</tomcat.version>
     <docker.maven.version>1.0.0</docker.maven.version>
 
  <dependencyManagement>
     <dependencies>
+      <dependency>
+        <groupId>org.asynchttpclient</groupId>
+        <artifactId>async-http-client</artifactId>
+        <version>2.6.0</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpasyncclient</artifactId>
+        <version>4.1.4</version>
+      </dependency>
       <dependency>
         <groupId>io.projectreactor</groupId>
         <artifactId>reactor-bom</artifactId>