Update project structure to org.onap
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
new file mode 100644 (file)
index 0000000..c07822d
--- /dev/null
@@ -0,0 +1,308 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package org.onap.dmaap.datarouter.node;\r
+\r
+import java.io.*;\r
+import java.net.*;\r
+import java.util.*;\r
+import org.apache.log4j.Logger;\r
+\r
+/**\r
+ *     A file to be delivered to a destination.\r
+ *     <p>\r
+ *     A Delivery task represents a work item for the data router - a file that\r
+ *     needs to be delivered and provides mechanisms to get information about\r
+ *     the file and its delivery data as well as to attempt delivery.\r
+ */\r
+public class DeliveryTask implements Runnable, Comparable<DeliveryTask>        {\r
+       private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");\r
+       private DeliveryTaskHelper      dth;\r
+       private String  pubid;\r
+       private DestInfo        di;\r
+       private String  spool;\r
+       private File    datafile;\r
+       private File    metafile;\r
+       private long    length;\r
+       private long    date;\r
+       private String  method;\r
+       private String  fileid;\r
+       private String  ctype;\r
+       private String  url;\r
+       private String  feedid;\r
+       private String  subid;\r
+       private int     attempts;\r
+       private String[][]      hdrs;\r
+       /**\r
+        *      Is the object a DeliveryTask with the same publication ID?\r
+        */\r
+       public boolean equals(Object o) {\r
+               if (!(o instanceof DeliveryTask)) {\r
+                       return(false);\r
+               }\r
+               return(pubid.equals(((DeliveryTask)o).pubid));\r
+       }\r
+       /**\r
+        *      Compare the publication IDs.\r
+        */\r
+       public int compareTo(DeliveryTask o) {\r
+               return(pubid.compareTo(o.pubid));\r
+       }\r
+       /**\r
+        *      Get the hash code of the publication ID.\r
+        */\r
+       public int hashCode() {\r
+               return(pubid.hashCode());\r
+       }\r
+       /**\r
+        *      Return the publication ID.\r
+        */\r
+       public String toString() {\r
+               return(pubid);\r
+       }\r
+       /**\r
+        *      Create a delivery task for a given delivery queue and pub ID\r
+        *      @param  dth     The delivery task helper for the queue this task is in.\r
+        *      @param  pubid   The publish ID for this file.  This is used as\r
+        *      the base for the file name in the spool directory and is of\r
+        *      the form <milliseconds since 1970>.<fqdn of initial data router node>\r
+        */\r
+       public DeliveryTask(DeliveryTaskHelper dth, String pubid) {\r
+               this.dth = dth;\r
+               this.pubid = pubid;\r
+               di = dth.getDestInfo();\r
+               subid = di.getSubId();\r
+               feedid = di.getLogData();\r
+               spool = di.getSpool();\r
+               String dfn = spool + "/" + pubid;\r
+               String mfn = dfn + ".M";\r
+               datafile = new File(spool + "/" + pubid);\r
+               metafile = new File(mfn);\r
+               boolean monly = di.isMetaDataOnly();\r
+               date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));\r
+               Vector<String[]> hdrv = new Vector<String[]>();\r
+               try {\r
+                       BufferedReader br = new BufferedReader(new FileReader(metafile));\r
+                       String s = br.readLine();\r
+                       int i = s.indexOf('\t');\r
+                       method = s.substring(0, i);\r
+                       if (!"DELETE".equals(method) && !monly) {\r
+                               length = datafile.length();\r
+                       }\r
+                       fileid = s.substring(i + 1);\r
+                       while ((s = br.readLine()) != null) {\r
+                               i = s.indexOf('\t');\r
+                               String h = s.substring(0, i);\r
+                               String v = s.substring(i + 1);\r
+                               if ("x-att-dr-routing".equalsIgnoreCase(h)) {\r
+                                       subid = v.replaceAll("[^ ]*/", "");\r
+                                       feedid = dth.getFeedId(subid.replaceAll(" .*", ""));\r
+                               }\r
+                               if (length == 0 && h.toLowerCase().startsWith("content-")) {\r
+                                       continue;\r
+                               }\r
+                               if (h.equalsIgnoreCase("content-type")) {\r
+                                       ctype = v;\r
+                               }\r
+                               hdrv.add(new String[] {h, v});\r
+                       }\r
+                       br.close();\r
+               } catch (Exception e) {\r
+               }\r
+               hdrs = hdrv.toArray(new String[hdrv.size()][]);\r
+               url = dth.getDestURL(fileid);\r
+       }\r
+       /**\r
+        *      Get the publish ID\r
+        */\r
+       public String getPublishId() {\r
+               return(pubid);\r
+       }\r
+       /**\r
+        *      Attempt delivery\r
+        */\r
+       public void run() {\r
+               attempts++;\r
+               try {\r
+                       di = dth.getDestInfo();\r
+                       boolean expect100 = di.isUsing100();\r
+                       boolean monly = di.isMetaDataOnly();\r
+                       length = 0;\r
+                       if (!"DELETE".equals(method) && !monly) {\r
+                               length = datafile.length();\r
+                       }\r
+                       url = dth.getDestURL(fileid);\r
+                       URL u = new URL(url);\r
+                       HttpURLConnection uc = (HttpURLConnection)u.openConnection();\r
+                       uc.setConnectTimeout(60000);\r
+                       uc.setReadTimeout(60000);\r
+                       uc.setInstanceFollowRedirects(false);\r
+                       uc.setRequestMethod(method);\r
+                       uc.setRequestProperty("Content-Length", Long.toString(length));\r
+                       uc.setRequestProperty("Authorization", di.getAuth());\r
+                       uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);\r
+                       for (String[] nv: hdrs) {\r
+                               uc.addRequestProperty(nv[0], nv[1]);\r
+                       }\r
+                       if (length > 0) {\r
+                               if (expect100) {\r
+                                       uc.setRequestProperty("Expect", "100-continue");\r
+                               }\r
+                               uc.setFixedLengthStreamingMode(length);\r
+                               uc.setDoOutput(true);\r
+                               OutputStream os = null;\r
+                               try {\r
+                                       os = uc.getOutputStream();\r
+                               } catch (ProtocolException pe) {\r
+                                       dth.reportDeliveryExtra(this, -1L);\r
+                                       // Rcvd error instead of 100-continue\r
+                               }\r
+                               if (os != null) {\r
+                                       long sofar = 0;\r
+                                       try {\r
+                                               byte[] buf = new byte[1024 * 1024];\r
+                                               InputStream is = new FileInputStream(datafile);\r
+                                               while (sofar < length) {\r
+                                                       int i = buf.length;\r
+                                                       if (sofar + i > length) {\r
+                                                               i = (int)(length - sofar);\r
+                                                       }\r
+                                                       i = is.read(buf, 0, i);\r
+                                                       if (i <= 0) {\r
+                                                               throw new IOException("Unexpected problem reading data file " + datafile);\r
+                                                       }\r
+                                                       sofar += i;\r
+                                                       os.write(buf, 0, i);\r
+                                               }\r
+                                               is.close();\r
+                                               os.close();\r
+                                       } catch (IOException ioe) {\r
+                                               dth.reportDeliveryExtra(this, sofar);\r
+                                               throw ioe;\r
+                                       }\r
+                               }\r
+                       }\r
+                       int rc = uc.getResponseCode();\r
+                       String rmsg = uc.getResponseMessage();\r
+                       if (rmsg == null) {\r
+                               String h0 = uc.getHeaderField(0);\r
+                               if (h0 != null) {\r
+                                       int i = h0.indexOf(' ');\r
+                                       int j = h0.indexOf(' ', i + 1);\r
+                                       if (i != -1 && j != -1) {\r
+                                               rmsg = h0.substring(j + 1);\r
+                                       }\r
+                               }\r
+                       }\r
+                       String xpubid = null;\r
+                       InputStream is;\r
+                       if (rc >= 200 && rc <= 299) {\r
+                               is = uc.getInputStream();\r
+                               xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");\r
+                       } else {\r
+                               if (rc >= 300 && rc <= 399) {\r
+                                       rmsg = uc.getHeaderField("Location");\r
+                               }\r
+                               is = uc.getErrorStream();\r
+                       }\r
+                       byte[] buf = new byte[4096];\r
+                       if (is != null) {\r
+                               while (is.read(buf) > 0) {\r
+                               }\r
+                               is.close();\r
+                       }\r
+                       dth.reportStatus(this, rc, xpubid, rmsg);\r
+               } catch (Exception e) {\r
+                       dth.reportException(this, e);\r
+               }\r
+       }\r
+       /**\r
+        *      Remove meta and data files\r
+        */\r
+       public void clean() {\r
+               datafile.delete();\r
+               metafile.delete();\r
+               hdrs = null;\r
+       }\r
+       /**\r
+        *      Has this delivery task been cleaned?\r
+        */\r
+       public boolean isCleaned() {\r
+               return(hdrs == null);\r
+       }\r
+       /**\r
+        *      Get length of body\r
+        */\r
+       public long     getLength() {\r
+               return(length);\r
+       }\r
+       /**\r
+        *      Get creation date as encoded in the publish ID.\r
+        */\r
+       public long     getDate() {\r
+               return(date);\r
+       }\r
+       /**\r
+        *      Get the most recent delivery attempt URL\r
+        */\r
+       public String getURL() {\r
+               return(url);\r
+       }\r
+       /**\r
+        *      Get the content type\r
+        */\r
+       public String   getCType() {\r
+               return(ctype);\r
+       }\r
+       /**\r
+        *      Get the method\r
+        */\r
+       public String   getMethod() {\r
+               return(method);\r
+       }\r
+       /**\r
+        *      Get the file ID\r
+        */\r
+       public String   getFileId() {\r
+               return(fileid);\r
+       }\r
+       /**\r
+        *      Get the number of delivery attempts\r
+        */\r
+       public int      getAttempts() {\r
+               return(attempts);\r
+       }\r
+       /**\r
+        *      Get the (space delimited list of) subscription ID for this delivery task\r
+        */\r
+       public String   getSubId() {\r
+               return(subid);\r
+       }\r
+       /**\r
+        *      Get the feed ID for this delivery task\r
+        */\r
+       public String   getFeedId() {\r
+               return(feedid);\r
+       }\r
+}\r