Remove datarouter-node critical code smells
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
index 7e10d5a..018c3af 100644 (file)
@@ -27,8 +27,15 @@ package org.onap.dmaap.datarouter.node;
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.zip.GZIPInputStream;
 
-import org.apache.log4j.Logger;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
+import org.slf4j.MDC;
+
+import static com.att.eelf.configuration.Configuration.*;
+import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
 
 /**
  * A file to be delivered to a destination.
@@ -38,10 +45,11 @@ import org.apache.log4j.Logger;
  * the file and its delivery data as well as to attempt delivery.
  */
 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
-    private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
-    private DeliveryTaskHelper dth;
+    private static EELFLogger eelfLogger = EELFManager.getInstance()
+            .getLogger(DeliveryTask.class);
+    private DeliveryTaskHelper deliveryTaskHelper;
     private String pubid;
-    private DestInfo di;
+    private DestInfo destInfo;
     private String spool;
     private File datafile;
     private File metafile;
@@ -54,36 +62,42 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     private String feedid;
     private String subid;
     private int attempts;
+    private boolean followRedirects;
     private String[][] hdrs;
+    private String newInvocationId;
+    private long resumeTime;
 
 
     /**
      * Create a delivery task for a given delivery queue and pub ID
      *
-     * @param    dth    The delivery task helper for the queue this task is in.
-     * @param    pubid    The publish ID for this file.  This is used as
-     * the base for the file name in the spool directory and is of
-     * the form <milliseconds since 1970>.<fqdn of initial data router node>
+     * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
+     * @param pubid              The publish ID for this file.  This is used as
+     *                           the base for the file name in the spool directory and is of
+     *                           the form <milliseconds since 1970>.<fqdn of initial data router node>
      */
-    public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
-        this.dth = dth;
+    DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
+        this.deliveryTaskHelper = deliveryTaskHelper;
         this.pubid = pubid;
-        di = dth.getDestInfo();
-        subid = di.getSubId();
-        feedid = di.getLogData();
-        spool = di.getSpool();
+        destInfo = deliveryTaskHelper.getDestinationInfo();
+        subid = destInfo.getSubId();
+        this.followRedirects = destInfo.isFollowRedirects();
+        feedid = destInfo.getLogData();
+        spool = destInfo.getSpool();
         String dfn = spool + "/" + pubid;
         String mfn = dfn + ".M";
         datafile = new File(spool + "/" + pubid);
         metafile = new File(mfn);
-        boolean monly = di.isMetaDataOnly();
+        boolean monly = destInfo.isMetaDataOnly();
         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
-        Vector<String[]> hdrv = new Vector<String[]>();
+        resumeTime = System.currentTimeMillis();
+        Vector<String[]> hdrv = new Vector<>();
 
         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
             String s = br.readLine();
             int i = s.indexOf('\t');
             method = s.substring(0, i);
+            NodeUtils.setIpAndFqdnForEelf(method);
             if (!"DELETE".equals(method) && !monly) {
                 length = datafile.length();
             }
@@ -92,9 +106,9 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 i = s.indexOf('\t');
                 String h = s.substring(0, i);
                 String v = s.substring(i + 1);
-                if ("x-att-dr-routing".equalsIgnoreCase(h)) {
+                if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
                     subid = v.replaceAll("[^ ]*/", "");
-                    feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
+                    feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
                 }
                 if (length == 0 && h.toLowerCase().startsWith("content-")) {
                     continue;
@@ -102,14 +116,23 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 if (h.equalsIgnoreCase("content-type")) {
                     ctype = v;
                 }
+                if (h.equalsIgnoreCase("x-onap-requestid")) {
+                    MDC.put(MDC_KEY_REQUEST_ID, v);
+                }
+                if (h.equalsIgnoreCase("x-invocationid")) {
+                    MDC.put("InvocationId", v);
+                    v = UUID.randomUUID().toString();
+                    newInvocationId = v;
+                }
                 hdrv.add(new String[]{h, v});
             }
         } catch (Exception e) {
-            loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
+            eelfLogger.error("Exception", e);
         }
         hdrs = hdrv.toArray(new String[hdrv.size()][]);
