Adding decompression option for user to a feed.
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
index b2c3169..a3af88f 100644 (file)
@@ -27,6 +27,7 @@ package org.onap.dmaap.datarouter.node;
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.zip.GZIPInputStream;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
@@ -35,6 +36,7 @@ import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
 import org.slf4j.MDC;
 
 import static com.att.eelf.configuration.Configuration.*;
+import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
 
 /**
  * A file to be delivered to a destination.
@@ -69,10 +71,10 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     /**
      * Create a delivery task for a given delivery queue and pub ID
      *
-     * @param    deliveryTaskHelper    The delivery task helper for the queue this task is in.
-     * @param    pubid    The publish ID for this file.  This is used as
-     * the base for the file name in the spool directory and is of
-     * the form <milliseconds since 1970>.<fqdn of initial data router node>
+     * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
+     * @param pubid              The publish ID for this file.  This is used as
+     *                           the base for the file name in the spool directory and is of
+     *                           the form <milliseconds since 1970>.<fqdn of initial data router node>
      */
     public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
         this.deliveryTaskHelper = deliveryTaskHelper;
@@ -128,6 +130,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
         hdrs = hdrv.toArray(new String[hdrv.size()][]);
         url = deliveryTaskHelper.getDestURL(fileid);
     }
+
     /**
      * Is the object a DeliveryTask with the same publication ID?
      */
@@ -158,6 +161,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     public String toString() {
         return (pubid);
     }
+
     /**
      * Get the publish ID
      */
@@ -178,6 +182,9 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
             if (!"DELETE".equals(method) && !monly) {
                 length = datafile.length();
             }
+            if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
+                    fileid = fileid.replace(".gz", "");
+            }
             url = deliveryTaskHelper.getDestURL(fileid);
             URL u = new URL(url);
             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
@@ -195,37 +202,16 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 if (expect100) {
                     uc.setRequestProperty("Expect", "100-continue");
                 }
-                uc.setFixedLengthStreamingMode(length);
                 uc.setDoOutput(true);
-                OutputStream os = null;
-                try {
-                    os = uc.getOutputStream();
-                } catch (ProtocolException pe) {
-                    deliveryTaskHelper.reportDeliveryExtra(this, -1L);
-                    // Rcvd error instead of 100-continue
-                    loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
-                }
-                if (os != null) {
-                    long sofar = 0;
-                    try (InputStream is = new FileInputStream(datafile)) {
-                        byte[] buf = new byte[1024 * 1024];
-                        while (sofar < length) {
-                            int i = buf.length;
-                            if (sofar + i > length) {
-                                i = (int) (length - sofar);
-                            }
-                            i = is.read(buf, 0, i);
-                            if (i <= 0) {
-                                throw new IOException("Unexpected problem reading data file " + datafile);
-                            }
-                            sofar += i;
-                            os.write(buf, 0, i);
-                        }
-                        os.close();
-                    } catch (IOException ioe) {
-                        deliveryTaskHelper.reportDeliveryExtra(this, sofar);
-                        throw ioe;
+                if (destInfo.isDecompress()) {
+                    if (isFiletypeGzip(datafile)) {
+                        sendDecompressedFile(uc);
+                    } else {
+                        uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
+                        sendFile(uc);
                     }
+                } else {
+                    sendFile(uc);
                 }
             }
             int rc = uc.getResponseCode();
@@ -259,11 +245,90 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
             }
             deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
         } catch (Exception e) {
-            loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
+            loggerDeliveryTask.error("Exception " + e.getStackTrace(), e);
             deliveryTaskHelper.reportException(this, e);
         }
     }
 
+    /**
+     * To send decompressed gzip to the subscribers
+     *
+     * @param httpURLConnection connection used to make request
+     * @throws IOException
+     */
+    private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
+        byte[] buffer = new byte[8164];
+        httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
+        OutputStream outputStream = getOutputStream(httpURLConnection);
+        if (outputStream != null) {
+            int bytesRead = 0;
+            try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
+                int bufferLength = buffer.length;
+                while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
+                    outputStream.write(buffer, 0, bytesRead);
+                }
+                outputStream.close();
+            } catch (IOException e) {
+                httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
+                loggerDeliveryTask.info("Could not decompress file");
+                sendFile(httpURLConnection);
+            }
+
+        }
+    }
+
+    /**
+     * To send any file to the subscriber.
+     *
+     * @param httpURLConnection connection used to make request
+     * @throws IOException
+     */
+    private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
+        OutputStream os = getOutputStream(httpURLConnection);
+        if (os != null) {
+            long sofar = 0;
+            try (InputStream is = new FileInputStream(datafile)) {
+                byte[] buf = new byte[1024 * 1024];
+                while (sofar < length) {
+                    int i = buf.length;
+                    if (sofar + i > length) {
+                        i = (int) (length - sofar);
+                    }
+                    i = is.read(buf, 0, i);
+                    if (i <= 0) {
+                        throw new IOException("Unexpected problem reading data file " + datafile);
+                    }
+                    sofar += i;
+                    os.write(buf, 0, i);
+                }
+                os.close();
+            } catch (IOException ioe) {
+                deliveryTaskHelper.reportDeliveryExtra(this, sofar);
+                throw ioe;
+            }
+        }
+    }
+
+    /**
+     * Get the outputstream that will be used to send data
+     *
+     * @param httpURLConnection connection used to make request
+     * @return AN Outpustream that can be used to send your data.
+     * @throws IOException
+     */
+    private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
+        OutputStream outputStream = null;
+
+        try {
+            outputStream = httpURLConnection.getOutputStream();
+        } catch (ProtocolException pe) {
+            deliveryTaskHelper.reportDeliveryExtra(this, -1L);
+            // Rcvd error instead of 100-continue
+            loggerDeliveryTask.error("Exception " + pe.getStackTrace(), pe);
+        }
+        return outputStream;
+    }
+
     /**
      * Remove meta and data files
      */