Merge "Add Unit Tests for NodeUtils and StatusLog"
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import java.io.*;
28 import java.net.*;
29 import java.util.*;
30
31 import org.apache.log4j.Logger;
32
33 /**
34  * A file to be delivered to a destination.
35  * <p>
36  * A Delivery task represents a work item for the data router - a file that
37  * needs to be delivered and provides mechanisms to get information about
38  * the file and its delivery data as well as to attempt delivery.
39  */
40 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
41     private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
42     private DeliveryTaskHelper dth;
43     private String pubid;
44     private DestInfo di;
45     private String spool;
46     private File datafile;
47     private File metafile;
48     private long length;
49     private long date;
50     private String method;
51     private String fileid;
52     private String ctype;
53     private String url;
54     private String feedid;
55     private String subid;
56     private int attempts;
57     private String[][] hdrs;
58
59
60     /**
61      * Create a delivery task for a given delivery queue and pub ID
62      *
63      * @param    dth    The delivery task helper for the queue this task is in.
64      * @param    pubid    The publish ID for this file.  This is used as
65      * the base for the file name in the spool directory and is of
66      * the form <milliseconds since 1970>.<fqdn of initial data router node>
67      */
68     public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
69         this.dth = dth;
70         this.pubid = pubid;
71         di = dth.getDestInfo();
72         subid = di.getSubId();
73         feedid = di.getLogData();
74         spool = di.getSpool();
75         String dfn = spool + "/" + pubid;
76         String mfn = dfn + ".M";
77         datafile = new File(spool + "/" + pubid);
78         metafile = new File(mfn);
79         boolean monly = di.isMetaDataOnly();
80         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
81         Vector<String[]> hdrv = new Vector<String[]>();
82
83         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
84             String s = br.readLine();
85             int i = s.indexOf('\t');
86             method = s.substring(0, i);
87             if (!"DELETE".equals(method) && !monly) {
88                 length = datafile.length();
89             }
90             fileid = s.substring(i + 1);
91             while ((s = br.readLine()) != null) {
92                 i = s.indexOf('\t');
93                 String h = s.substring(0, i);
94                 String v = s.substring(i + 1);
95                 if ("x-att-dr-routing".equalsIgnoreCase(h)) {
96                     subid = v.replaceAll("[^ ]*/", "");
97                     feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
98                 }
99                 if (length == 0 && h.toLowerCase().startsWith("content-")) {
100                     continue;
101                 }
102                 if (h.equalsIgnoreCase("content-type")) {
103                     ctype = v;
104                 }
105                 hdrv.add(new String[]{h, v});
106             }
107         } catch (Exception e) {
108             loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
109         }
110         hdrs = hdrv.toArray(new String[hdrv.size()][]);
111         url = dth.getDestURL(fileid);
112     }
113     /**
114      * Is the object a DeliveryTask with the same publication ID?
115      */
116     public boolean equals(Object o) {
117         if (!(o instanceof DeliveryTask)) {
118             return (false);
119         }
120         return (pubid.equals(((DeliveryTask) o).pubid));
121     }
122
123     /**
124      * Compare the publication IDs.
125      */
126     public int compareTo(DeliveryTask o) {
127         return (pubid.compareTo(o.pubid));
128     }
129
130     /**
131      * Get the hash code of the publication ID.
132      */
133     public int hashCode() {
134         return (pubid.hashCode());
135     }
136
137     /**
138      * Return the publication ID.
139      */
140     public String toString() {
141         return (pubid);
142     }
143     /**
144      * Get the publish ID
145      */
146     public String getPublishId() {
147         return (pubid);
148     }
149
150     /**
151      * Attempt delivery
152      */
153     public void run() {
154         attempts++;
155         try {
156             di = dth.getDestInfo();
157             boolean expect100 = di.isUsing100();
158             boolean monly = di.isMetaDataOnly();
159             length = 0;
160             if (!"DELETE".equals(method) && !monly) {
161                 length = datafile.length();
162             }
163             url = dth.getDestURL(fileid);
164             URL u = new URL(url);
165             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
166             uc.setConnectTimeout(60000);
167             uc.setReadTimeout(60000);
168             uc.setInstanceFollowRedirects(false);
169             uc.setRequestMethod(method);
170             uc.setRequestProperty("Content-Length", Long.toString(length));
171             uc.setRequestProperty("Authorization", di.getAuth());
172             uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);
173             for (String[] nv : hdrs) {
174                 uc.addRequestProperty(nv[0], nv[1]);
175             }
176             if (length > 0) {
177                 if (expect100) {
178                     uc.setRequestProperty("Expect", "100-continue");
179                 }
180                 uc.setFixedLengthStreamingMode(length);
181                 uc.setDoOutput(true);
182                 OutputStream os = null;
183                 try {
184                     os = uc.getOutputStream();
185                 } catch (ProtocolException pe) {
186                     dth.reportDeliveryExtra(this, -1L);
187                     // Rcvd error instead of 100-continue
188                     loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
189                 }
190                 if (os != null) {
191                     long sofar = 0;
192                     try (InputStream is = new FileInputStream(datafile)) {
193                         byte[] buf = new byte[1024 * 1024];
194                         while (sofar < length) {
195                             int i = buf.length;
196                             if (sofar + i > length) {
197                                 i = (int) (length - sofar);
198                             }
199                             i = is.read(buf, 0, i);
200                             if (i <= 0) {
201                                 throw new IOException("Unexpected problem reading data file " + datafile);
202                             }
203                             sofar += i;
204                             os.write(buf, 0, i);
205                         }
206                         os.close();
207                     } catch (IOException ioe) {
208                         dth.reportDeliveryExtra(this, sofar);
209                         throw ioe;
210                     }
211                 }
212             }
213             int rc = uc.getResponseCode();
214             String rmsg = uc.getResponseMessage();
215             if (rmsg == null) {
216                 String h0 = uc.getHeaderField(0);
217                 if (h0 != null) {
218                     int i = h0.indexOf(' ');
219                     int j = h0.indexOf(' ', i + 1);
220                     if (i != -1 && j != -1) {
221                         rmsg = h0.substring(j + 1);
222                     }
223                 }
224             }
225             String xpubid = null;
226             InputStream is;
227             if (rc >= 200 && rc <= 299) {
228                 is = uc.getInputStream();
229                 xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");
230             } else {
231                 if (rc >= 300 && rc <= 399) {
232                     rmsg = uc.getHeaderField("Location");
233                 }
234                 is = uc.getErrorStream();
235             }
236             byte[] buf = new byte[4096];
237             if (is != null) {
238                 while (is.read(buf) > 0) {
239                 }
240                 is.close();
241             }
242             dth.reportStatus(this, rc, xpubid, rmsg);
243         } catch (Exception e) {
244             loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
245             dth.reportException(this, e);
246         }
247     }
248
249     /**
250      * Remove meta and data files
251      */
252     public void clean() {
253         datafile.delete();
254         metafile.delete();
255         hdrs = null;
256     }
257
258     /**
259      * Has this delivery task been cleaned?
260      */
261     public boolean isCleaned() {
262         return (hdrs == null);
263     }
264
265     /**
266      * Get length of body
267      */
268     public long getLength() {
269         return (length);
270     }
271
272     /**
273      * Get creation date as encoded in the publish ID.
274      */
275     public long getDate() {
276         return (date);
277     }
278
279     /**
280      * Get the most recent delivery attempt URL
281      */
282     public String getURL() {
283         return (url);
284     }
285
286     /**
287      * Get the content type
288      */
289     public String getCType() {
290         return (ctype);
291     }
292
293     /**
294      * Get the method
295      */
296     public String getMethod() {
297         return (method);
298     }
299
300     /**
301      * Get the file ID
302      */
303     public String getFileId() {
304         return (fileid);
305     }
306
307     /**
308      * Get the number of delivery attempts
309      */
310     public int getAttempts() {
311         return (attempts);
312     }
313
314     /**
315      * Get the (space delimited list of) subscription ID for this delivery task
316      */
317     public String getSubId() {
318         return (subid);
319     }
320
321     /**
322      * Get the feed ID for this delivery task
323      */
324     public String getFeedId() {
325         return (feedid);
326     }
327 }