1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
25 package org.onap.dmaap.datarouter.node;
31 import org.apache.log4j.Logger;
34 * A file to be delivered to a destination.
36 * A delivery task represents a work item for the data router: a file that
37 * needs to be delivered. It provides mechanisms to get information about
38 * the file and its delivery data, as well as to attempt delivery.
40 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
41 private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
42 private DeliveryTaskHelper dth;
46 private File datafile;
47 private File metafile;
50 private String method;
51 private String fileid;
54 private String feedid;
57 private String[][] hdrs;
61 * Create a delivery task for a given delivery queue and publish ID.
63 * @param dth The delivery task helper for the queue this task is in.
64 * @param pubid The publish ID for this file. This is used as
65 * the base for the file name in the spool directory and is of
66 * the form {@code <milliseconds since 1970>.<FQDN of initial data router node>}.
68 public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
71 di = dth.getDestInfo();
72 subid = di.getSubId();
73 feedid = di.getLogData();
74 spool = di.getSpool();
75 String dfn = spool + "/" + pubid;
76 String mfn = dfn + ".M";
77 datafile = new File(spool + "/" + pubid);
78 metafile = new File(mfn);
79 boolean monly = di.isMetaDataOnly();
80 date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
81 Vector<String[]> hdrv = new Vector<String[]>();
83 try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
84 String s = br.readLine();
85 int i = s.indexOf('\t');
86 method = s.substring(0, i);
87 if (!"DELETE".equals(method) && !monly) {
88 length = datafile.length();
90 fileid = s.substring(i + 1);
91 while ((s = br.readLine()) != null) {
93 String h = s.substring(0, i);
94 String v = s.substring(i + 1);
95 if ("x-att-dr-routing".equalsIgnoreCase(h)) {
96 subid = v.replaceAll("[^ ]*/", "");
97 feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
99 if (length == 0 && h.toLowerCase().startsWith("content-")) {
102 if (h.equalsIgnoreCase("content-type")) {
105 hdrv.add(new String[]{h, v});
107 } catch (Exception e) {
108 loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
110 hdrs = hdrv.toArray(new String[hdrv.size()][]);
111 url = dth.getDestURL(fileid);
114 * Is the object a DeliveryTask with the same publication ID?
116 public boolean equals(Object o) {
117 if (!(o instanceof DeliveryTask)) {
120 return (pubid.equals(((DeliveryTask) o).pubid));
124 * Compare the publication IDs.
126 public int compareTo(DeliveryTask o) {
127 return (pubid.compareTo(o.pubid));
131 * Get the hash code of the publication ID.
133 public int hashCode() {
134 return (pubid.hashCode());
138 * Return the publication ID.
140 public String toString() {
146 public String getPublishId() {
156 di = dth.getDestInfo();
157 boolean expect100 = di.isUsing100();
158 boolean monly = di.isMetaDataOnly();
160 if (!"DELETE".equals(method) && !monly) {
161 length = datafile.length();
163 url = dth.getDestURL(fileid);
164 URL u = new URL(url);
165 HttpURLConnection uc = (HttpURLConnection) u.openConnection();
166 uc.setConnectTimeout(60000);
167 uc.setReadTimeout(60000);
168 uc.setInstanceFollowRedirects(false);
169 uc.setRequestMethod(method);
170 uc.setRequestProperty("Content-Length", Long.toString(length));
171 uc.setRequestProperty("Authorization", di.getAuth());
172 uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);
173 for (String[] nv : hdrs) {
174 uc.addRequestProperty(nv[0], nv[1]);
178 uc.setRequestProperty("Expect", "100-continue");
180 uc.setFixedLengthStreamingMode(length);
181 uc.setDoOutput(true);
182 OutputStream os = null;
184 os = uc.getOutputStream();
185 } catch (ProtocolException pe) {
186 dth.reportDeliveryExtra(this, -1L);
187 // Rcvd error instead of 100-continue
188 loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
192 try (InputStream is = new FileInputStream(datafile)) {
193 byte[] buf = new byte[1024 * 1024];
194 while (sofar < length) {
196 if (sofar + i > length) {
197 i = (int) (length - sofar);
199 i = is.read(buf, 0, i);
201 throw new IOException("Unexpected problem reading data file " + datafile);
207 } catch (IOException ioe) {
208 dth.reportDeliveryExtra(this, sofar);
213 int rc = uc.getResponseCode();
214 String rmsg = uc.getResponseMessage();
216 String h0 = uc.getHeaderField(0);
218 int i = h0.indexOf(' ');
219 int j = h0.indexOf(' ', i + 1);
220 if (i != -1 && j != -1) {
221 rmsg = h0.substring(j + 1);
225 String xpubid = null;
227 if (rc >= 200 && rc <= 299) {
228 is = uc.getInputStream();
229 xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");
231 if (rc >= 300 && rc <= 399) {
232 rmsg = uc.getHeaderField("Location");
234 is = uc.getErrorStream();
236 byte[] buf = new byte[4096];
238 while (is.read(buf) > 0) {
242 dth.reportStatus(this, rc, xpubid, rmsg);
243 } catch (Exception e) {
244 loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
245 dth.reportException(this, e);
250 * Remove meta and data files
252 public void clean() {
259 * Has this delivery task been cleaned?
261 public boolean isCleaned() {
262 return (hdrs == null);
268 public long getLength() {
273 * Get creation date as encoded in the publish ID.
275 public long getDate() {
280 * Get the most recent delivery attempt URL
282 public String getURL() {
287 * Get the content type
289 public String getCType() {
296 public String getMethod() {
303 public String getFileId() {
308 * Get the number of delivery attempts
310 public int getAttempts() {
315 * Get the (space-delimited list of) subscription IDs for this delivery task.
317 public String getSubId() {
322 * Get the feed ID for this delivery task
324 public String getFeedId() {