-        url = dth.getDestURL(fileid);
+        url = deliveryTaskHelper.getDestURL(fileid);
     }
+
     /**
      * Is the object a DeliveryTask with the same publication ID?
      */
@@ -140,10 +163,11 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     public String toString() {
         return (pubid);
     }
+
     /**
      * Get the publish ID
      */
-    public String getPublishId() {
+    String getPublishId() {
         return (pubid);
     }
 
@@ -153,14 +177,17 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     public void run() {
         attempts++;
         try {
-            di = dth.getDestInfo();
-            boolean expect100 = di.isUsing100();
-            boolean monly = di.isMetaDataOnly();
+            destInfo = deliveryTaskHelper.getDestinationInfo();
+            boolean expect100 = destInfo.isUsing100();
+            boolean monly = destInfo.isMetaDataOnly();
             length = 0;
             if (!"DELETE".equals(method) && !monly) {
                 length = datafile.length();
             }
-            url = dth.getDestURL(fileid);
+            if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
+                    fileid = fileid.replace(".gz", "");
+            }
+            url = deliveryTaskHelper.getDestURL(fileid);
             URL u = new URL(url);
             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
             uc.setConnectTimeout(60000);
@@ -168,8 +195,8 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
             uc.setInstanceFollowRedirects(false);
             uc.setRequestMethod(method);
             uc.setRequestProperty("Content-Length", Long.toString(length));
-            uc.setRequestProperty("Authorization", di.getAuth());
-            uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);
+            uc.setRequestProperty("Authorization", destInfo.getAuth());
+            uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
             for (String[] nv : hdrs) {
                 uc.addRequestProperty(nv[0], nv[1]);
             }
@@ -177,37 +204,16 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 if (expect100) {
                     uc.setRequestProperty("Expect", "100-continue");
                 }
-                uc.setFixedLengthStreamingMode(length);
                 uc.setDoOutput(true);
-                OutputStream os = null;
-                try {
-                    os = uc.getOutputStream();
-                } catch (ProtocolException pe) {
-                    dth.reportDeliveryExtra(this, -1L);
-                    // Rcvd error instead of 100-continue
-                    loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
-                }
-                if (os != null) {
-                    long sofar = 0;
-                    try (InputStream is = new FileInputStream(datafile)) {
-                        byte[] buf = new byte[1024 * 1024];
-                        while (sofar < length) {
-                            int i = buf.length;
-                            if (sofar + i > length) {
-                                i = (int) (length - sofar);
-                            }
-                            i = is.read(buf, 0, i);
-                            if (i <= 0) {
-                                throw new IOException("Unexpected problem reading data file " + datafile);
-                            }
-                            sofar += i;
-                            os.write(buf, 0, i);
-                        }
-                        os.close();
-                    } catch (IOException ioe) {
-                        dth.reportDeliveryExtra(this, sofar);
-                        throw ioe;
+                if (destInfo.isDecompress()) {
+                    if (isFiletypeGzip(datafile)) {
+                        sendDecompressedFile(uc);
+                    } else {
+                        uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
+                        sendFile(uc);
                     }
+                } else {
+                    sendFile(uc);
                 }
             }
             int rc = uc.getResponseCode();
@@ -226,7 +232,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
             InputStream is;
             if (rc >= 200 && rc <= 299) {
                 is = uc.getInputStream();
-                xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");
+                xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
             } else {
                 if (rc >= 300 && rc <= 399) {
                     rmsg = uc.getHeaderField("Location");
@@ -239,26 +245,121 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 }
                 is.close();
             }
-            dth.reportStatus(this, rc, xpubid, rmsg);
+            deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
         } catch (Exception e) {
-            loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
-            dth.reportException(this, e);
+            eelfLogger.error("Exception "+ Arrays.toString(e.getStackTrace()),e);
+            deliveryTaskHelper.reportException(this, e);
+        }
+    }
+
+    /**
+     * To send decompressed gzip to the subscribers
+     *
+     * @param httpURLConnection connection used to make request
+     * @throws IOException
+     */
+    private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
+        byte[] buffer = new byte[8164];
+        httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
+        OutputStream outputStream = getOutputStream(httpURLConnection);
+        if (outputStream != null) {
+            int bytesRead = 0;
+            try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
+                int bufferLength = buffer.length;
+                while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
+                    outputStream.write(buffer, 0, bytesRead);
+                }
+                outputStream.close();
+            } catch (IOException e) {
+                httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
+                eelfLogger.info("Could not decompress file", e);
+                sendFile(httpURLConnection);
+            }
+
+        }
+    }
+
+    /**
+     * To send any file to the subscriber.
+     *
+     * @param httpURLConnection connection used to make request
+     * @throws IOException
+     */
+    private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
+        OutputStream os = getOutputStream(httpURLConnection);
+        if (os != null) {
+            long sofar = 0;
+            try (InputStream is = new FileInputStream(datafile)) {
+                byte[] buf = new byte[1024 * 1024];
+                while (sofar < length) {
+                    int i = buf.length;
+                    if (sofar + i > length) {
+                        i = (int) (length - sofar);
+                    }
+                    i = is.read(buf, 0, i);
+                    if (i <= 0) {
+                        throw new IOException("Unexpected problem reading data file " + datafile);
+                    }
+                    sofar += i;
+                    os.write(buf, 0, i);
+                }
+                os.close();
+            } catch (IOException ioe) {
+                deliveryTaskHelper.reportDeliveryExtra(this, sofar);
+                throw ioe;
+            }
+        }
+    }
+
+    /**
+     * Get the outputstream that will be used to send data
+     *
+     * @param httpURLConnection connection used to make request
+     * @return AN Outpustream that can be used to send your data.
+     * @throws IOException
+     */
+    private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
+        OutputStream outputStream = null;
+
+        try {
+            outputStream = httpURLConnection.getOutputStream();
+        } catch (ProtocolException pe) {
+            deliveryTaskHelper.reportDeliveryExtra(this, -1L);
+            // Rcvd error instead of 100-continue
+            eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
         }
