Merge "Update INFO.yaml file"
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
index 7e10d5a..b2c3169 100644 (file)
@@ -28,7 +28,13 @@ import java.io.*;
 import java.net.*;
 import java.util.*;
 
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
 import org.apache.log4j.Logger;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
+import org.slf4j.MDC;
+
+import static com.att.eelf.configuration.Configuration.*;
 
 /**
  * A file to be delivered to a destination.
@@ -39,9 +45,11 @@ import org.apache.log4j.Logger;
  */
 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
-    private DeliveryTaskHelper dth;
+    private static EELFLogger eelflogger = EELFManager.getInstance()
+            .getLogger(DeliveryTask.class);
+    private DeliveryTaskHelper deliveryTaskHelper;
     private String pubid;
-    private DestInfo di;
+    private DestInfo destInfo;
     private String spool;
     private File datafile;
     private File metafile;
@@ -55,35 +63,37 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     private String subid;
     private int attempts;
     private String[][] hdrs;
+    private String newInvocationId;
 
 
     /**
      * Create a delivery task for a given delivery queue and pub ID
      *
-     * @param    dth    The delivery task helper for the queue this task is in.
+     * @param    deliveryTaskHelper    The delivery task helper for the queue this task is in.
      * @param    pubid    The publish ID for this file.  This is used as
      * the base for the file name in the spool directory and is of
      * the form <milliseconds since 1970>.<fqdn of initial data router node>
      */
-    public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
-        this.dth = dth;
+    public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
+        this.deliveryTaskHelper = deliveryTaskHelper;
         this.pubid = pubid;
-        di = dth.getDestInfo();
-        subid = di.getSubId();
-        feedid = di.getLogData();
-        spool = di.getSpool();
+        destInfo = deliveryTaskHelper.getDestinationInfo();
+        subid = destInfo.getSubId();
+        feedid = destInfo.getLogData();
+        spool = destInfo.getSpool();
         String dfn = spool + "/" + pubid;
         String mfn = dfn + ".M";
         datafile = new File(spool + "/" + pubid);
         metafile = new File(mfn);
-        boolean monly = di.isMetaDataOnly();
+        boolean monly = destInfo.isMetaDataOnly();
         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
-        Vector<String[]> hdrv = new Vector<String[]>();
+        Vector<String[]> hdrv = new Vector<>();
 
         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
             String s = br.readLine();
             int i = s.indexOf('\t');
             method = s.substring(0, i);
+            NodeUtils.setIpAndFqdnForEelf(method);
             if (!"DELETE".equals(method) && !monly) {
                 length = datafile.length();
             }
@@ -92,9 +102,9 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 i = s.indexOf('\t');
                 String h = s.substring(0, i);
                 String v = s.substring(i + 1);
-                if ("x-att-dr-routing".equalsIgnoreCase(h)) {
+                if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
                     subid = v.replaceAll("[^ ]*/", "");
-                    feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
+                    feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
                 }
                 if (length == 0 && h.toLowerCase().startsWith("content-")) {
                     continue;
@@ -102,13 +112,21 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 if (h.equalsIgnoreCase("content-type")) {
                     ctype = v;
                 }
+                if (h.equalsIgnoreCase("x-onap-requestid")) {
+                    MDC.put(MDC_KEY_REQUEST_ID, v);
+                }
+                if (h.equalsIgnoreCase("x-invocationid")) {
+                    MDC.put("InvocationId", v);
+                    v = UUID.randomUUID().toString();
+                    newInvocationId = v;
+                }
                 hdrv.add(new String[]{h, v});
             }
         } catch (Exception e) {
             loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
         }
         hdrs = hdrv.toArray(new String[hdrv.size()][]);
-        url = dth.getDestURL(fileid);
+        url = deliveryTaskHelper.getDestURL(fileid);
     }
     /**
      * Is the object a DeliveryTask with the same publication ID?
@@ -153,14 +171,14 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     public void run() {
         attempts++;
         try {
-            di = dth.getDestInfo();
-            boolean expect100 = di.isUsing100();
-            boolean monly = di.isMetaDataOnly();
+            destInfo = deliveryTaskHelper.getDestinationInfo();
+            boolean expect100 = destInfo.isUsing100();
+            boolean monly = destInfo.isMetaDataOnly();
             length = 0;
             if (!"DELETE".equals(method) && !monly) {
                 length = datafile.length();
             }
-            url = dth.getDestURL(fileid);
+            url = deliveryTaskHelper.getDestURL(fileid);
             URL u = new URL(url);
             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
             uc.setConnectTimeout(60000);
@@ -168,8 +186,8 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
             uc.setInstanceFollowRedirects(false);
             uc.setRequestMethod(method);
             uc.setRequestProperty("Content-Length", Long.toString(length));
-            uc.setRequestProperty("Authorization", di.getAuth());
-            uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);
+            uc.setRequestProperty("Authorization", destInfo.getAuth());
+            uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
             for (String[] nv : hdrs) {
                 uc.addRequestProperty(nv[0], nv[1]);
             }
@@ -183,7 +201,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 try {
                     os = uc.getOutputStream();
                 } catch (ProtocolException pe) {
-                    dth.reportDeliveryExtra(this, -1L);
+                    deliveryTaskHelper.reportDeliveryExtra(this, -1L);
                     // Rcvd error instead of 100-continue
                     loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
                 }
@@ -205,7 +223,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                         }
                         os.close();
                     } catch (IOException ioe) {
-                        dth.reportDeliveryExtra(this, sofar);
+                        deliveryTaskHelper.reportDeliveryExtra(this, sofar);
                         throw ioe;
                     }
                 }
@@ -226,7 +244,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
             InputStream is;
             if (rc >= 200 && rc <= 299) {
                 is = uc.getInputStream();
-                xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");
+                xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
             } else {
                 if (rc >= 300 && rc <= 399) {
                     rmsg = uc.getHeaderField("Location");
@@ -239,10 +257,10 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 }
                 is.close();
             }
-            dth.reportStatus(this, rc, xpubid, rmsg);
+            deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
         } catch (Exception e) {
             loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
-            dth.reportException(this, e);
+            deliveryTaskHelper.reportException(this, e);
         }
     }
 
@@ -252,6 +270,8 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     public void clean() {
         datafile.delete();
         metafile.delete();
+        eelflogger.info(EelfMsgs.INVOKE, newInvocationId);
+        eelflogger.info(EelfMsgs.EXIT);
         hdrs = null;
     }