/*******************************************************************************
 * ============LICENSE_START==================================================
 * * ===========================================================================
 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * * ===========================================================================
 * * Licensed under the Apache License, Version 2.0 (the "License");
 * * you may not use this file except in compliance with the License.
 * * You may obtain a copy of the License at
 * *
 * *      http://www.apache.org/licenses/LICENSE-2.0
 * *
 * * Unless required by applicable law or agreed to in writing, software
 * * distributed under the License is distributed on an "AS IS" BASIS,
 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * * See the License for the specific language governing permissions and
 * * limitations under the License.
 * * ============LICENSE_END====================================================
 * *
 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 * *
 ******************************************************************************/
\r
package com.att.research.datarouter.node;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
\r
33 * A file to be delivered to a destination.
\r
35 * A Delivery task represents a work item for the data router - a file that
\r
36 * needs to be delivered and provides mechanisms to get information about
\r
37 * the file and its delivery data as well as to attempt delivery.
\r
39 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
\r
40 private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.DeliveryTask");
\r
41 private DeliveryTaskHelper dth;
\r
42 private String pubid;
\r
43 private DestInfo di;
\r
44 private String spool;
\r
45 private File datafile;
\r
46 private File metafile;
\r
47 private long length;
\r
49 private String method;
\r
50 private String fileid;
\r
51 private String ctype;
\r
53 private String feedid;
\r
54 private String subid;
\r
55 private int attempts;
\r
56 private String[][] hdrs;
\r
58 * Is the object a DeliveryTask with the same publication ID?
\r
60 public boolean equals(Object o) {
\r
61 if (!(o instanceof DeliveryTask)) {
\r
64 return(pubid.equals(((DeliveryTask)o).pubid));
\r
67 * Compare the publication IDs.
\r
69 public int compareTo(DeliveryTask o) {
\r
70 return(pubid.compareTo(o.pubid));
\r
73 * Get the hash code of the publication ID.
\r
75 public int hashCode() {
\r
76 return(pubid.hashCode());
\r
79 * Return the publication ID.
\r
81 public String toString() {
\r
85 * Create a delivery task for a given delivery queue and pub ID
\r
86 * @param dth The delivery task helper for the queue this task is in.
\r
87 * @param pubid The publish ID for this file. This is used as
\r
88 * the base for the file name in the spool directory and is of
\r
89 * the form <milliseconds since 1970>.<fqdn of initial data router node>
\r
91 public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
\r
94 di = dth.getDestInfo();
\r
95 subid = di.getSubId();
\r
96 feedid = di.getLogData();
\r
97 spool = di.getSpool();
\r
98 String dfn = spool + "/" + pubid;
\r
99 String mfn = dfn + ".M";
\r
100 datafile = new File(spool + "/" + pubid);
\r
101 metafile = new File(mfn);
\r
102 boolean monly = di.isMetaDataOnly();
\r
103 date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
\r
104 Vector<String[]> hdrv = new Vector<String[]>();
\r
106 BufferedReader br = new BufferedReader(new FileReader(metafile));
\r
107 String s = br.readLine();
\r
108 int i = s.indexOf('\t');
\r
109 method = s.substring(0, i);
\r
110 if (!"DELETE".equals(method) && !monly) {
\r
111 length = datafile.length();
\r
113 fileid = s.substring(i + 1);
\r
114 while ((s = br.readLine()) != null) {
\r
115 i = s.indexOf('\t');
\r
116 String h = s.substring(0, i);
\r
117 String v = s.substring(i + 1);
\r
118 if ("x-att-dr-routing".equalsIgnoreCase(h)) {
\r
119 subid = v.replaceAll("[^ ]*/", "");
\r
120 feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
\r
122 if (length == 0 && h.toLowerCase().startsWith("content-")) {
\r
125 if (h.equalsIgnoreCase("content-type")) {
\r
128 hdrv.add(new String[] {h, v});
\r
131 } catch (Exception e) {
\r
133 hdrs = hdrv.toArray(new String[hdrv.size()][]);
\r
134 url = dth.getDestURL(fileid);
\r
137 * Get the publish ID
\r
139 public String getPublishId() {
\r
145 public void run() {
\r
148 di = dth.getDestInfo();
\r
149 boolean expect100 = di.isUsing100();
\r
150 boolean monly = di.isMetaDataOnly();
\r
152 if (!"DELETE".equals(method) && !monly) {
\r
153 length = datafile.length();
\r
155 url = dth.getDestURL(fileid);
\r
156 URL u = new URL(url);
\r
157 HttpURLConnection uc = (HttpURLConnection)u.openConnection();
\r
158 uc.setConnectTimeout(60000);
\r
159 uc.setReadTimeout(60000);
\r
160 uc.setInstanceFollowRedirects(false);
\r
161 uc.setRequestMethod(method);
\r
162 uc.setRequestProperty("Content-Length", Long.toString(length));
\r
163 uc.setRequestProperty("Authorization", di.getAuth());
\r
164 uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);
\r
165 for (String[] nv: hdrs) {
\r
166 uc.addRequestProperty(nv[0], nv[1]);
\r
170 uc.setRequestProperty("Expect", "100-continue");
\r
172 uc.setFixedLengthStreamingMode(length);
\r
173 uc.setDoOutput(true);
\r
174 OutputStream os = null;
\r
176 os = uc.getOutputStream();
\r
177 } catch (ProtocolException pe) {
\r
178 dth.reportDeliveryExtra(this, -1L);
\r
179 // Rcvd error instead of 100-continue
\r
184 byte[] buf = new byte[1024 * 1024];
\r
185 InputStream is = new FileInputStream(datafile);
\r
186 while (sofar < length) {
\r
187 int i = buf.length;
\r
188 if (sofar + i > length) {
\r
189 i = (int)(length - sofar);
\r
191 i = is.read(buf, 0, i);
\r
193 throw new IOException("Unexpected problem reading data file " + datafile);
\r
196 os.write(buf, 0, i);
\r
200 } catch (IOException ioe) {
\r
201 dth.reportDeliveryExtra(this, sofar);
\r
206 int rc = uc.getResponseCode();
\r
207 String rmsg = uc.getResponseMessage();
\r
208 if (rmsg == null) {
\r
209 String h0 = uc.getHeaderField(0);
\r
211 int i = h0.indexOf(' ');
\r
212 int j = h0.indexOf(' ', i + 1);
\r
213 if (i != -1 && j != -1) {
\r
214 rmsg = h0.substring(j + 1);
\r
218 String xpubid = null;
\r
220 if (rc >= 200 && rc <= 299) {
\r
221 is = uc.getInputStream();
\r
222 xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");
\r
224 if (rc >= 300 && rc <= 399) {
\r
225 rmsg = uc.getHeaderField("Location");
\r
227 is = uc.getErrorStream();
\r
229 byte[] buf = new byte[4096];
\r
231 while (is.read(buf) > 0) {
\r
235 dth.reportStatus(this, rc, xpubid, rmsg);
\r
236 } catch (Exception e) {
\r
237 dth.reportException(this, e);
\r
241 * Remove meta and data files
\r
243 public void clean() {
\r
249 * Has this delivery task been cleaned?
\r
251 public boolean isCleaned() {
\r
252 return(hdrs == null);
\r
255 * Get length of body
\r
257 public long getLength() {
\r
261 * Get creation date as encoded in the publish ID.
\r
263 public long getDate() {
\r
267 * Get the most recent delivery attempt URL
\r
269 public String getURL() {
\r
273 * Get the content type
\r
275 public String getCType() {
\r
281 public String getMethod() {
\r
287 public String getFileId() {
\r
291 * Get the number of delivery attempts
\r
293 public int getAttempts() {
\r
297 * Get the (space delimited list of) subscription ID for this delivery task
\r
299 public String getSubId() {
\r
303 * Get the feed ID for this delivery task
\r
305 public String getFeedId() {
\r