+        return outputStream;
     }
 
     /**
      * Remove meta and data files
      */
-    public void clean() {
+    void clean() {
         datafile.delete();
         metafile.delete();
+        eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
+        eelfLogger.info(EelfMsgs.EXIT);
         hdrs = null;
     }
 
+    /**
+     * Set the resume time for a delivery task.
+     */
+    void setResumeTime(long resumeTime) {
+        this.resumeTime = resumeTime;
+    }
+
+    /**
+     * Get the resume time for a delivery task.
+     */
+    long getResumeTime() {
+        return resumeTime;
+    }
+
     /**
      * Has this delivery task been cleaned?
      */
-    public boolean isCleaned() {
+    boolean isCleaned() {
         return (hdrs == null);
     }
 
@@ -272,7 +373,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     /**
      * Get creation date as encoded in the publish ID.
      */
-    public long getDate() {
+    long getDate() {
         return (date);
     }
 
@@ -286,42 +387,49 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     /**
      * Get the content type
      */
-    public String getCType() {
+    String getCType() {
         return (ctype);
     }
 
     /**
      * Get the method
      */
-    public String getMethod() {
+    String getMethod() {
         return (method);
     }
 
     /**
      * Get the file ID
      */
-    public String getFileId() {
+    String getFileId() {
         return (fileid);
     }
 
     /**
      * Get the number of delivery attempts
      */
-    public int getAttempts() {
+    int getAttempts() {
         return (attempts);
     }
 
     /**
      * Get the (space delimited list of) subscription ID for this delivery task
      */
-    public String getSubId() {
+    String getSubId() {
         return (subid);
     }
 
     /**
      * Get the feed ID for this delivery task
      */
-    public String getFeedId() {
+    String getFeedId() {
         return (feedid);
     }
+
+    /**
+     * Get the followRedirects for this delivery task
+     */
+    public boolean getFollowRedirects() {
+        return(followRedirects);
+    }
 }