Remove datarouter-node critical code smells
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
index b2c3169..018c3af 100644 (file)
@@ -27,14 +27,15 @@ package org.onap.dmaap.datarouter.node;
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.zip.GZIPInputStream;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-import org.apache.log4j.Logger;
 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
 import org.slf4j.MDC;
 
 import static com.att.eelf.configuration.Configuration.*;
+import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
 
 /**
  * A file to be delivered to a destination.
@@ -44,8 +45,7 @@ import static com.att.eelf.configuration.Configuration.*;
  * the file and its delivery data as well as to attempt delivery.
  */
 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
-    private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
-    private static EELFLogger eelflogger = EELFManager.getInstance()
+    private static EELFLogger eelfLogger = EELFManager.getInstance()
             .getLogger(DeliveryTask.class);
     private DeliveryTaskHelper deliveryTaskHelper;
     private String pubid;
@@ -62,23 +62,26 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     private String feedid;
     private String subid;
     private int attempts;
+    private boolean followRedirects;
     private String[][] hdrs;
     private String newInvocationId;
+    private long resumeTime;
 
 
     /**
      * Create a delivery task for a given delivery queue and pub ID
      *
-     * @param    deliveryTaskHelper    The delivery task helper for the queue this task is in.
-     * @param    pubid    The publish ID for this file.  This is used as
-     * the base for the file name in the spool directory and is of
-     * the form <milliseconds since 1970>.<fqdn of initial data router node>
+     * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
+     * @param pubid              The publish ID for this file.  This is used as
+     *                           the base for the file name in the spool directory and is of
+     *                           the form <milliseconds since 1970>.<fqdn of initial data router node>
      */
-    public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
+    DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
         this.deliveryTaskHelper = deliveryTaskHelper;
         this.pubid = pubid;
         destInfo = deliveryTaskHelper.getDestinationInfo();
         subid = destInfo.getSubId();
+        this.followRedirects = destInfo.isFollowRedirects();
         feedid = destInfo.getLogData();
         spool = destInfo.getSpool();
         String dfn = spool + "/" + pubid;
@@ -87,6 +90,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
         metafile = new File(mfn);
         boolean monly = destInfo.isMetaDataOnly();
         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
+        resumeTime = System.currentTimeMillis();
         Vector<String[]> hdrv = new Vector<>();
 
         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
@@ -123,11 +127,12 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 hdrv.add(new String[]{h, v});
             }
         } catch (Exception e) {
-            loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
+            eelfLogger.error("Exception", e);
         }
         hdrs = hdrv.toArray(new String[hdrv.size()][]);
         url = deliveryTaskHelper.getDestURL(fileid);
     }
+
     /**
      * Is the object a DeliveryTask with the same publication ID?
      */
@@ -158,10 +163,11 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     public String toString() {
         return (pubid);
     }
+
     /**
      * Get the publish ID
      */
-    public String getPublishId() {
+    String getPublishId() {
         return (pubid);
     }
 
@@ -178,6 +184,9 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
             if (!"DELETE".equals(method) && !monly) {
                 length = datafile.length();
             }
+            if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
+                    fileid = fileid.replace(".gz", "");
+            }
             url = deliveryTaskHelper.getDestURL(fileid);
             URL u = new URL(url);
             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
@@ -195,37 +204,16 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 if (expect100) {
                     uc.setRequestProperty("Expect", "100-continue");
                 }
-                uc.setFixedLengthStreamingMode(length);
                 uc.setDoOutput(true);
-                OutputStream os = null;
-                try {
-                    os = uc.getOutputStream();
-                } catch (ProtocolException pe) {
-                    deliveryTaskHelper.reportDeliveryExtra(this, -1L);
-                    // Rcvd error instead of 100-continue
-                    loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
-                }
-                if (os != null) {
-                    long sofar = 0;
-                    try (InputStream is = new FileInputStream(datafile)) {
-                        byte[] buf = new byte[1024 * 1024];
-                        while (sofar < length) {
-                            int i = buf.length;
-                            if (sofar + i > length) {
-                                i = (int) (length - sofar);
-                            }
-                            i = is.read(buf, 0, i);
-                            if (i <= 0) {
-                                throw new IOException("Unexpected problem reading data file " + datafile);
-                            }
-                            sofar += i;
-                            os.write(buf, 0, i);
-                        }
-                        os.close();
-                    } catch (IOException ioe) {
-                        deliveryTaskHelper.reportDeliveryExtra(this, sofar);
-                        throw ioe;
+                if (destInfo.isDecompress()) {
+                    if (isFiletypeGzip(datafile)) {
+                        sendDecompressedFile(uc);
+                    } else {
+                        uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
+                        sendFile(uc);
                     }
+                } else {
+                    sendFile(uc);
                 }
             }
             int rc = uc.getResponseCode();
@@ -259,26 +247,119 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
             }
             deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
         } catch (Exception e) {
-            loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
+            eelfLogger.error("Exception "+ Arrays.toString(e.getStackTrace()),e);
             deliveryTaskHelper.reportException(this, e);
         }
     }
 
