-/*******************************************************************************\r
- * ============LICENSE_START==================================================\r
- * * org.onap.dmaap\r
- * * ===========================================================================\r
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * * ===========================================================================\r
- * * Licensed under the Apache License, Version 2.0 (the "License");\r
- * * you may not use this file except in compliance with the License.\r
- * * You may obtain a copy of the License at\r
- * * \r
- * * http://www.apache.org/licenses/LICENSE-2.0\r
- * * \r
- * * Unless required by applicable law or agreed to in writing, software\r
- * * distributed under the License is distributed on an "AS IS" BASIS,\r
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * * See the License for the specific language governing permissions and\r
- * * limitations under the License.\r
- * * ============LICENSE_END====================================================\r
- * *\r
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
- * *\r
- ******************************************************************************/\r
-\r
-\r
-package org.onap.dmaap.datarouter.node;\r
-\r
-import java.io.*;\r
-import java.net.*;\r
-import java.util.*;\r
-import org.apache.log4j.Logger;\r
-\r
-/**\r
- * A file to be delivered to a destination.\r
- * <p>\r
- * A Delivery task represents a work item for the data router - a file that\r
- * needs to be delivered and provides mechanisms to get information about\r
- * the file and its delivery data as well as to attempt delivery.\r
- */\r
-public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {\r
- private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");\r
- private DeliveryTaskHelper dth;\r
- private String pubid;\r
- private DestInfo di;\r
- private String spool;\r
- private File datafile;\r
- private File metafile;\r
- private long length;\r
- private long date;\r
- private String method;\r
- private String fileid;\r
- private String ctype;\r
- private String url;\r
- private String feedid;\r
- private String subid;\r
- private int attempts;\r
- private String[][] hdrs;\r
- /**\r
- * Is the object a DeliveryTask with the same publication ID?\r
- */\r
- public boolean equals(Object o) {\r
- if (!(o instanceof DeliveryTask)) {\r
- return(false);\r
- }\r
- return(pubid.equals(((DeliveryTask)o).pubid));\r
- }\r
- /**\r
- * Compare the publication IDs.\r
- */\r
- public int compareTo(DeliveryTask o) {\r
- return(pubid.compareTo(o.pubid));\r
- }\r
- /**\r
- * Get the hash code of the publication ID.\r
- */\r
- public int hashCode() {\r
- return(pubid.hashCode());\r
- }\r
- /**\r
- * Return the publication ID.\r
- */\r
- public String toString() {\r
- return(pubid);\r
- }\r
- /**\r
- * Create a delivery task for a given delivery queue and pub ID\r
- * @param dth The delivery task helper for the queue this task is in.\r
- * @param pubid The publish ID for this file. This is used as\r
- * the base for the file name in the spool directory and is of\r
- * the form <milliseconds since 1970>.<fqdn of initial data router node>\r
- */\r
- public DeliveryTask(DeliveryTaskHelper dth, String pubid) {\r
- this.dth = dth;\r
- this.pubid = pubid;\r
- di = dth.getDestInfo();\r
- subid = di.getSubId();\r
- feedid = di.getLogData();\r
- spool = di.getSpool();\r
- String dfn = spool + "/" + pubid;\r
- String mfn = dfn + ".M";\r
- datafile = new File(spool + "/" + pubid);\r
- metafile = new File(mfn);\r
- boolean monly = di.isMetaDataOnly();\r
- date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));\r
- Vector<String[]> hdrv = new Vector<String[]>();\r
- try {\r
- BufferedReader br = new BufferedReader(new FileReader(metafile));\r
- String s = br.readLine();\r
- int i = s.indexOf('\t');\r
- method = s.substring(0, i);\r
- if (!"DELETE".equals(method) && !monly) {\r
- length = datafile.length();\r
- }\r
- fileid = s.substring(i + 1);\r
- while ((s = br.readLine()) != null) {\r
- i = s.indexOf('\t');\r
- String h = s.substring(0, i);\r
- String v = s.substring(i + 1);\r
- if ("x-att-dr-routing".equalsIgnoreCase(h)) {\r
- subid = v.replaceAll("[^ ]*/", "");\r
- feedid = dth.getFeedId(subid.replaceAll(" .*", ""));\r
- }\r
- if (length == 0 && h.toLowerCase().startsWith("content-")) {\r
- continue;\r
- }\r
- if (h.equalsIgnoreCase("content-type")) {\r
- ctype = v;\r
- }\r
- hdrv.add(new String[] {h, v});\r
- }\r
- br.close();\r
- } catch (Exception e) {\r
- }\r
- hdrs = hdrv.toArray(new String[hdrv.size()][]);\r
- url = dth.getDestURL(fileid);\r
- }\r
- /**\r
- * Get the publish ID\r
- */\r
- public String getPublishId() {\r
- return(pubid);\r
- }\r
- /**\r
- * Attempt delivery\r
- */\r
- public void run() {\r
- attempts++;\r
- try {\r
- di = dth.getDestInfo();\r
- boolean expect100 = di.isUsing100();\r
- boolean monly = di.isMetaDataOnly();\r
- length = 0;\r
- if (!"DELETE".equals(method) && !monly) {\r
- length = datafile.length();\r
- }\r
- url = dth.getDestURL(fileid);\r
- URL u = new URL(url);\r
- HttpURLConnection uc = (HttpURLConnection)u.openConnection();\r
- uc.setConnectTimeout(60000);\r
- uc.setReadTimeout(60000);\r
- uc.setInstanceFollowRedirects(false);\r
- uc.setRequestMethod(method);\r
- uc.setRequestProperty("Content-Length", Long.toString(length));\r
- uc.setRequestProperty("Authorization", di.getAuth());\r
- uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);\r
- for (String[] nv: hdrs) {\r
- uc.addRequestProperty(nv[0], nv[1]);\r
- }\r
- if (length > 0) {\r
- if (expect100) {\r
- uc.setRequestProperty("Expect", "100-continue");\r
- }\r
- uc.setFixedLengthStreamingMode(length);\r
- uc.setDoOutput(true);\r
- OutputStream os = null;\r
- try {\r
- os = uc.getOutputStream();\r
- } catch (ProtocolException pe) {\r
- dth.reportDeliveryExtra(this, -1L);\r
- // Rcvd error instead of 100-continue\r
- }\r
- if (os != null) {\r
- long sofar = 0;\r
- try {\r
- byte[] buf = new byte[1024 * 1024];\r
- InputStream is = new FileInputStream(datafile);\r
- while (sofar < length) {\r
- int i = buf.length;\r
- if (sofar + i > length) {\r
- i = (int)(length - sofar);\r
- }\r
- i = is.read(buf, 0, i);\r
- if (i <= 0) {\r
- throw new IOException("Unexpected problem reading data file " + datafile);\r
- }\r
- sofar += i;\r
- os.write(buf, 0, i);\r
- }\r
- is.close();\r
- os.close();\r
- } catch (IOException ioe) {\r
- dth.reportDeliveryExtra(this, sofar);\r
- throw ioe;\r
- }\r
- }\r
- }\r
- int rc = uc.getResponseCode();\r
- String rmsg = uc.getResponseMessage();\r
- if (rmsg == null) {\r
- String h0 = uc.getHeaderField(0);\r
- if (h0 != null) {\r
- int i = h0.indexOf(' ');\r
- int j = h0.indexOf(' ', i + 1);\r
- if (i != -1 && j != -1) {\r
- rmsg = h0.substring(j + 1);\r
- }\r
- }\r
- }\r
- String xpubid = null;\r
- InputStream is;\r
- if (rc >= 200 && rc <= 299) {\r
- is = uc.getInputStream();\r
- xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");\r
- } else {\r
- if (rc >= 300 && rc <= 399) {\r
- rmsg = uc.getHeaderField("Location");\r
- }\r
- is = uc.getErrorStream();\r
- }\r
- byte[] buf = new byte[4096];\r
- if (is != null) {\r
- while (is.read(buf) > 0) {\r
- }\r
- is.close();\r
- }\r
- dth.reportStatus(this, rc, xpubid, rmsg);\r
- } catch (Exception e) {\r
- dth.reportException(this, e);\r
- }\r
- }\r
- /**\r
- * Remove meta and data files\r
- */\r
- public void clean() {\r
- datafile.delete();\r
- metafile.delete();\r
- hdrs = null;\r
- }\r
- /**\r
- * Has this delivery task been cleaned?\r
- */\r
- public boolean isCleaned() {\r
- return(hdrs == null);\r
- }\r
- /**\r
- * Get length of body\r
- */\r
- public long getLength() {\r
- return(length);\r
- }\r
- /**\r
- * Get creation date as encoded in the publish ID.\r
- */\r
- public long getDate() {\r
- return(date);\r
- }\r
- /**\r
- * Get the most recent delivery attempt URL\r
- */\r
- public String getURL() {\r
- return(url);\r
- }\r
- /**\r
- * Get the content type\r
- */\r
- public String getCType() {\r
- return(ctype);\r
- }\r
- /**\r
- * Get the method\r
- */\r
- public String getMethod() {\r
- return(method);\r
- }\r
- /**\r
- * Get the file ID\r
- */\r
- public String getFileId() {\r
- return(fileid);\r
- }\r
- /**\r
- * Get the number of delivery attempts\r
- */\r
- public int getAttempts() {\r
- return(attempts);\r
- }\r
- /**\r
- * Get the (space delimited list of) subscription ID for this delivery task\r
- */\r
- public String getSubId() {\r
- return(subid);\r
- }\r
- /**\r
- * Get the feed ID for this delivery task\r
- */\r
- public String getFeedId() {\r
- return(feedid);\r
- }\r
-}\r
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+
+package org.onap.dmaap.datarouter.node;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A file to be delivered to a destination.
+ * <p>
+ * A Delivery task represents a work item for the data router - a file that
+ * needs to be delivered and provides mechanisms to get information about
+ * the file and its delivery data as well as to attempt delivery.
+ */
+public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
+ private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
+ private DeliveryTaskHelper dth;
+ private String pubid;
+ private DestInfo di;
+ private String spool;
+ private File datafile;
+ private File metafile;
+ private long length;
+ private long date;
+ private String method;
+ private String fileid;
+ private String ctype;
+ private String url;
+ private String feedid;
+ private String subid;
+ private int attempts;
+ private String[][] hdrs;
+
+
+ /**
+ * Create a delivery task for a given delivery queue and pub ID
+ *
+ * @param dth The delivery task helper for the queue this task is in.
+ * @param pubid The publish ID for this file. This is used as
+ * the base for the file name in the spool directory and is of
+ * the form <milliseconds since 1970>.<fqdn of initial data router node>
+ */
+ public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
+ this.dth = dth;
+ this.pubid = pubid;
+ di = dth.getDestInfo();
+ subid = di.getSubId();
+ feedid = di.getLogData();
+ spool = di.getSpool();
+ String dfn = spool + "/" + pubid;
+ String mfn = dfn + ".M";
+ datafile = new File(spool + "/" + pubid);
+ metafile = new File(mfn);
+ boolean monly = di.isMetaDataOnly();
+ date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
+ Vector<String[]> hdrv = new Vector<String[]>();
+
+ try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
+ String s = br.readLine();
+ int i = s.indexOf('\t');
+ method = s.substring(0, i);
+ if (!"DELETE".equals(method) && !monly) {
+ length = datafile.length();
+ }
+ fileid = s.substring(i + 1);
+ while ((s = br.readLine()) != null) {
+ i = s.indexOf('\t');
+ String h = s.substring(0, i);
+ String v = s.substring(i + 1);
+ if ("x-att-dr-routing".equalsIgnoreCase(h)) {
+ subid = v.replaceAll("[^ ]*/", "");
+ feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
+ }
+ if (length == 0 && h.toLowerCase().startsWith("content-")) {
+ continue;
+ }
+ if (h.equalsIgnoreCase("content-type")) {
+ ctype = v;
+ }
+ hdrv.add(new String[]{h, v});
+ }
+ } catch (Exception e) {
+ loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
+ }
+ hdrs = hdrv.toArray(new String[hdrv.size()][]);
+ url = dth.getDestURL(fileid);
+ }
+ /**
+ * Is the object a DeliveryTask with the same publication ID?
+ */
+ public boolean equals(Object o) {
+ if (!(o instanceof DeliveryTask)) {
+ return (false);
+ }
+ return (pubid.equals(((DeliveryTask) o).pubid));
+ }
+
+ /**
+ * Compare the publication IDs.
+ */
+ public int compareTo(DeliveryTask o) {
+ return (pubid.compareTo(o.pubid));
+ }
+
+ /**
+ * Get the hash code of the publication ID.
+ */
+ public int hashCode() {
+ return (pubid.hashCode());
+ }
+
+ /**
+ * Return the publication ID.
+ */
+ public String toString() {
+ return (pubid);
+ }
+ /**
+ * Get the publish ID
+ */
+ public String getPublishId() {
+ return (pubid);
+ }
+
+ /**
+ * Attempt delivery
+ */
+ public void run() {
+ attempts++;
+ try {
+ di = dth.getDestInfo();
+ boolean expect100 = di.isUsing100();
+ boolean monly = di.isMetaDataOnly();
+ length = 0;
+ if (!"DELETE".equals(method) && !monly) {
+ length = datafile.length();
+ }
+ url = dth.getDestURL(fileid);
+ URL u = new URL(url);
+ HttpURLConnection uc = (HttpURLConnection) u.openConnection();
+ uc.setConnectTimeout(60000);
+ uc.setReadTimeout(60000);
+ uc.setInstanceFollowRedirects(false);
+ uc.setRequestMethod(method);
+ uc.setRequestProperty("Content-Length", Long.toString(length));
+ uc.setRequestProperty("Authorization", di.getAuth());
+ uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);
+ for (String[] nv : hdrs) {
+ uc.addRequestProperty(nv[0], nv[1]);
+ }
+ if (length > 0) {
+ if (expect100) {
+ uc.setRequestProperty("Expect", "100-continue");
+ }
+ uc.setFixedLengthStreamingMode(length);
+ uc.setDoOutput(true);
+ OutputStream os = null;
+ try {
+ os = uc.getOutputStream();
+ } catch (ProtocolException pe) {
+ dth.reportDeliveryExtra(this, -1L);
+ // Rcvd error instead of 100-continue
+ loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
+ }
+ if (os != null) {
+ long sofar = 0;
+ try (InputStream is = new FileInputStream(datafile)) {
+ byte[] buf = new byte[1024 * 1024];
+ while (sofar < length) {
+ int i = buf.length;
+ if (sofar + i > length) {
+ i = (int) (length - sofar);
+ }
+ i = is.read(buf, 0, i);
+ if (i <= 0) {
+ throw new IOException("Unexpected problem reading data file " + datafile);
+ }
+ sofar += i;
+ os.write(buf, 0, i);
+ }
+ os.close();
+ } catch (IOException ioe) {
+ dth.reportDeliveryExtra(this, sofar);
+ throw ioe;
+ }
+ }
+ }
+ int rc = uc.getResponseCode();
+ String rmsg = uc.getResponseMessage();
+ if (rmsg == null) {
+ String h0 = uc.getHeaderField(0);
+ if (h0 != null) {
+ int i = h0.indexOf(' ');
+ int j = h0.indexOf(' ', i + 1);
+ if (i != -1 && j != -1) {
+ rmsg = h0.substring(j + 1);
+ }
+ }
+ }
+ String xpubid = null;
+ InputStream is;
+ if (rc >= 200 && rc <= 299) {
+ is = uc.getInputStream();
+ xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");
+ } else {
+ if (rc >= 300 && rc <= 399) {
+ rmsg = uc.getHeaderField("Location");
+ }
+ is = uc.getErrorStream();
+ }
+ byte[] buf = new byte[4096];
+ if (is != null) {
+ while (is.read(buf) > 0) {
+ }
+ is.close();
+ }
+ dth.reportStatus(this, rc, xpubid, rmsg);
+ } catch (Exception e) {
+ loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
+ dth.reportException(this, e);
+ }
+ }
+
+ /**
+ * Remove meta and data files
+ */
+ public void clean() {
+ datafile.delete();
+ metafile.delete();
+ hdrs = null;
+ }
+
+ /**
+ * Has this delivery task been cleaned?
+ */
+ public boolean isCleaned() {
+ return (hdrs == null);
+ }
+
+ /**
+ * Get length of body
+ */
+ public long getLength() {
+ return (length);
+ }
+
+ /**
+ * Get creation date as encoded in the publish ID.
+ */
+ public long getDate() {
+ return (date);
+ }
+
+ /**
+ * Get the most recent delivery attempt URL
+ */
+ public String getURL() {
+ return (url);
+ }
+
+ /**
+ * Get the content type
+ */
+ public String getCType() {
+ return (ctype);
+ }
+
+ /**
+ * Get the method
+ */
+ public String getMethod() {
+ return (method);
+ }
+
+ /**
+ * Get the file ID
+ */
+ public String getFileId() {
+ return (fileid);
+ }
+
+ /**
+ * Get the number of delivery attempts
+ */
+ public int getAttempts() {
+ return (attempts);
+ }
+
+ /**
+ * Get the (space delimited list of) subscription ID for this delivery task
+ */
+ public String getSubId() {
+ return (subid);
+ }
+
+ /**
+ * Get the feed ID for this delivery task
+ */
+ public String getFeedId() {
+ return (feedid);
+ }
+}