X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=datarouter-node%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fnode%2FDeliveryTask.java;h=80729905a3e9f3ee1e926d8ac96c68de06486bce;hb=ee6fa61e2cd7df99891092709765235b6166a041;hp=76bf04e2ac97e2bdf57d33c92892ff00f107fc12;hpb=1841cb5d8da7b21996f8faad9d24f858e6ce8a41;p=dmaap%2Fdatarouter.git diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java index 76bf04e2..80729905 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java @@ -28,7 +28,13 @@ import java.io.*; import java.net.*; import java.util.*; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import org.apache.log4j.Logger; +import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; +import org.slf4j.MDC; + +import static com.att.eelf.configuration.Configuration.*; /** * A file to be delivered to a destination. @@ -38,7 +44,9 @@ import org.apache.log4j.Logger; * the file and its delivery data as well as to attempt delivery. */ public class DeliveryTask implements Runnable, Comparable { - private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask"); + private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask"); + private static EELFLogger eelflogger = EELFManager.getInstance() + .getLogger(DeliveryTask.class); private DeliveryTaskHelper dth; private String pubid; private DestInfo di; @@ -55,37 +63,8 @@ public class DeliveryTask implements Runnable, Comparable { private String subid; private int attempts; private String[][] hdrs; + private String newInvocationId; - /** - * Is the object a DeliveryTask with the same publication ID? - */ - public boolean equals(Object o) { - if (!(o instanceof DeliveryTask)) { - return (false); - } - return (pubid.equals(((DeliveryTask) o).pubid)); - } - - /** - * Compare the publication IDs. - */ - public int compareTo(DeliveryTask o) { - return (pubid.compareTo(o.pubid)); - } - - /** - * Get the hash code of the publication ID. - */ - public int hashCode() { - return (pubid.hashCode()); - } - - /** - * Return the publication ID. - */ - public String toString() { - return (pubid); - } /** * Create a delivery task for a given delivery queue and pub ID @@ -109,11 +88,12 @@ public class DeliveryTask implements Runnable, Comparable { boolean monly = di.isMetaDataOnly(); date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); Vector hdrv = new Vector(); - try { - BufferedReader br = new BufferedReader(new FileReader(metafile)); + + try (BufferedReader br = new BufferedReader(new FileReader(metafile))) { String s = br.readLine(); int i = s.indexOf('\t'); method = s.substring(0, i); + NodeUtils.setIpAndFqdnForEelf(method); if (!"DELETE".equals(method) && !monly) { length = datafile.length(); } @@ -122,7 +102,7 @@ public class DeliveryTask implements Runnable, Comparable { i = s.indexOf('\t'); String h = s.substring(0, i); String v = s.substring(i + 1); - if ("x-att-dr-routing".equalsIgnoreCase(h)) { + if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) { subid = v.replaceAll("[^ ]*/", ""); feedid = dth.getFeedId(subid.replaceAll(" .*", "")); } @@ -132,15 +112,52 @@ public class DeliveryTask implements Runnable, Comparable { if (h.equalsIgnoreCase("content-type")) { ctype = v; } + if (h.equalsIgnoreCase("x-onap-requestid")) { + MDC.put(MDC_KEY_REQUEST_ID, v); + } + if (h.equalsIgnoreCase("x-invocationid")) { + MDC.put("InvocationId", v); + v = UUID.randomUUID().toString(); + newInvocationId = v; + } hdrv.add(new String[]{h, v}); } - br.close(); } catch (Exception e) { + loggerDeliveryTask.error("Exception "+e.getStackTrace(),e); } hdrs = hdrv.toArray(new String[hdrv.size()][]); url = dth.getDestURL(fileid); } + /** + * Is the object a DeliveryTask with the same publication ID? + */ + public boolean equals(Object o) { + if (!(o instanceof DeliveryTask)) { + return (false); + } + return (pubid.equals(((DeliveryTask) o).pubid)); + } + + /** + * Compare the publication IDs. + */ + public int compareTo(DeliveryTask o) { + return (pubid.compareTo(o.pubid)); + } + + /** + * Get the hash code of the publication ID. + */ + public int hashCode() { + return (pubid.hashCode()); + } + /** + * Return the publication ID. + */ + public String toString() { + return (pubid); + } /** * Get the publish ID */ @@ -170,7 +187,7 @@ public class DeliveryTask implements Runnable, Comparable { uc.setRequestMethod(method); uc.setRequestProperty("Content-Length", Long.toString(length)); uc.setRequestProperty("Authorization", di.getAuth()); - uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid); + uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid); for (String[] nv : hdrs) { uc.addRequestProperty(nv[0], nv[1]); } @@ -186,12 +203,12 @@ public class DeliveryTask implements Runnable, Comparable { } catch (ProtocolException pe) { dth.reportDeliveryExtra(this, -1L); // Rcvd error instead of 100-continue + loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe); } if (os != null) { long sofar = 0; - try { + try (InputStream is = new FileInputStream(datafile)) { byte[] buf = new byte[1024 * 1024]; - InputStream is = new FileInputStream(datafile); while (sofar < length) { int i = buf.length; if (sofar + i > length) { @@ -204,7 +221,6 @@ public class DeliveryTask implements Runnable, Comparable { sofar += i; os.write(buf, 0, i); } - is.close(); os.close(); } catch (IOException ioe) { dth.reportDeliveryExtra(this, sofar); @@ -228,7 +244,7 @@ public class DeliveryTask implements Runnable, Comparable { InputStream is; if (rc >= 200 && rc <= 299) { is = uc.getInputStream(); - xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID"); + xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID"); } else { if (rc >= 300 && rc <= 399) { rmsg = uc.getHeaderField("Location"); @@ -243,6 +259,7 @@ public class DeliveryTask implements Runnable, Comparable { } dth.reportStatus(this, rc, xpubid, rmsg); } catch (Exception e) { + loggerDeliveryTask.error("Exception "+e.getStackTrace(),e); dth.reportException(this, e); } } @@ -253,6 +270,8 @@ public class DeliveryTask implements Runnable, Comparable { public void clean() { datafile.delete(); metafile.delete(); + eelflogger.info(EelfMsgs.INVOKE, newInvocationId); + eelflogger.info(EelfMsgs.EXIT); hdrs = null; }