Handle redirect from DataRouter properly 89/69589/5
authorelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 1 Oct 2018 14:51:05 +0000 (16:51 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 2 Oct 2018 08:38:30 +0000 (10:38 +0200)
Change-Id: I1eae8b45a2437b97bccedcb0f5cc02ac29f5044f
Issue-ID: DCAEGEN2-850
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
17 files changed:
datafile-app-server/config/application.yaml
datafile-app-server/config/datafile_endpoints.json
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/main/resources/datafile_endpoints.json
datafile-app-server/src/test/resources/datafile_endpoints.json
datafile-dmaap-client/pom.xml
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/FileSystemResourceWrapper.java [new file with mode: 0644]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IFileSystemResource.java [new file with mode: 0644]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IRestTemplate.java [new file with mode: 0644]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/PublishRedirectStrategy.java [new file with mode: 0644]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RequestResponseLoggingInterceptor.java [new file with mode: 0644]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RestTemplateWrapper.java [new file with mode: 0644]
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java

index ca3160c..cef185c 100644 (file)
@@ -14,7 +14,7 @@ logging:
     ROOT: ERROR
     org.springframework: ERROR
     org.springframework.data: ERROR
-    org.onap.dcaegen2.collectors.datafile: INFO
+    org.onap.dcaegen2.collectors.datafile: ERROR
   file: opt/log/application.log
 app:
   filepath: config/datafile_endpoints.json
index 102537b..5664bde 100644 (file)
@@ -3,24 +3,24 @@
         "dmaap": {
             "dmaapConsumerConfiguration": {
                 "dmaapHostName": "localhost",
-                "dmaapPortNumber": 3904,
+                "dmaapPortNumber": 2222,
                 "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
                 "dmaapProtocol": "http",
                 "dmaapUserName": "admin",
                 "dmaapUserPassword": "admin",
                 "dmaapContentType": "application/json",
-                "consumerId": "c12",
+                "consumerId": "C12",
                 "consumerGroup": "OpenDcae-c12",
                 "timeoutMS": -1,
                 "messageLimit": 1
             },
             "dmaapProducerConfiguration": {
                 "dmaapHostName": "localhost",
-                "dmaapPortNumber": 3905,
+                "dmaapPortNumber": 3907,
                 "dmaapTopicName": "publish",
-                "dmaapProtocol": "http",
-                "dmaapUserName": "admin",
-                "dmaapUserPassword": "admin",
+                "dmaapProtocol": "https",
+                "dmaapUserName": "dradmin",
+                "dmaapUserPassword": "dradmin",
                 "dmaapContentType": "application/octet-stream"
             }
         }
index 0b81df5..8508cd1 100644 (file)
@@ -18,9 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
 
 import reactor.core.publisher.Flux;
 
@@ -30,15 +28,9 @@ import reactor.core.publisher.Flux;
  */
 abstract class DmaapPublisherTask {
 
-    abstract Flux<String> publish(ConsumerDmaapModel consumerDmaapModel);
-
-    abstract DmaapProducerReactiveHttpClient resolveClient();
-
     protected abstract DmaapPublisherConfiguration resolveConfiguration();
 
-    protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel);
+    protected abstract DmaapProducerReactiveHttpClient resolveClient();
 
-    WebClient buildWebClient() {
-        return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
-    }
+    protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel);
 }
index b4ee3a9..201b33d 100644 (file)
@@ -37,24 +37,17 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
 
     private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
     private final Config datafileAppConfig;
-    private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
 
     @Autowired
     public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) {
         this.datafileAppConfig = datafileAppConfig;
     }
 
-    @Override
-    public Flux<String> publish(ConsumerDmaapModel consumerDmaapModel) {
-        logger.trace("Publishing on DMaaP DataRouter {}", consumerDmaapModel);
-        return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel);
-    }
-
     @Override
     public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) {
-        dmaapProducerReactiveHttpClient = resolveClient();
         logger.trace("Method called with arg {}", consumerDmaapModel);
-        return publish(consumerDmaapModel);
+        DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
+        return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel);
     }
 
     @Override
@@ -63,8 +56,8 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
     }
 
     @Override
-    DmaapProducerReactiveHttpClient resolveClient() {
-        return new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient());
+    protected DmaapProducerReactiveHttpClient resolveClient() {
+        return new DmaapProducerReactiveHttpClient(resolveConfiguration());
     }
 
 }
index c263c95..171dd02 100644 (file)
@@ -61,7 +61,7 @@ public class ScheduledTasks {
         logger.trace("Execution of tasks was registered");
 
         consumeFromDmaapMessage()
-                .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP"))
+                .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
                 .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration)
                 .subscribe(this::onSuccess, this::onError, this::onComplete);
     }
