1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
25 package org.onap.dmaap.datarouter.node;
31 import com.att.eelf.configuration.EELFLogger;
32 import com.att.eelf.configuration.EELFManager;
33 import org.apache.log4j.Logger;
34 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
37 import static com.att.eelf.configuration.Configuration.*;
40 * A file to be delivered to a destination.
42 * A Delivery task represents a work item for the data router - a file that
43 * needs to be delivered and provides mechanisms to get information about
44 * the file and its delivery data as well as to attempt delivery.
46 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
47 private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
48 private static EELFLogger eelflogger = EELFManager.getInstance()
49 .getLogger(DeliveryTask.class);
50 private DeliveryTaskHelper dth;
54 private File datafile;
55 private File metafile;
58 private String method;
59 private String fileid;
62 private String feedid;
65 private String[][] hdrs;
66 private String newInvocationId;
70 * Create a delivery task for a given delivery queue and pub ID
72 * @param dth The delivery task helper for the queue this task is in.
73 * @param pubid The publish ID for this file. This is used as
74 * the base for the file name in the spool directory and is of
75 * the form <milliseconds since 1970>.<fqdn of initial data router node>
77 public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
80 di = dth.getDestInfo();
81 subid = di.getSubId();
82 feedid = di.getLogData();
83 spool = di.getSpool();
84 String dfn = spool + "/" + pubid;
85 String mfn = dfn + ".M";
86 datafile = new File(spool + "/" + pubid);
87 metafile = new File(mfn);
88 boolean monly = di.isMetaDataOnly();
89 date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
90 Vector<String[]> hdrv = new Vector<String[]>();
92 try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
93 String s = br.readLine();
94 int i = s.indexOf('\t');
95 method = s.substring(0, i);
96 NodeUtils.setIpAndFqdnForEelf(method);
97 if (!"DELETE".equals(method) && !monly) {
98 length = datafile.length();
100 fileid = s.substring(i + 1);
101 while ((s = br.readLine()) != null) {
103 String h = s.substring(0, i);
104 String v = s.substring(i + 1);
105 if ("x-att-dr-routing".equalsIgnoreCase(h)) {
106 subid = v.replaceAll("[^ ]*/", "");
107 feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
109 if (length == 0 && h.toLowerCase().startsWith("content-")) {
112 if (h.equalsIgnoreCase("content-type")) {
115 if (h.equalsIgnoreCase("x-onap-requestid")) {
116 MDC.put(MDC_KEY_REQUEST_ID, v);
118 if (h.equalsIgnoreCase("x-invocationid")) {
119 MDC.put("InvocationId", v);
120 v = UUID.randomUUID().toString();
123 hdrv.add(new String[]{h, v});
125 } catch (Exception e) {
126 loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
128 hdrs = hdrv.toArray(new String[hdrv.size()][]);
129 url = dth.getDestURL(fileid);
132 * Is the object a DeliveryTask with the same publication ID?
134 public boolean equals(Object o) {
135 if (!(o instanceof DeliveryTask)) {
138 return (pubid.equals(((DeliveryTask) o).pubid));
142 * Compare the publication IDs.
144 public int compareTo(DeliveryTask o) {
145 return (pubid.compareTo(o.pubid));
149 * Get the hash code of the publication ID.
151 public int hashCode() {
152 return (pubid.hashCode());
156 * Return the publication ID.
158 public String toString() {
164 public String getPublishId() {
174 di = dth.getDestInfo();
175 boolean expect100 = di.isUsing100();
176 boolean monly = di.isMetaDataOnly();
178 if (!"DELETE".equals(method) && !monly) {
179 length = datafile.length();
181 url = dth.getDestURL(fileid);
182 URL u = new URL(url);
183 HttpURLConnection uc = (HttpURLConnection) u.openConnection();
184 uc.setConnectTimeout(60000);
185 uc.setReadTimeout(60000);
186 uc.setInstanceFollowRedirects(false);
187 uc.setRequestMethod(method);
188 uc.setRequestProperty("Content-Length", Long.toString(length));
189 uc.setRequestProperty("Authorization", di.getAuth());
190 uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);
191 for (String[] nv : hdrs) {
192 uc.addRequestProperty(nv[0], nv[1]);
196 uc.setRequestProperty("Expect", "100-continue");
198 uc.setFixedLengthStreamingMode(length);
199 uc.setDoOutput(true);
200 OutputStream os = null;
202 os = uc.getOutputStream();
203 } catch (ProtocolException pe) {
204 dth.reportDeliveryExtra(this, -1L);
205 // Rcvd error instead of 100-continue
206 loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
210 try (InputStream is = new FileInputStream(datafile)) {
211 byte[] buf = new byte[1024 * 1024];
212 while (sofar < length) {
214 if (sofar + i > length) {
215 i = (int) (length - sofar);
217 i = is.read(buf, 0, i);
219 throw new IOException("Unexpected problem reading data file " + datafile);
225 } catch (IOException ioe) {
226 dth.reportDeliveryExtra(this, sofar);
231 int rc = uc.getResponseCode();
232 String rmsg = uc.getResponseMessage();
234 String h0 = uc.getHeaderField(0);
236 int i = h0.indexOf(' ');
237 int j = h0.indexOf(' ', i + 1);
238 if (i != -1 && j != -1) {
239 rmsg = h0.substring(j + 1);
243 String xpubid = null;
245 if (rc >= 200 && rc <= 299) {
246 is = uc.getInputStream();
247 xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");
249 if (rc >= 300 && rc <= 399) {
250 rmsg = uc.getHeaderField("Location");
252 is = uc.getErrorStream();
254 byte[] buf = new byte[4096];
256 while (is.read(buf) > 0) {
260 dth.reportStatus(this, rc, xpubid, rmsg);
261 } catch (Exception e) {
262 loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
263 dth.reportException(this, e);
268 * Remove meta and data files
270 public void clean() {
273 eelflogger.info(EelfMsgs.INVOKE, newInvocationId);
274 eelflogger.info(EelfMsgs.EXIT);
279 * Has this delivery task been cleaned?
281 public boolean isCleaned() {
282 return (hdrs == null);
288 public long getLength() {
293 * Get creation date as encoded in the publish ID.
295 public long getDate() {
300 * Get the most recent delivery attempt URL
302 public String getURL() {
307 * Get the content type
309 public String getCType() {
316 public String getMethod() {
323 public String getFileId() {
328 * Get the number of delivery attempts
330 public int getAttempts() {
335 * Get the (space delimited list of) subscription ID for this delivery task
337 public String getSubId() {
342 * Get the feed ID for this delivery task
344 public String getFeedId() {