Fixed Sonar Issues
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import java.io.*;
28 import java.net.*;
29 import java.util.*;
30
31 import org.apache.log4j.Logger;
32
33 /**
34  * A file to be delivered to a destination.
35  * <p>
36  * A Delivery task represents a work item for the data router - a file that
37  * needs to be delivered and provides mechanisms to get information about
38  * the file and its delivery data as well as to attempt delivery.
39  */
40 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
41     private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
42     private DeliveryTaskHelper dth;
43     private String pubid;
44     private DestInfo di;
45     private String spool;
46     private File datafile;
47     private File metafile;
48     private long length;
49     private long date;
50     private String method;
51     private String fileid;
52     private String ctype;
53     private String url;
54     private String feedid;
55     private String subid;
56     private int attempts;
57     private String[][] hdrs;
58
59
60     /**
61      * Create a delivery task for a given delivery queue and pub ID
62      *
63      * @param    dth    The delivery task helper for the queue this task is in.
64      * @param    pubid    The publish ID for this file.  This is used as
65      * the base for the file name in the spool directory and is of
66      * the form <milliseconds since 1970>.<fqdn of initial data router node>
67      */
68     public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
69         this.dth = dth;
70         this.pubid = pubid;
71         di = dth.getDestInfo();
72         subid = di.getSubId();
73         feedid = di.getLogData();
74         spool = di.getSpool();
75         String dfn = spool + "/" + pubid;
76         String mfn = dfn + ".M";
77         datafile = new File(spool + "/" + pubid);
78         metafile = new File(mfn);
79         boolean monly = di.isMetaDataOnly();
80         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
81         Vector<String[]> hdrv = new Vector<String[]>();
82         try {
83             try(BufferedReader br = new BufferedReader(new FileReader(metafile))){
84                 String s = br.readLine();
85                 int i = s.indexOf('\t');
86                 method = s.substring(0, i);
87                 if (!"DELETE".equals(method) && !monly) {
88                     length = datafile.length();
89                 }
90                 fileid = s.substring(i + 1);
91                 while ((s = br.readLine()) != null) {
92                     i = s.indexOf('\t');
93                     String h = s.substring(0, i);
94                     String v = s.substring(i + 1);
95                     if ("x-att-dr-routing".equalsIgnoreCase(h)) {
96                         subid = v.replaceAll("[^ ]*/", "");
97                         feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
98                     }
99                     if (length == 0 && h.toLowerCase().startsWith("content-")) {
100                         continue;
101                     }
102                     if (h.equalsIgnoreCase("content-type")) {
103                         ctype = v;
104                     }
105                     hdrv.add(new String[]{h, v});
106                 }
107             }
108
109         } catch (Exception e) {
110             loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
111         }
112         hdrs = hdrv.toArray(new String[hdrv.size()][]);
113         url = dth.getDestURL(fileid);
114     }
115     /**
116      * Is the object a DeliveryTask with the same publication ID?
117      */
118     public boolean equals(Object o) {
119         if (!(o instanceof DeliveryTask)) {
120             return (false);
121         }
122         return (pubid.equals(((DeliveryTask) o).pubid));
123     }
124
125     /**
126      * Compare the publication IDs.
127      */
128     public int compareTo(DeliveryTask o) {
129         return (pubid.compareTo(o.pubid));
130     }
131
132     /**
133      * Get the hash code of the publication ID.
134      */
135     public int hashCode() {
136         return (pubid.hashCode());
137     }
138
139     /**
140      * Return the publication ID.
141      */
142     public String toString() {
143         return (pubid);
144     }
145     /**
146      * Get the publish ID
147      */
148     public String getPublishId() {
149         return (pubid);
150     }
151
152     /**
153      * Attempt delivery
154      */
155     public void run() {
156         attempts++;
157         try {
158             di = dth.getDestInfo();
159             boolean expect100 = di.isUsing100();
160             boolean monly = di.isMetaDataOnly();
161             length = 0;
162             if (!"DELETE".equals(method) && !monly) {
163                 length = datafile.length();
164             }
165             url = dth.getDestURL(fileid);
166             URL u = new URL(url);
167             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
168             uc.setConnectTimeout(60000);
169             uc.setReadTimeout(60000);
170             uc.setInstanceFollowRedirects(false);
171             uc.setRequestMethod(method);
172             uc.setRequestProperty("Content-Length", Long.toString(length));
173             uc.setRequestProperty("Authorization", di.getAuth());
174             uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);
175             for (String[] nv : hdrs) {
176                 uc.addRequestProperty(nv[0], nv[1]);
177             }
178             if (length > 0) {
179                 if (expect100) {
180                     uc.setRequestProperty("Expect", "100-continue");
181                 }
182                 uc.setFixedLengthStreamingMode(length);
183                 uc.setDoOutput(true);
184                 OutputStream os = null;
185                 try {
186                     os = uc.getOutputStream();
187                 } catch (ProtocolException pe) {
188                     dth.reportDeliveryExtra(this, -1L);
189                     // Rcvd error instead of 100-continue
190                     loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
191                 }
192                 if (os != null) {
193                     long sofar = 0;
194                     try {
195                         byte[] buf = new byte[1024 * 1024];
196                         try(InputStream is = new FileInputStream(datafile)){
197                             while (sofar < length) {
198                                 int i = buf.length;
199                                 if (sofar + i > length) {
200                                     i = (int) (length - sofar);
201                                 }
202                                 i = is.read(buf, 0, i);
203                                 if (i <= 0) {
204                                     throw new IOException("Unexpected problem reading data file " + datafile);
205                                 }
206                                 sofar += i;
207                                 os.write(buf, 0, i);
208                             }
209                             is.close();
210                             os.close();
211                         }
212
213                     } catch (IOException ioe) {
214                         dth.reportDeliveryExtra(this, sofar);
215                         throw ioe;
216
217                     }
218                 }
219             }
220             int rc = uc.getResponseCode();
221             String rmsg = uc.getResponseMessage();
222             if (rmsg == null) {
223                 String h0 = uc.getHeaderField(0);
224                 if (h0 != null) {
225                     int i = h0.indexOf(' ');
226                     int j = h0.indexOf(' ', i + 1);
227                     if (i != -1 && j != -1) {
228                         rmsg = h0.substring(j + 1);
229                     }
230                 }
231             }
232             String xpubid = null;
233             InputStream is;
234             if (rc >= 200 && rc <= 299) {
235                 is = uc.getInputStream();
236                 xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");
237             } else {
238                 if (rc >= 300 && rc <= 399) {
239                     rmsg = uc.getHeaderField("Location");
240                 }
241                 is = uc.getErrorStream();
242             }
243             byte[] buf = new byte[4096];
244             if (is != null) {
245                 while (is.read(buf) > 0) {
246                 }
247                 is.close();
248             }
249             dth.reportStatus(this, rc, xpubid, rmsg);
250         } catch (Exception e) {
251             loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
252             dth.reportException(this, e);
253         }
254     }
255
256     /**
257      * Remove meta and data files
258      */
259     public void clean() {
260         datafile.delete();
261         metafile.delete();
262         hdrs = null;
263     }
264
265     /**
266      * Has this delivery task been cleaned?
267      */
268     public boolean isCleaned() {
269         return (hdrs == null);
270     }
271
272     /**
273      * Get length of body
274      */
275     public long getLength() {
276         return (length);
277     }
278
279     /**
280      * Get creation date as encoded in the publish ID.
281      */
282     public long getDate() {
283         return (date);
284     }
285
286     /**
287      * Get the most recent delivery attempt URL
288      */
289     public String getURL() {
290         return (url);
291     }
292
293     /**
294      * Get the content type
295      */
296     public String getCType() {
297         return (ctype);
298     }
299
300     /**
301      * Get the method
302      */
303     public String getMethod() {
304         return (method);
305     }
306
307     /**
308      * Get the file ID
309      */
310     public String getFileId() {
311         return (fileid);
312     }
313
314     /**
315      * Get the number of delivery attempts
316      */
317     public int getAttempts() {
318         return (attempts);
319     }
320
321     /**
322      * Get the (space delimited list of) subscription ID for this delivery task
323      */
324     public String getSubId() {
325         return (subid);
326     }
327
328     /**
329      * Get the feed ID for this delivery task
330      */
331     public String getFeedId() {
332         return (feedid);
333     }
334 }