index 188129e..dff77d2 100644 (file)
@@ -3,10 +3,10 @@
         "dmaap": {
             "dmaapConsumerConfiguration": {
                 "consumerGroup": "notification",
-                "consumerId": "1",
+                "consumerId": "C12",
                 "dmaapContentType": "application/json",
                 "dmaapHostName": "localhost",
-                "dmaapPortNumber": 3904,
+                "dmaapPortNumber": 2222,
                 "dmaapProtocol": "http",
                 "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
                 "dmaapUserName": "admin",
             "dmaapProducerConfiguration": {
                 "dmaapContentType": "application/octet-stream",
                 "dmaapHostName": "localhost",
-                "dmaapPortNumber": 3905,
-                "dmaapProtocol": "http",
+                "dmaapPortNumber": 3907,
+                "dmaapProtocol": "https",
                 "dmaapTopicName": "publish",
-                "dmaapUserName": "admin",
-                "dmaapUserPassword": "admin"
+                "dmaapUserName": "dradmin",
+                "dmaapUserPassword": "dradmin"
             }
         }
     }
index 188129e..dff77d2 100644 (file)
@@ -3,10 +3,10 @@
         "dmaap": {
             "dmaapConsumerConfiguration": {
                 "consumerGroup": "notification",
-                "consumerId": "1",
+                "consumerId": "C12",
                 "dmaapContentType": "application/json",
                 "dmaapHostName": "localhost",
-                "dmaapPortNumber": 3904,
+                "dmaapPortNumber": 2222,
                 "dmaapProtocol": "http",
                 "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
                 "dmaapUserName": "admin",
             "dmaapProducerConfiguration": {
                 "dmaapContentType": "application/octet-stream",
                 "dmaapHostName": "localhost",
-                "dmaapPortNumber": 3905,
-                "dmaapProtocol": "http",
+                "dmaapPortNumber": 3907,
+                "dmaapProtocol": "https",
                 "dmaapTopicName": "publish",
-                "dmaapUserName": "admin",
-                "dmaapUserPassword": "admin"
+                "dmaapUserName": "dradmin",
+                "dmaapUserPassword": "dradmin"
             }
         }
     }
index 96e0988..9f60f8d 100644 (file)
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-reactor-netty</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-api</artifactId>
index d5878b0..095ba8c 100644 (file)
@@ -18,8 +18,6 @@
 
 package org.onap.dcaegen2.collectors.datafile.service;
 
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
-
 import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,8 +35,6 @@ public class DmaapReactiveWebClient {
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     private String dmaaPContentType;
-    private String dmaaPUserName;
-    private String dmaaPUserPassword;
 
     /**
      * Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
@@ -47,8 +43,6 @@ public class DmaapReactiveWebClient {
      * @return DmaapReactiveWebClient
      */
     public DmaapReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
-        this.dmaaPUserName = dmaapCustomConfig.dmaapUserName();
-        this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword();
         this.dmaaPContentType = dmaapCustomConfig.dmaapContentType();
         return this;
     }
