Restapi-call-node: Fix sending big files to DMAAP data router
[ccsdk/sli/plugins.git] / restapi-call-node / provider / src / main / java / org / onap / ccsdk / sli / plugins / restapicall / RestapiCallNode.java
index 8038b94..709774b 100755 (executable)
@@ -25,15 +25,22 @@ package org.onap.ccsdk.sli.plugins.restapicall;
 import static java.lang.Boolean.valueOf;
 import static javax.ws.rs.client.Entity.entity;
 import static org.onap.ccsdk.sli.plugins.restapicall.AuthType.fromString;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
 import java.net.SocketException;
 import java.net.URI;
+import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.security.KeyStore;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -72,8 +79,13 @@ import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
 import org.onap.ccsdk.sli.core.sli.SvcLogicJavaPlugin;
+import org.onap.logging.filter.base.HttpURLConnectionMetricUtil;
+import org.onap.logging.filter.base.MetricLogClientFilter;
+import org.onap.logging.filter.base.ONAPComponents;
+import org.onap.logging.ref.slf4j.ONAPLogConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 public class RestapiCallNode implements SvcLogicJavaPlugin {
 
@@ -125,6 +137,7 @@ public class RestapiCallNode implements SvcLogicJavaPlugin {
         httpReadTimeout = readOptionalInteger("HTTP_READ_TIMEOUT_MS",DEFAULT_HTTP_READ_TIMEOUT_MS);
     }
 
+    @SuppressWarnings("unchecked")
     protected void loadPartners(JSONObject partners) {
         Iterator<String> keys = partners.keys();
         String partnerUserKey = "user";
@@ -222,6 +235,7 @@ public class RestapiCallNode implements SvcLogicJavaPlugin {
         p.accept = parseParam(paramMap, "accept", false, null);
         p.multipartFormData = valueOf(parseParam(paramMap, "multipartFormData", false, "false"));
         p.multipartFile = parseParam(paramMap, "multipartFile", false, null);
+        p.targetEntity = parseParam(paramMap, "targetEntity", false, null);
         return p;
     }
 
@@ -462,10 +476,13 @@ public class RestapiCallNode implements SvcLogicJavaPlugin {
     protected void sendRequest(Map<String, String> paramMap, SvcLogicContext ctx, RetryPolicy retryPolicy)
         throws SvcLogicException {
 
-        HttpResponse r = new HttpResponse();
+       HttpResponse r = new HttpResponse();
         try {
             handlePartner(paramMap);
             Parameters p = getParameters(paramMap, new Parameters());
+            if(p.targetEntity != null && !p.targetEntity.isEmpty()) {
+                MDC.put(ONAPLogConstants.MDCs.TARGET_ENTITY, p.targetEntity);
+            }
             if (p.restapiUrl.contains(",") && retryPolicy == null) {
                 String[] urls = p.restapiUrl.split(",");
                 retryPolicy = new RetryPolicy(urls, urls.length * 2);
@@ -785,11 +802,9 @@ public class RestapiCallNode implements SvcLogicJavaPlugin {
         setClientTimeouts(client);
         // Needed to support additional HTTP methods such as PATCH
         client.property(HttpUrlConnectorProvider.SET_METHOD_WORKAROUND, true);
-
+        client.register(new MetricLogClientFilter());
         WebTarget webTarget = addAuthType(client, p).target(p.restapiUrl);
 
-        log.info("Sending request below to url " + p.restapiUrl);
-        log.info(request);
         long t1 = System.currentTimeMillis();
 
         HttpResponse r = new HttpResponse();
@@ -821,14 +836,20 @@ public class RestapiCallNode implements SvcLogicJavaPlugin {
                 }
             }
 
-            invocationBuilder.header("X-ECOMP-RequestID", org.slf4j.MDC.get("X-ECOMP-RequestID"));
-
             invocationBuilder.property(ClientProperties.SUPPRESS_HTTP_COMPLIANCE_VALIDATION, true);
 
             Response response;
 
             try {
-                response = invocationBuilder.method(p.httpMethod.toString(), entity(request, contentType));
+                // When the HTTP operation has no body do not set the content-type
+                //setting content-type has caused errors with some servers when no body is present
+                if (request == null) {
+                    response = invocationBuilder.method(p.httpMethod.toString());
+                } else {
+                    log.info("Sending request below to url " + p.restapiUrl);
+                    log.info(request);
+                    response = invocationBuilder.method(p.httpMethod.toString(), entity(request, contentType));
+                }
             } catch (ProcessingException | IllegalStateException e) {
                 throw new SvcLogicException(requestPostingException + e.getLocalizedMessage(), e);
             }
@@ -869,8 +890,6 @@ public class RestapiCallNode implements SvcLogicJavaPlugin {
                 }
             }
 
-            invocationBuilder.header("X-ECOMP-RequestID", org.slf4j.MDC.get("X-ECOMP-RequestID"));
-
             Response response;
 
             try {
@@ -947,6 +966,16 @@ public class RestapiCallNode implements SvcLogicJavaPlugin {
             byte[] data = Files.readAllBytes(Paths.get(p.fileName));
 
             r = sendHttpData(data, p);
+
+            for (int i = 0; i < 10 && r.code == 301; i++) {
+                String newUrl = r.headers2.get("Location").get(0);
+
+                log.info("Got response code 301. Sending same request to URL: " + newUrl);
+
+                p.url = newUrl;
+                r = sendHttpData(data, p);
+            }
+
             setResponseStatus(ctx, p.responsePrefix, r);
 
         } catch (SvcLogicException | IOException e) {
@@ -1021,12 +1050,27 @@ public class RestapiCallNode implements SvcLogicJavaPlugin {
         }
     }
 
-    protected HttpResponse sendHttpData(byte[] data, FileParam p) throws SvcLogicException {
+    protected HttpResponse sendHttpData(byte[] data, FileParam p) throws IOException {
+        URL url = new URL(p.url);
+        HttpURLConnection con = (HttpURLConnection) url.openConnection();
 
-        Client client = ClientBuilder.newBuilder().build();
-        setClientTimeouts(client);
-        client.property(ClientProperties.FOLLOW_REDIRECTS, true);
-        WebTarget webTarget = addAuthType(client, p).target(p.url);
+        log.info("Connection: " + con.getClass().getName());
+
+        con.setRequestMethod(p.httpMethod.toString());
+        con.setRequestProperty("Content-Type", "application/octet-stream");
+        con.setRequestProperty("Accept", "*/*");
+        con.setRequestProperty("Expect", "100-continue");
+        con.setFixedLengthStreamingMode(data.length);
+        con.setInstanceFollowRedirects(false);
+
+        if (p.user != null && p.password != null) {
+            String authString = p.user + ":" + p.password;
+            String authStringEnc = Base64.getEncoder().encodeToString(authString.getBytes());
+            con.setRequestProperty("Authorization", "Basic " + authStringEnc);
+        }
+
+        con.setDoInput(true);
+        con.setDoOutput(true);
 
         log.info("Sending file");
         long t1 = System.currentTimeMillis();
@@ -1035,69 +1079,46 @@ public class RestapiCallNode implements SvcLogicJavaPlugin {
         r.code = 200;
 
         if (!p.skipSending) {
-            String tt = "application/octet-stream";
-            Invocation.Builder invocationBuilder = webTarget.request(tt).accept(tt);
+            HttpURLConnectionMetricUtil util = new HttpURLConnectionMetricUtil();
+            util.logBefore(con, ONAPComponents.DMAAP);
 
-            Response response;
+            con.connect();
 
+            boolean continue100failed = false;
             try {
-                if (p.httpMethod == HttpMethod.POST) {
-                    response = invocationBuilder.post(Entity.entity(data, tt));
-                } else if (p.httpMethod == HttpMethod.PUT) {
-                    response = invocationBuilder.put(Entity.entity(data, tt));
-                } else {
-                    throw new SvcLogicException("Http operation" + p.httpMethod + "not supported");
-                }
-            } catch (ProcessingException e) {
-                throw new SvcLogicException(requestPostingException + e.getLocalizedMessage(), e);
+                OutputStream os = con.getOutputStream();
+                os.write(data);
+                os.flush();
+                os.close();
+            } catch (ProtocolException e) {
+                continue100failed = true;
             }
 
-            r.code = response.getStatus();
-            r.headers = response.getStringHeaders();
-            EntityTag etag = response.getEntityTag();
-            if (etag != null) {
-                r.message = etag.getValue();
-            }
-            if (response.hasEntity() && r.code != 204) {
-                r.body = response.readEntity(String.class);
-            }
-
-            if (r.code == 301) {
-                String newUrl = response.getStringHeaders().getFirst("Location");
+            r.code = con.getResponseCode();
+            r.headers2 = con.getHeaderFields();
 
-                log.info("Got response code 301. Sending same request to URL: {}", newUrl);
-
-                webTarget = client.target(newUrl);
-                invocationBuilder = webTarget.request(tt).accept(tt);
-
-                try {
-                    if (p.httpMethod == HttpMethod.POST) {
-                        response = invocationBuilder.post(Entity.entity(data, tt));
-                    } else if (p.httpMethod == HttpMethod.PUT) {
-                        response = invocationBuilder.put(Entity.entity(data, tt));
-                    } else {
-                        throw new SvcLogicException("Http operation" + p.httpMethod + "not supported");
-                    }
-                } catch (ProcessingException e) {
-                    throw new SvcLogicException(requestPostingException + e.getLocalizedMessage(), e);
+            if (r.code != 204 && !continue100failed) {
+                BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
+                String inputLine;
+                StringBuffer response = new StringBuffer();
+                while ((inputLine = in.readLine()) != null) {
+                    response.append(inputLine);
                 }
+                in.close();
 
-                r.code = response.getStatus();
-                etag = response.getEntityTag();
-                if (etag != null) {
-                    r.message = etag.getValue();
-                }
-                if (response.hasEntity() && r.code != 204) {
-                    r.body = response.readEntity(String.class);
-                }
+                r.body = response.toString();
             }
+
+            util.logAfter(con);
+
+            con.disconnect();
         }
 
         long t2 = System.currentTimeMillis();
-        log.info(responseReceivedMessage, t2 - t1);
-        log.info(responseHttpCodeMessage, r.code);
+        log.info("Response received. Time: {}", t2 - t1);
+        log.info("HTTP response code: {}", r.code);
         log.info("HTTP response message: {}", r.message);
-        logHeaders(r.headers);
+        logHeaders(r.headers2);
         log.info("HTTP response: {}", r.body);
 
         return r;
@@ -1145,6 +1166,26 @@ public class RestapiCallNode implements SvcLogicJavaPlugin {
         }
     }
 
+    private void logHeaders(Map<String, List<String>> mm) {
+        if (mm == null || mm.isEmpty()) {
+            return;
+        }
+
+        List<String> ll = new ArrayList<>();
+        for (String s : mm.keySet()) {
+            if (s != null) {
+                ll.add(s);
+            }
+        }
+        Collections.sort(ll);
+
+        for (String name : ll) {
+            List<String> v = mm.get(name);
+            log.info("--- {}:{}", name, String.valueOf(mm.get(name)));
+            log.info("--- " + name + ": " + (v.size() == 1 ? v.get(0) : v));
+        }
+    }
+
     protected HttpResponse postOnUeb(String request, UebParam p) throws SvcLogicException {
         String[] urls = uebServers.split(" ");
         for (int i = 0; i < urls.length; i++) {