Merge "Unit test base"
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import java.io.*;
28 import java.net.*;
29 import java.util.*;
30
31 import org.apache.log4j.Logger;
32
33 /**
34  * A file to be delivered to a destination.
35  * <p>
36  * A Delivery task represents a work item for the data router - a file that
37  * needs to be delivered and provides mechanisms to get information about
38  * the file and its delivery data as well as to attempt delivery.
39  */
40 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
41     private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
42     private DeliveryTaskHelper dth;
43     private String pubid;
44     private DestInfo di;
45     private String spool;
46     private File datafile;
47     private File metafile;
48     private long length;
49     private long date;
50     private String method;
51     private String fileid;
52     private String ctype;
53     private String url;
54     private String feedid;
55     private String subid;
56     private int attempts;
57     private String[][] hdrs;
58
59     /**
60      * Is the object a DeliveryTask with the same publication ID?
61      */
62     public boolean equals(Object o) {
63         if (!(o instanceof DeliveryTask)) {
64             return (false);
65         }
66         return (pubid.equals(((DeliveryTask) o).pubid));
67     }
68
69     /**
70      * Compare the publication IDs.
71      */
72     public int compareTo(DeliveryTask o) {
73         return (pubid.compareTo(o.pubid));
74     }
75
76     /**
77      * Get the hash code of the publication ID.
78      */
79     public int hashCode() {
80         return (pubid.hashCode());
81     }
82
83     /**
84      * Return the publication ID.
85      */
86     public String toString() {
87         return (pubid);
88     }
89
90     /**
91      * Create a delivery task for a given delivery queue and pub ID
92      *
93      * @param    dth    The delivery task helper for the queue this task is in.
94      * @param    pubid    The publish ID for this file.  This is used as
95      * the base for the file name in the spool directory and is of
96      * the form <milliseconds since 1970>.<fqdn of initial data router node>
97      */
98     public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
99         this.dth = dth;
100         this.pubid = pubid;
101         di = dth.getDestInfo();
102         subid = di.getSubId();
103         feedid = di.getLogData();
104         spool = di.getSpool();
105         String dfn = spool + "/" + pubid;
106         String mfn = dfn + ".M";
107         datafile = new File(spool + "/" + pubid);
108         metafile = new File(mfn);
109         boolean monly = di.isMetaDataOnly();
110         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
111         Vector<String[]> hdrv = new Vector<String[]>();
112         try {
113             BufferedReader br = new BufferedReader(new FileReader(metafile));
114             String s = br.readLine();
115             int i = s.indexOf('\t');
116             method = s.substring(0, i);
117             if (!"DELETE".equals(method) && !monly) {
118                 length = datafile.length();
119             }
120             fileid = s.substring(i + 1);
121             while ((s = br.readLine()) != null) {
122                 i = s.indexOf('\t');
123                 String h = s.substring(0, i);
124                 String v = s.substring(i + 1);
125                 if ("x-att-dr-routing".equalsIgnoreCase(h)) {
126                     subid = v.replaceAll("[^ ]*/", "");
127                     feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
128                 }
129                 if (length == 0 && h.toLowerCase().startsWith("content-")) {
130                     continue;
131                 }
132                 if (h.equalsIgnoreCase("content-type")) {
133                     ctype = v;
134                 }
135                 hdrv.add(new String[]{h, v});
136             }
137             br.close();
138         } catch (Exception e) {
139         }
140         hdrs = hdrv.toArray(new String[hdrv.size()][]);
141         url = dth.getDestURL(fileid);
142     }
143
144     /**
145      * Get the publish ID
146      */
147     public String getPublishId() {
148         return (pubid);
149     }
150
151     /**
152      * Attempt delivery
153      */
154     public void run() {
155         attempts++;
156         try {
157             di = dth.getDestInfo();
158             boolean expect100 = di.isUsing100();
159             boolean monly = di.isMetaDataOnly();
160             length = 0;
161             if (!"DELETE".equals(method) && !monly) {
162                 length = datafile.length();
163             }
164             url = dth.getDestURL(fileid);
165             URL u = new URL(url);
166             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
167             uc.setConnectTimeout(60000);
168             uc.setReadTimeout(60000);
169             uc.setInstanceFollowRedirects(false);
170             uc.setRequestMethod(method);
171             uc.setRequestProperty("Content-Length", Long.toString(length));
172             uc.setRequestProperty("Authorization", di.getAuth());
173             uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);
174             for (String[] nv : hdrs) {
175                 uc.addRequestProperty(nv[0], nv[1]);
176             }
177             if (length > 0) {
178                 if (expect100) {
179                     uc.setRequestProperty("Expect", "100-continue");
180                 }
181                 uc.setFixedLengthStreamingMode(length);
182                 uc.setDoOutput(true);
183                 OutputStream os = null;
184                 try {
185                     os = uc.getOutputStream();
186                 } catch (ProtocolException pe) {
187                     dth.reportDeliveryExtra(this, -1L);
188                     // Rcvd error instead of 100-continue
189                 }
190                 if (os != null) {
191                     long sofar = 0;
192                     try {
193                         byte[] buf = new byte[1024 * 1024];
194                         InputStream is = new FileInputStream(datafile);
195                         while (sofar < length) {
196                             int i = buf.length;
197                             if (sofar + i > length) {
198                                 i = (int) (length - sofar);
199                             }
200                             i = is.read(buf, 0, i);
201                             if (i <= 0) {
202                                 throw new IOException("Unexpected problem reading data file " + datafile);
203                             }
204                             sofar += i;
205                             os.write(buf, 0, i);
206                         }
207                         is.close();
208                         os.close();
209                     } catch (IOException ioe) {
210                         dth.reportDeliveryExtra(this, sofar);
211                         throw ioe;
212                     }
213                 }
214             }
215             int rc = uc.getResponseCode();
216             String rmsg = uc.getResponseMessage();
217             if (rmsg == null) {
218                 String h0 = uc.getHeaderField(0);
219                 if (h0 != null) {
220                     int i = h0.indexOf(' ');
221                     int j = h0.indexOf(' ', i + 1);
222                     if (i != -1 && j != -1) {
223                         rmsg = h0.substring(j + 1);
224                     }
225                 }
226             }
227             String xpubid = null;
228             InputStream is;
229             if (rc >= 200 && rc <= 299) {
230                 is = uc.getInputStream();
231                 xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");
232             } else {
233                 if (rc >= 300 && rc <= 399) {
234                     rmsg = uc.getHeaderField("Location");
235                 }
236                 is = uc.getErrorStream();
237             }
238             byte[] buf = new byte[4096];
239             if (is != null) {
240                 while (is.read(buf) > 0) {
241                 }
242                 is.close();
243             }
244             dth.reportStatus(this, rc, xpubid, rmsg);
245         } catch (Exception e) {
246             dth.reportException(this, e);
247         }
248     }
249
250     /**
251      * Remove meta and data files
252      */
253     public void clean() {
254         datafile.delete();
255         metafile.delete();
256         hdrs = null;
257     }
258
259     /**
260      * Has this delivery task been cleaned?
261      */
262     public boolean isCleaned() {
263         return (hdrs == null);
264     }
265
266     /**
267      * Get length of body
268      */
269     public long getLength() {
270         return (length);
271     }
272
273     /**
274      * Get creation date as encoded in the publish ID.
275      */
276     public long getDate() {
277         return (date);
278     }
279
280     /**
281      * Get the most recent delivery attempt URL
282      */
283     public String getURL() {
284         return (url);
285     }
286
287     /**
288      * Get the content type
289      */
290     public String getCType() {
291         return (ctype);
292     }
293
294     /**
295      * Get the method
296      */
297     public String getMethod() {
298         return (method);
299     }
300
301     /**
302      * Get the file ID
303      */
304     public String getFileId() {
305         return (fileid);
306     }
307
308     /**
309      * Get the number of delivery attempts
310      */
311     public int getAttempts() {
312         return (attempts);
313     }
314
315     /**
316      * Get the (space delimited list of) subscription ID for this delivery task
317      */
318     public String getSubId() {
319         return (subid);
320     }
321
322     /**
323      * Get the feed ID for this delivery task
324      */
325     public String getFeedId() {
326         return (feedid);
327     }
328 }