@@ -61,7 +55,6 @@ public class DmaapReactiveWebClient {
     public WebClient build() {
         return WebClient.builder()
             .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType)
-            .filter(basicAuthentication(dmaaPUserName, dmaaPUserPassword))
             .filter(logRequest())
             .filter(logResponse())
             .build();
index 36050ff..fae86a8 100644 (file)
@@ -19,26 +19,29 @@ package org.onap.dcaegen2.collectors.datafile.service.producer;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
 
-import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
 
-import org.apache.http.HttpHeaders;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.core.io.FileSystemResource;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.ClientResponse;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
-import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 7/4/18
@@ -49,16 +52,21 @@ public class DmaapProducerReactiveHttpClient {
     private static final String X_ATT_DR_META = "X-ATT-DR-META";
     private static final String NAME_JSON_TAG = "name";
     private static final String LOCATION_JSON_TAG = "location";
+    private static final String URI_SEPARATOR = "/";
     private static final String DEFAULT_FEED_ID = "1";
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    private WebClient webClient;
     private final String dmaapHostName;
     private final Integer dmaapPortNumber;
     private final String dmaapTopicName;
     private final String dmaapProtocol;
     private final String dmaapContentType;
+    private final String user;
+    private final String pwd;
+
+    private IFileSystemResource fileResource;
+    private IRestTemplate restTemplate;
 
     /**
      * Constructor DmaapProducerReactiveHttpClient.
@@ -72,65 +80,86 @@ public class DmaapProducerReactiveHttpClient {
         this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
         this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
         this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
+        this.user = dmaapPublisherConfiguration.dmaapUserName();
+        this.pwd = dmaapPublisherConfiguration.dmaapUserPassword();
     }
 
     /**
-     * Function for calling DMaaP HTTP producer - post request to DMaaP.
+     * Function for calling DMaaP HTTP producer - post request to DMaaP DataRouter.
      *
-     * @param consumerDmaapModel - object which will be sent to DMaaP
+     * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
      * @return status code of operation
      */
     public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
         logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
+        try {
+            HttpHeaders headers = new HttpHeaders();
+            headers.setContentType(MediaType.parseMediaType(dmaapContentType));
+            addMetaDataToHead(consumerDmaapModel, headers);
 
-        RequestBodyUriSpec post = webClient.post();
+            addUserCredentialsToHead(headers);
 
-        prepareHead(consumerDmaapModel, post);
+            HttpEntity<byte[]> request = addFileToRequest(consumerDmaapModel, headers);
 
-        prepareBody(consumerDmaapModel, post);
 
-        ResponseSpec responseSpec = post.retrieve();
-        responseSpec.onStatus(HttpStatus::is4xxClientError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
-        responseSpec.onStatus(HttpStatus::is5xxServerError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
-        Flux<String> response = responseSpec.bodyToFlux(String.class);
+            logger.trace("Starting to publish to DR");
+            ResponseEntity<String> responseEntity = getRestTemplate().exchange(getUri(consumerDmaapModel.getName()),
+                    HttpMethod.PUT, request, String.class);
 
-        logger.trace("Exiting getDmaapProducerResponse with {}", response);
-        return response;
+            return Flux.just(responseEntity.getStatusCode().toString());
+        } catch (Exception e) {
+            logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e);
+            return Flux.empty();
+        }
     }
 
-    public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
-        this.webClient = webClient;
-        return this;
+    private void addUserCredentialsToHead(HttpHeaders headers) {
+        String plainCreds = user + ":" + pwd;
+        byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
+        byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
+        String base64Creds = new String(base64CredsBytes);
+        logger.trace("base64Creds...: {}", base64Creds);
+        headers.add("Authorization", "Basic " + base64Creds);
     }
 
-    private void prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
-        post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
-
-        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
-        String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+    private void addMetaDataToHead(ConsumerDmaapModel consumerDmaapModel, HttpHeaders headers) {
+        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
+        metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
         metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
-        post.header(X_ATT_DR_META, metaData.toString());
+        headers.set(X_ATT_DR_META, metaData.toString());
+    }
 
-        post.uri(getUri(name));
+    private HttpEntity<byte[]> addFileToRequest(ConsumerDmaapModel consumerDmaapModel, HttpHeaders headers)
+            throws IOException {
+        InputStream in = getInputStream(consumerDmaapModel.getLocation());
+        return new HttpEntity<>(IOUtils.toByteArray(in), headers);
     }
 
-    private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
-        String fileLocation = model.getLocation();
-        File fileResource = new File(fileLocation);
-        FileSystemResource httpResource = new FileSystemResource(fileResource);
-        post.body(BodyInserters.fromResource(httpResource));
+    private InputStream getInputStream(String filePath) throws IOException {
+        if (fileResource == null) {
+            fileResource = new FileSystemResourceWrapper(filePath);
+        }
+        return fileResource.getInputStream();
+    }
+
+    private IRestTemplate getRestTemplate() throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException {
+        if (restTemplate == null) {
+            restTemplate = new RestTemplateWrapper();
+        }
+        return restTemplate;
     }
 
     private URI getUri(String fileName) {
-        String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
+        String path = dmaapTopicName + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + fileName;
         return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
                 .path(path).build();
     }
 
-    private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
-        String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString();
-        logger.error(errorMessage);
+    protected void setFileSystemResource(IFileSystemResource fileSystemResource) {
+        fileResource = fileSystemResource;
+    }
 
-        return Mono.error(new Exception(errorMessage));
+    protected void setRestTemplate(IRestTemplate restTemplate) {
+        this.restTemplate = restTemplate;
     }
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/FileSystemResourceWrapper.java
new file mode 100644 (file)
index 0000000..6349691
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.springframework.core.io.FileSystemResource;
+
+/**
+ * @author
+ *
+ */
+public class FileSystemResourceWrapper implements IFileSystemResource {
+    private FileSystemResource realResource;
+
+    public FileSystemResourceWrapper(String path) {
+        realResource = new FileSystemResource(path);
+    }
+    @Override
+    public InputStream getInputStream() throws IOException {
+        return realResource.getInputStream();
+    }
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IFileSystemResource.java
new file mode 100644 (file)
index 0000000..6ecb1c6
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * @author
+ *
+ */
+public interface IFileSystemResource {
+
+    InputStream getInputStream() throws IOException;
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IRestTemplate.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IRestTemplate.java
new file mode 100644 (file)
index 0000000..7f8d3b5
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import java.net.URI;
+
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+
+/**
+ * @author
+ *
+ */
+public interface IRestTemplate {
+    public ResponseEntity<String> exchange(URI url, HttpMethod method, HttpEntity<byte[]> requestEntity,
+            Class<String> responseType);
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/PublishRedirectStrategy.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/PublishRedirectStrategy.java
new file mode 100644 (file)
index 0000000..ff3a03b
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import java.net.URI;
+
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolException;
+import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.protocol.HttpContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PublishRedirectStrategy implementation
+ * that automatically redirects all HEAD, GET, POST, PUT, and DELETE requests.
+ * This strategy relaxes restrictions on automatic redirection of
+ * POST methods imposed by the HTTP specification.
+ *
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
+public class PublishRedirectStrategy extends DefaultRedirectStrategy {
+
+    public static final PublishRedirectStrategy INSTANCE = new PublishRedirectStrategy();
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /**
+     * Redirectable methods.
+     */
+    private static final String[] REDIRECT_METHODS = new String[] {
+        HttpPut.METHOD_NAME,
+        HttpGet.METHOD_NAME,
+        HttpPost.METHOD_NAME,
+        HttpHead.METHOD_NAME,
+        HttpDelete.METHOD_NAME
+    };
+
+    @Override
+    protected boolean isRedirectable(final String method) {
+        for (final String m: REDIRECT_METHODS) {
+            if (m.equalsIgnoreCase(method)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public HttpUriRequest getRedirect(
+            final HttpRequest request,
+            final HttpResponse response,
+            final HttpContext context) throws ProtocolException {
+        final URI uri = getLocationURI(request, response, context);
+        logger.trace("getRedirect...: {}", request.toString());
+        return RequestBuilder.copy(request).setUri(uri).build();
+    }
+
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RequestResponseLoggingInterceptor.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RequestResponseLoggingInterceptor.java
new file mode 100644 (file)
index 0000000..bf40700
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpRequest;
+import org.springframework.http.client.ClientHttpRequestExecution;
+import org.springframework.http.client.ClientHttpRequestInterceptor;
+import org.springframework.http.client.ClientHttpResponse;
+import org.springframework.util.StreamUtils;
+
+public class RequestResponseLoggingInterceptor implements ClientHttpRequestInterceptor {
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    @Override
+    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
+        logRequest(request, body);
+        ClientHttpResponse response = execution.execute(request, body);
+        logResponse(response);
+        return response;
+    }
+
+    private void logRequest(HttpRequest request, byte[] body) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("===========================request begin================================================");
+            log.debug("URI         : {}", request.getURI());
+            log.debug("Method      : {}", request.getMethod());
+            log.debug("Headers     : {}", request.getHeaders());
+            log.debug("Request body: {}", new String(body, "UTF-8"));
+            log.debug("==========================request end================================================");
+        }
+    }
+
+    private void logResponse(ClientHttpResponse response) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("============================response begin==========================================");
+            log.debug("Status code  : {}", response.getStatusCode());
+            log.debug("Status text  : {}", response.getStatusText());
+            log.debug("Headers      : {}", response.getHeaders());
+            log.debug("Response body: {}", StreamUtils.copyToString(response.getBody(), Charset.defaultCharset()));
+            log.debug("=======================response end=================================================");
+        }
+    }
+}
\ No newline at end of file
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RestTemplateWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RestTemplateWrapper.java
new file mode 100644 (file)
index 0000000..298f669
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import java.net.URI;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collections;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * @author
+ *
+ */
+public class RestTemplateWrapper implements IRestTemplate {
+    private RestTemplate restTemplate;
+
+    public RestTemplateWrapper() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+        SSLContext sslContext =
+                new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+        CloseableHttpClient httpClient =
+                HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier())
+                        .setRedirectStrategy(new PublishRedirectStrategy()).build();
+
+        HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
+        requestFactory.setHttpClient(httpClient);
+
+        restTemplate = new RestTemplate(requestFactory);
+        restTemplate.setInterceptors(Collections.singletonList(new RequestResponseLoggingInterceptor()));
+
+    }
+
+    @Override
+    public ResponseEntity<String> exchange(URI url, HttpMethod method, HttpEntity<byte[]> requestEntity,
+            Class<String> responseType) {
+        return restTemplate.exchange(url,  method, requestEntity, responseType);
+    }
+
+}
index 5f4c1a5..5dbc908 100644 (file)
 package org.onap.dcaegen2.collectors.datafile.service.producer;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModelForUnitTest;
-import org.springframework.http.HttpHeaders;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
-import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
-import reactor.core.publisher.Flux;
 import reactor.test.StepVerifier;
 
 /**
@@ -57,70 +51,59 @@ class DmaapProducerReactiveHttpClientTest {
     private static final String X_ATT_DR_META = "X-ATT-DR-META";
 
     private static final String HOST = "54.45.33.2";
-    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
     private static final int PORT = 1234;
     private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+    private static final String URI_SEPARATOR = "/";
     private static final String PUBLISH_TOPIC = "publish";
     private static final String DEFAULT_FEED_ID = "1";
+    private static final String FILE_CONTENT = "Just a string.";
 
     private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
 
     private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
     private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
-    private WebClient webClientMock = mock(WebClient.class);
-    private RequestBodyUriSpec requestBodyUriSpecMock;
-    private ResponseSpec responseSpecMock;
+
+    private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class);
+    private IRestTemplate restTemplateMock = mock(IRestTemplate.class);
+    private ResponseEntity<String> responseEntityMock = mock(ResponseEntity.class);
 
 
     @BeforeEach
     void setUp() {
         when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
-        when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTP_SCHEME);
+        when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
         when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
-        when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("DATAFILE");
-        when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("DATAFILE");
+        when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin");
+        when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin");
         when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn(APPLICATION_OCTET_STREAM_CONTENT_TYPE);
         when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn(PUBLISH_TOPIC);
 
         dmaapProducerReactiveHttpClient = new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
-
-        webClientMock = spy(WebClient.builder()
-                .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType())
-                .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(),
-                        dmaapPublisherConfigurationMock.dmaapUserPassword()))
-                .build());
-        requestBodyUriSpecMock = mock(RequestBodyUriSpec.class);
-        responseSpecMock = mock(ResponseSpec.class);
+        dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock);
+        dmaapProducerReactiveHttpClient.setRestTemplate(restTemplateMock);
     }
 
     @Test
-    void getHttpResponse_Success() {
+    void getHttpResponse_Success() throws Exception {
         mockWebClientDependantObject();
-        dmaapProducerReactiveHttpClient.createDmaapWebClient(webClientMock);
-        List<ConsumerDmaapModel> consumerDmaapModelList = new ArrayList<ConsumerDmaapModel>();
-        consumerDmaapModelList.add(consumerDmaapModel);
 
         StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
-                .expectNext("200").verifyComplete();
-
-        verify(requestBodyUriSpecMock).header(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
-        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
-        metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
-        metaData.getAsJsonObject().remove(NAME_JSON_TAG);
-        verify(requestBodyUriSpecMock).header(X_ATT_DR_META, metaData.toString());
-        URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTP_SCHEME).host(HOST).port(PORT)
-                .path(PUBLISH_TOPIC + "/" + DEFAULT_FEED_ID + "/" + FILE_NAME).build();
-        verify(requestBodyUriSpecMock).uri(expectedUri);
-        verify(requestBodyUriSpecMock).body(any());
+                .expectNext(HttpStatus.OK.toString()).verifyComplete();
+
+        URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
+                .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
+
+        verify(restTemplateMock)
+                .exchange(eq(expectedUri), eq(HttpMethod.PUT), any(), eq(String.class));
     }
 
-    private void mockWebClientDependantObject() {
-        when(webClientMock.post()).thenReturn(requestBodyUriSpecMock);
-        when(requestBodyUriSpecMock.uri((URI) any())).thenReturn(requestBodyUriSpecMock);
+    private void mockWebClientDependantObject() throws IOException {
+        InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
+        when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream);
 
-        when(requestBodyUriSpecMock.retrieve()).thenReturn(responseSpecMock);
-        when(responseSpecMock.onStatus(any(), any())).thenReturn(responseSpecMock);
-        Flux<String> expectedResult = Flux.just("200");
-        when(responseSpecMock.bodyToFlux(String.class)).thenReturn(expectedResult);
+        when(restTemplateMock.exchange(any(), any(), any(), any()))
+                .thenReturn(responseEntityMock);
+        when(responseEntityMock.getStatusCode()).thenReturn(HttpStatus.OK);
     }
 }