X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fdatarouter.git;a=blobdiff_plain;f=datarouter-node%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fnode%2FDeliveryTask.java;h=018c3aff778754bff00fe525670cf3d9045ad85f;hp=4d570b555a60b3bb4fc88b902ca1ce6ea78ea445;hb=5e6a9f65049e8e8d39e8dcab227e5d75b328b173;hpb=14b8d9552808063686e0c22760cec6e35b960b59 diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java index 4d570b55..018c3aff 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java @@ -27,14 +27,15 @@ package org.onap.dmaap.datarouter.node; import java.io.*; import java.net.*; import java.util.*; +import java.util.zip.GZIPInputStream; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -import org.apache.log4j.Logger; import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; import org.slf4j.MDC; import static com.att.eelf.configuration.Configuration.*; +import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip; /** * A file to be delivered to a destination. @@ -44,12 +45,11 @@ import static com.att.eelf.configuration.Configuration.*; * the file and its delivery data as well as to attempt delivery. */ public class DeliveryTask implements Runnable, Comparable { - private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask"); - private static EELFLogger eelflogger = EELFManager.getInstance() + private static EELFLogger eelfLogger = EELFManager.getInstance() .getLogger(DeliveryTask.class); - private DeliveryTaskHelper dth; + private DeliveryTaskHelper deliveryTaskHelper; private String pubid; - private DestInfo di; + private DestInfo destInfo; private String spool; private File datafile; private File metafile; @@ -62,32 +62,36 @@ public class DeliveryTask implements Runnable, Comparable { private String feedid; private String subid; private int attempts; + private boolean followRedirects; private String[][] hdrs; private String newInvocationId; + private long resumeTime; /** * Create a delivery task for a given delivery queue and pub ID * - * @param dth The delivery task helper for the queue this task is in. - * @param pubid The publish ID for this file. This is used as - * the base for the file name in the spool directory and is of - * the form . + * @param deliveryTaskHelper The delivery task helper for the queue this task is in. + * @param pubid The publish ID for this file. This is used as + * the base for the file name in the spool directory and is of + * the form . */ - public DeliveryTask(DeliveryTaskHelper dth, String pubid) { - this.dth = dth; + DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { + this.deliveryTaskHelper = deliveryTaskHelper; this.pubid = pubid; - di = dth.getDestInfo(); - subid = di.getSubId(); - feedid = di.getLogData(); - spool = di.getSpool(); + destInfo = deliveryTaskHelper.getDestinationInfo(); + subid = destInfo.getSubId(); + this.followRedirects = destInfo.isFollowRedirects(); + feedid = destInfo.getLogData(); + spool = destInfo.getSpool(); String dfn = spool + "/" + pubid; String mfn = dfn + ".M"; datafile = new File(spool + "/" + pubid); metafile = new File(mfn); - boolean monly = di.isMetaDataOnly(); + boolean monly = destInfo.isMetaDataOnly(); date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); - Vector hdrv = new Vector(); + resumeTime = System.currentTimeMillis(); + Vector hdrv = new Vector<>(); try (BufferedReader br = new BufferedReader(new FileReader(metafile))) { String s = br.readLine(); @@ -102,9 +106,9 @@ public class DeliveryTask implements Runnable, Comparable { i = s.indexOf('\t'); String h = s.substring(0, i); String v = s.substring(i + 1); - if ("x-att-dr-routing".equalsIgnoreCase(h)) { + if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) { subid = v.replaceAll("[^ ]*/", ""); - feedid = dth.getFeedId(subid.replaceAll(" .*", "")); + feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", "")); } if (length == 0 && h.toLowerCase().startsWith("content-")) { continue; @@ -123,11 +127,12 @@ public class DeliveryTask implements Runnable, Comparable { hdrv.add(new String[]{h, v}); } } catch (Exception e) { - loggerDeliveryTask.error("Exception "+e.getStackTrace(),e); + eelfLogger.error("Exception", e); } hdrs = hdrv.toArray(new String[hdrv.size()][]); - url = dth.getDestURL(fileid); + url = deliveryTaskHelper.getDestURL(fileid); } + /** * Is the object a DeliveryTask with the same publication ID? */ @@ -158,10 +163,11 @@ public class DeliveryTask implements Runnable, Comparable { public String toString() { return (pubid); } + /** * Get the publish ID */ - public String getPublishId() { + String getPublishId() { return (pubid); } @@ -171,14 +177,17 @@ public class DeliveryTask implements Runnable, Comparable { public void run() { attempts++; try { - di = dth.getDestInfo(); - boolean expect100 = di.isUsing100(); - boolean monly = di.isMetaDataOnly(); + destInfo = deliveryTaskHelper.getDestinationInfo(); + boolean expect100 = destInfo.isUsing100(); + boolean monly = destInfo.isMetaDataOnly(); length = 0; if (!"DELETE".equals(method) && !monly) { length = datafile.length(); } - url = dth.getDestURL(fileid); + if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){ + fileid = fileid.replace(".gz", ""); + } + url = deliveryTaskHelper.getDestURL(fileid); URL u = new URL(url); HttpURLConnection uc = (HttpURLConnection) u.openConnection(); uc.setConnectTimeout(60000); @@ -186,8 +195,8 @@ public class DeliveryTask implements Runnable, Comparable { uc.setInstanceFollowRedirects(false); uc.setRequestMethod(method); uc.setRequestProperty("Content-Length", Long.toString(length)); - uc.setRequestProperty("Authorization", di.getAuth()); - uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid); + uc.setRequestProperty("Authorization", destInfo.getAuth()); + uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid); for (String[] nv : hdrs) { uc.addRequestProperty(nv[0], nv[1]); } @@ -195,37 +204,16 @@ public class DeliveryTask implements Runnable, Comparable { if (expect100) { uc.setRequestProperty("Expect", "100-continue"); } - uc.setFixedLengthStreamingMode(length); uc.setDoOutput(true); - OutputStream os = null; - try { - os = uc.getOutputStream(); - } catch (ProtocolException pe) { - dth.reportDeliveryExtra(this, -1L); - // Rcvd error instead of 100-continue - loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe); - } - if (os != null) { - long sofar = 0; - try (InputStream is = new FileInputStream(datafile)) { - byte[] buf = new byte[1024 * 1024]; - while (sofar < length) { - int i = buf.length; - if (sofar + i > length) { - i = (int) (length - sofar); - } - i = is.read(buf, 0, i); - if (i <= 0) { - throw new IOException("Unexpected problem reading data file " + datafile); - } - sofar += i; - os.write(buf, 0, i); - } - os.close(); - } catch (IOException ioe) { - dth.reportDeliveryExtra(this, sofar); - throw ioe; + if (destInfo.isDecompress()) { + if (isFiletypeGzip(datafile)) { + sendDecompressedFile(uc); + } else { + uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT"); + sendFile(uc); } + } else { + sendFile(uc); } } int rc = uc.getResponseCode(); @@ -244,7 +232,7 @@ public class DeliveryTask implements Runnable, Comparable { InputStream is; if (rc >= 200 && rc <= 299) { is = uc.getInputStream(); - xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID"); + xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID"); } else { if (rc >= 300 && rc <= 399) { rmsg = uc.getHeaderField("Location"); @@ -257,28 +245,121 @@ public class DeliveryTask implements Runnable, Comparable { } is.close(); } - dth.reportStatus(this, rc, xpubid, rmsg); + deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg); } catch (Exception e) { - loggerDeliveryTask.error("Exception "+e.getStackTrace(),e); - dth.reportException(this, e); + eelfLogger.error("Exception "+ Arrays.toString(e.getStackTrace()),e); + deliveryTaskHelper.reportException(this, e); + } + } + + /** + * To send decompressed gzip to the subscribers + * + * @param httpURLConnection connection used to make request + * @throws IOException + */ + private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException { + byte[] buffer = new byte[8164]; + httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS"); + OutputStream outputStream = getOutputStream(httpURLConnection); + if (outputStream != null) { + int bytesRead = 0; + try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) { + int bufferLength = buffer.length; + while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) { + outputStream.write(buffer, 0, bytesRead); + } + outputStream.close(); + } catch (IOException e) { + httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE"); + eelfLogger.info("Could not decompress file", e); + sendFile(httpURLConnection); + } + } } + /** + * To send any file to the subscriber. + * + * @param httpURLConnection connection used to make request + * @throws IOException + */ + private void sendFile(HttpURLConnection httpURLConnection) throws IOException { + OutputStream os = getOutputStream(httpURLConnection); + if (os != null) { + long sofar = 0; + try (InputStream is = new FileInputStream(datafile)) { + byte[] buf = new byte[1024 * 1024]; + while (sofar < length) { + int i = buf.length; + if (sofar + i > length) { + i = (int) (length - sofar); + } + i = is.read(buf, 0, i); + if (i <= 0) { + throw new IOException("Unexpected problem reading data file " + datafile); + } + sofar += i; + os.write(buf, 0, i); + } + os.close(); + } catch (IOException ioe) { + deliveryTaskHelper.reportDeliveryExtra(this, sofar); + throw ioe; + } + } + } + + /** + * Get the outputstream that will be used to send data + * + * @param httpURLConnection connection used to make request + * @return AN Outpustream that can be used to send your data. + * @throws IOException + */ + private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException { + OutputStream outputStream = null; + + try { + outputStream = httpURLConnection.getOutputStream(); + } catch (ProtocolException pe) { + deliveryTaskHelper.reportDeliveryExtra(this, -1L); + // Rcvd error instead of 100-continue + eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe); + } + return outputStream; + } + /** * Remove meta and data files */ - public void clean() { + void clean() { datafile.delete(); metafile.delete(); - eelflogger.info(EelfMsgs.INVOKE, newInvocationId); - eelflogger.info(EelfMsgs.EXIT); + eelfLogger.info(EelfMsgs.INVOKE, newInvocationId); + eelfLogger.info(EelfMsgs.EXIT); hdrs = null; } + /** + * Set the resume time for a delivery task. + */ + void setResumeTime(long resumeTime) { + this.resumeTime = resumeTime; + } + + /** + * Get the resume time for a delivery task. + */ + long getResumeTime() { + return resumeTime; + } + /** * Has this delivery task been cleaned? */ - public boolean isCleaned() { + boolean isCleaned() { return (hdrs == null); } @@ -292,7 +373,7 @@ public class DeliveryTask implements Runnable, Comparable { /** * Get creation date as encoded in the publish ID. */ - public long getDate() { + long getDate() { return (date); } @@ -306,42 +387,49 @@ public class DeliveryTask implements Runnable, Comparable { /** * Get the content type */ - public String getCType() { + String getCType() { return (ctype); } /** * Get the method */ - public String getMethod() { + String getMethod() { return (method); } /** * Get the file ID */ - public String getFileId() { + String getFileId() { return (fileid); } /** * Get the number of delivery attempts */ - public int getAttempts() { + int getAttempts() { return (attempts); } /** * Get the (space delimited list of) subscription ID for this delivery task */ - public String getSubId() { + String getSubId() { return (subid); } /** * Get the feed ID for this delivery task */ - public String getFeedId() { + String getFeedId() { return (feedid); } + + /** + * Get the followRedirects for this delivery task + */ + public boolean getFollowRedirects() { + return(followRedirects); + } }