X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fdatarouter.git;a=blobdiff_plain;f=datarouter-node%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fnode%2FDeliveryTask.java;h=eb79b563d37dc75e640ac0f2e7a24c0607626e29;hp=c085ebe76b65c3e9a90231f52ddfb0e4c5ac6ff8;hb=6c78b3e6a0a67c73f931337356a172cc69cee0e8;hpb=a26d139f31caa06ec932498574b18c69780dab28 diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java index c085ebe7..eb79b563 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java @@ -24,31 +24,40 @@ package org.onap.dmaap.datarouter.node; -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.zip.GZIPInputStream; +import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID; +import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -import org.apache.log4j.Logger; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.ProtocolException; +import java.net.URL; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.UUID; +import java.util.zip.GZIPInputStream; +import org.jetbrains.annotations.Nullable; import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; import org.slf4j.MDC; -import static com.att.eelf.configuration.Configuration.*; -import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip; - /** * A file to be delivered to a destination. - *

- * A Delivery task represents a work item for the data router - a file that - * needs to be delivered and provides mechanisms to get information about - * the file and its delivery data as well as to attempt delivery. + * + *

A Delivery task represents a work item for the data router - a file that needs to be delivered and provides + * mechanisms to get information about the file and its delivery data as well as to attempt delivery. */ public class DeliveryTask implements Runnable, Comparable { - private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask"); - private static EELFLogger eelflogger = EELFManager.getInstance() - .getLogger(DeliveryTask.class); + + private static final String DECOMPRESSION_STATUS = "Decompression_Status"; + private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class); private DeliveryTaskHelper deliveryTaskHelper; private String pubid; private DestInfo destInfo; @@ -64,90 +73,91 @@ public class DeliveryTask implements Runnable, Comparable { private String feedid; private String subid; private int attempts; + private boolean followRedirects; private String[][] hdrs; private String newInvocationId; private long resumeTime; /** - * Create a delivery task for a given delivery queue and pub ID + * Create a delivery task for a given delivery queue and pub ID. * * @param deliveryTaskHelper The delivery task helper for the queue this task is in. - * @param pubid The publish ID for this file. This is used as - * the base for the file name in the spool directory and is of - * the form . + * @param pubid The publish ID for this file. This is used as the base for the file name in the spool directory and + * is of the form (milliseconds since 1970).(fqdn of initial data router node) */ DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { this.deliveryTaskHelper = deliveryTaskHelper; this.pubid = pubid; destInfo = deliveryTaskHelper.getDestinationInfo(); subid = destInfo.getSubId(); + this.followRedirects = destInfo.isFollowRedirects(); feedid = destInfo.getLogData(); spool = destInfo.getSpool(); - String dfn = spool + "/" + pubid; + String dfn = spool + File.separator + pubid; String mfn = dfn + ".M"; - datafile = new File(spool + "/" + pubid); + datafile = new File(spool + File.separator + pubid); metafile = new File(mfn); boolean monly = destInfo.isMetaDataOnly(); date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); resumeTime = System.currentTimeMillis(); - Vector hdrv = new Vector<>(); + ArrayList hdrv = new ArrayList<>(); try (BufferedReader br = new BufferedReader(new FileReader(metafile))) { - String s = br.readLine(); - int i = s.indexOf('\t'); - method = s.substring(0, i); + String line = br.readLine(); + int index = line.indexOf('\t'); + method = line.substring(0, index); NodeUtils.setIpAndFqdnForEelf(method); if (!"DELETE".equals(method) && !monly) { length = datafile.length(); } - fileid = s.substring(i + 1); - while ((s = br.readLine()) != null) { - i = s.indexOf('\t'); - String h = s.substring(0, i); - String v = s.substring(i + 1); - if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) { - subid = v.replaceAll("[^ ]*/", ""); + fileid = line.substring(index + 1); + while ((line = br.readLine()) != null) { + index = line.indexOf('\t'); + String header = line.substring(0, index); + String headerValue = line.substring(index + 1); + if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) { + subid = headerValue.replaceAll("[^ ]*/", ""); feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", "")); } - if (length == 0 && h.toLowerCase().startsWith("content-")) { + if (length == 0 && header.toLowerCase().startsWith("content-")) { continue; } - if (h.equalsIgnoreCase("content-type")) { - ctype = v; + if ("content-type".equalsIgnoreCase(header)) { + ctype = headerValue; } - if (h.equalsIgnoreCase("x-onap-requestid")) { - MDC.put(MDC_KEY_REQUEST_ID, v); + if ("x-onap-requestid".equalsIgnoreCase(header)) { + MDC.put(MDC_KEY_REQUEST_ID, headerValue); } - if (h.equalsIgnoreCase("x-invocationid")) { - MDC.put("InvocationId", v); - v = UUID.randomUUID().toString(); - newInvocationId = v; + if ("x-invocationid".equalsIgnoreCase(header)) { + MDC.put("InvocationId", headerValue); + headerValue = UUID.randomUUID().toString(); + newInvocationId = headerValue; } - hdrv.add(new String[]{h, v}); + hdrv.add(new String[]{header, headerValue}); } } catch (Exception e) { - loggerDeliveryTask.error("Exception "+e.getStackTrace(),e); + eelfLogger.error("Exception", e); } hdrs = hdrv.toArray(new String[hdrv.size()][]); url = deliveryTaskHelper.getDestURL(fileid); } /** - * Is the object a DeliveryTask with the same publication ID? + * Is the object a DeliveryTask with the same publication ID. */ - public boolean equals(Object o) { - if (!(o instanceof DeliveryTask)) { + public boolean equals(Object object) { + if (!(object instanceof DeliveryTask)) { return (false); } - return (pubid.equals(((DeliveryTask) o).pubid)); + return (pubid.equals(((DeliveryTask) object).pubid)); } /** * Compare the publication IDs. */ - public int compareTo(DeliveryTask o) { - return (pubid.compareTo(o.pubid)); + public int compareTo(DeliveryTask other) { + return (pubid.compareTo(other.pubid)); } /** @@ -165,105 +175,75 @@ public class DeliveryTask implements Runnable, Comparable { } /** - * Get the publish ID + * Get the publish ID. */ String getPublishId() { return (pubid); } /** - * Attempt delivery + * Attempt delivery. */ public void run() { attempts++; try { destInfo = deliveryTaskHelper.getDestinationInfo(); - boolean expect100 = destInfo.isUsing100(); boolean monly = destInfo.isMetaDataOnly(); length = 0; if (!"DELETE".equals(method) && !monly) { length = datafile.length(); } - if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){ - fileid = fileid.replace(".gz", ""); - } + stripSuffixIfIsDecompress(); url = deliveryTaskHelper.getDestURL(fileid); - URL u = new URL(url); - HttpURLConnection uc = (HttpURLConnection) u.openConnection(); - uc.setConnectTimeout(60000); - uc.setReadTimeout(60000); - uc.setInstanceFollowRedirects(false); - uc.setRequestMethod(method); - uc.setRequestProperty("Content-Length", Long.toString(length)); - uc.setRequestProperty("Authorization", destInfo.getAuth()); - uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid); - for (String[] nv : hdrs) { - uc.addRequestProperty(nv[0], nv[1]); - } - if (length > 0) { - if (expect100) { - uc.setRequestProperty("Expect", "100-continue"); - } - uc.setDoOutput(true); - if (destInfo.isDecompress()) { - if (isFiletypeGzip(datafile)) { - sendDecompressedFile(uc); - } else { - uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT"); - sendFile(uc); - } - } else { - sendFile(uc); - } - } - int rc = uc.getResponseCode(); - String rmsg = uc.getResponseMessage(); - if (rmsg == null) { - String h0 = uc.getHeaderField(0); - if (h0 != null) { - int i = h0.indexOf(' '); - int j = h0.indexOf(' ', i + 1); - if (i != -1 && j != -1) { - rmsg = h0.substring(j + 1); - } - } - } + URL urlObj = new URL(url); + HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection(); + urlConnection.setConnectTimeout(60000); + urlConnection.setReadTimeout(60000); + urlConnection.setInstanceFollowRedirects(false); + urlConnection.setRequestMethod(method); + urlConnection.setRequestProperty("Content-Length", Long.toString(length)); + urlConnection.setRequestProperty("Authorization", destInfo.getAuth()); + urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid); + boolean expect100 = destInfo.isUsing100(); + int rc = deliverFileToSubscriber(expect100, urlConnection); + String rmsg = urlConnection.getResponseMessage(); + rmsg = getResponseMessage(urlConnection, rmsg); String xpubid = null; InputStream is; if (rc >= 200 && rc <= 299) { - is = uc.getInputStream(); - xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID"); + is = urlConnection.getInputStream(); + xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID"); } else { if (rc >= 300 && rc <= 399) { - rmsg = uc.getHeaderField("Location"); + rmsg = urlConnection.getHeaderField("Location"); } - is = uc.getErrorStream(); + is = urlConnection.getErrorStream(); } byte[] buf = new byte[4096]; if (is != null) { while (is.read(buf) > 0) { + //flush the buffer } is.close(); } deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg); } catch (Exception e) { - loggerDeliveryTask.error("Exception " + e.getStackTrace(), e); + eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e); deliveryTaskHelper.reportException(this, e); } } /** - * To send decompressed gzip to the subscribers + * To send decompressed gzip to the subscribers. * * @param httpURLConnection connection used to make request - * @throws IOException */ private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException { byte[] buffer = new byte[8164]; - httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS"); + httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS"); OutputStream outputStream = getOutputStream(httpURLConnection); if (outputStream != null) { - int bytesRead = 0; + int bytesRead; try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) { int bufferLength = buffer.length; while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) { @@ -271,8 +251,8 @@ public class DeliveryTask implements Runnable, Comparable { } outputStream.close(); } catch (IOException e) { - httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE"); - loggerDeliveryTask.info("Could not decompress file"); + httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE"); + eelfLogger.info("Could not decompress file", e); sendFile(httpURLConnection); } @@ -283,70 +263,120 @@ public class DeliveryTask implements Runnable, Comparable { * To send any file to the subscriber. * * @param httpURLConnection connection used to make request - * @throws IOException */ private void sendFile(HttpURLConnection httpURLConnection) throws IOException { OutputStream os = getOutputStream(httpURLConnection); - if (os != null) { - long sofar = 0; - try (InputStream is = new FileInputStream(datafile)) { - byte[] buf = new byte[1024 * 1024]; - while (sofar < length) { - int i = buf.length; - if (sofar + i > length) { - i = (int) (length - sofar); - } - i = is.read(buf, 0, i); - if (i <= 0) { - throw new IOException("Unexpected problem reading data file " + datafile); - } - sofar += i; - os.write(buf, 0, i); + if (os == null) { + return; + } + long sofar = 0; + try (InputStream is = new FileInputStream(datafile)) { + byte[] buf = new byte[1024 * 1024]; + while (sofar < length) { + int len = buf.length; + if (sofar + len > length) { + len = (int) (length - sofar); + } + len = is.read(buf, 0, len); + if (len <= 0) { + throw new IOException("Unexpected problem reading data file " + datafile); } - os.close(); - } catch (IOException ioe) { - deliveryTaskHelper.reportDeliveryExtra(this, sofar); - throw ioe; + sofar += len; + os.write(buf, 0, len); } + os.close(); + } catch (IOException ioe) { + deliveryTaskHelper.reportDeliveryExtra(this, sofar); + throw ioe; } } /** - * Get the outputstream that will be used to send data + * Get the outputstream that will be used to send data. * * @param httpURLConnection connection used to make request * @return AN Outpustream that can be used to send your data. - * @throws IOException */ - private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException { + OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException { OutputStream outputStream = null; - try { outputStream = httpURLConnection.getOutputStream(); } catch (ProtocolException pe) { deliveryTaskHelper.reportDeliveryExtra(this, -1L); // Rcvd error instead of 100-continue - loggerDeliveryTask.error("Exception " + pe.getStackTrace(), pe); + eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe); } return outputStream; } + private void stripSuffixIfIsDecompress() { + if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) { + fileid = fileid.replace(".gz", ""); + } + } + + private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException { + for (String[] nv : hdrs) { + uc.addRequestProperty(nv[0], nv[1]); + } + if (length > 0) { + if (expect100) { + uc.setRequestProperty("Expect", "100-continue"); + } + uc.setDoOutput(true); + if (destInfo.isDecompress()) { + if (isFiletypeGzip(datafile)) { + sendDecompressedFile(uc); + } else { + uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT"); + sendFile(uc); + } + } else { + sendFile(uc); + } + } + return uc.getResponseCode(); + } + + @Nullable + private String getResponseMessage(HttpURLConnection uc, String rmsg) { + if (rmsg == null) { + String h0 = uc.getHeaderField(0); + if (h0 != null) { + int indexOfSpace1 = h0.indexOf(' '); + int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1); + if (indexOfSpace1 != -1 && indexOfSpace2 != -1) { + rmsg = h0.substring(indexOfSpace2 + 1); + } + } + } + return rmsg; + } + /** - * Remove meta and data files + * Remove meta and data files. */ void clean() { - datafile.delete(); - metafile.delete(); - eelflogger.info(EelfMsgs.INVOKE, newInvocationId); - eelflogger.info(EelfMsgs.EXIT); + deleteWithRetry(datafile); + deleteWithRetry(metafile); + eelfLogger.info(EelfMsgs.INVOKE, newInvocationId); + eelfLogger.info(EelfMsgs.EXIT); hdrs = null; } - /** - * Set the resume time for a delivery task. - */ - void setResumeTime(long resumeTime) { - this.resumeTime = resumeTime; + private void deleteWithRetry(File file) { + int maxTries = 3; + int tryCount = 1; + while (tryCount <= maxTries) { + try { + Files.deleteIfExists(file.toPath()); + break; + } catch (IOException e) { + eelfLogger.error("IOException : Failed to delete file :" + + file.getName() + " on attempt " + tryCount, e); + } + tryCount++; + } } /** @@ -357,14 +387,21 @@ public class DeliveryTask implements Runnable, Comparable { } /** - * Has this delivery task been cleaned? + * Set the resume time for a delivery task. + */ + void setResumeTime(long resumeTime) { + this.resumeTime = resumeTime; + } + + /** + * Has this delivery task been cleaned. */ boolean isCleaned() { return (hdrs == null); } /** - * Get length of body + * Get length of body. */ public long getLength() { return (length); @@ -378,51 +415,58 @@ public class DeliveryTask implements Runnable, Comparable { } /** - * Get the most recent delivery attempt URL + * Get the most recent delivery attempt URL. */ public String getURL() { return (url); } /** - * Get the content type + * Get the content type. */ String getCType() { return (ctype); } /** - * Get the method + * Get the method. */ String getMethod() { return (method); } /** - * Get the file ID + * Get the file ID. */ String getFileId() { return (fileid); } /** - * Get the number of delivery attempts + * Get the number of delivery attempts. */ int getAttempts() { return (attempts); } /** - * Get the (space delimited list of) subscription ID for this delivery task + * Get the (space delimited list of) subscription ID for this delivery task. */ String getSubId() { return (subid); } /** - * Get the feed ID for this delivery task + * Get the feed ID for this delivery task. */ String getFeedId() { return (feedid); } + + /** + * Get the followRedirects for this delivery task. + */ + boolean getFollowRedirects() { + return (followRedirects); + } }