+    /**
+     * To send decompressed gzip to the subscribers
+     *
+     * @param httpURLConnection connection used to make request
+     * @throws IOException
+     */
+    private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
+        byte[] buffer = new byte[8164];
+        httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
+        OutputStream outputStream = getOutputStream(httpURLConnection);
+        if (outputStream != null) {
+            int bytesRead = 0;
+            try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
+                int bufferLength = buffer.length;
+                while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
+                    outputStream.write(buffer, 0, bytesRead);
+                }
+                outputStream.close();
+            } catch (IOException e) {
+                httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
+                eelfLogger.info("Could not decompress file", e);
+                sendFile(httpURLConnection);
+            }
+
+        }
+    }
+
+    /**
+     * To send any file to the subscriber.
+     *
+     * @param httpURLConnection connection used to make request
+     * @throws IOException
+     */
+    private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
+        OutputStream os = getOutputStream(httpURLConnection);
+        if (os != null) {
+            long sofar = 0;
+            try (InputStream is = new FileInputStream(datafile)) {
+                byte[] buf = new byte[1024 * 1024];
+                while (sofar < length) {
+                    int i = buf.length;
+                    if (sofar + i > length) {
+                        i = (int) (length - sofar);
+                    }
+                    i = is.read(buf, 0, i);
+                    if (i <= 0) {
+                        throw new IOException("Unexpected problem reading data file " + datafile);
+                    }
+                    sofar += i;
+                    os.write(buf, 0, i);
+                }
+                os.close();
+            } catch (IOException ioe) {
+                deliveryTaskHelper.reportDeliveryExtra(this, sofar);
+                throw ioe;
+            }
+        }
+    }
+
+    /**
+     * Get the outputstream that will be used to send data
+     *
+     * @param httpURLConnection connection used to make request
+     * @return AN Outpustream that can be used to send your data.
+     * @throws IOException
+     */
+    private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
+        OutputStream outputStream = null;
+
+        try {
+            outputStream = httpURLConnection.getOutputStream();
+        } catch (ProtocolException pe) {
+            deliveryTaskHelper.reportDeliveryExtra(this, -1L);
+            // Rcvd error instead of 100-continue
+            eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
+        }
+        return outputStream;
+    }
+
     /**
      * Remove meta and data files
      */
-    public void clean() {
+    void clean() {
         datafile.delete();
         metafile.delete();
-        eelflogger.info(EelfMsgs.INVOKE, newInvocationId);
-        eelflogger.info(EelfMsgs.EXIT);
+        eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
+        eelfLogger.info(EelfMsgs.EXIT);
         hdrs = null;
     }
 
+    /**
+     * Set the resume time for a delivery task.
+     */
+    void setResumeTime(long resumeTime) {
+        this.resumeTime = resumeTime;
+    }
+
+    /**
+     * Get the resume time for a delivery task.
+     */
+    long getResumeTime() {
+        return resumeTime;
+    }
+
     /**
      * Has this delivery task been cleaned?
      */
-    public boolean isCleaned() {
+    boolean isCleaned() {
         return (hdrs == null);
     }
 
@@ -292,7 +373,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     /**
      * Get creation date as encoded in the publish ID.
      */
-    public long getDate() {
+    long getDate() {
         return (date);
     }
 
@@ -306,42 +387,49 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     /**
      * Get the content type
      */
-    public String getCType() {
+    String getCType() {
         return (ctype);
     }
 
     /**
      * Get the method
      */
-    public String getMethod() {
+    String getMethod() {
         return (method);
     }
 
     /**
      * Get the file ID
      */
-    public String getFileId() {
+    String getFileId() {
         return (fileid);
     }
 
     /**
      * Get the number of delivery attempts
      */
-    public int getAttempts() {
+    int getAttempts() {
         return (attempts);
     }
 
     /**
      * Get the (space delimited list of) subscription ID for this delivery task
      */
-    public String getSubId() {
+    String getSubId() {
         return (subid);
     }
 
     /**
      * Get the feed ID for this delivery task
      */
-    public String getFeedId() {
+    String getFeedId() {
         return (feedid);
     }
+
+    /**
+     * Get the followRedirects for this delivery task
+     */
+    public boolean getFollowRedirects() {
+        return(followRedirects);
+    }
 }