[DMAAP-48] Initial code import
[dmaap/datarouter.git] / datarouter-node / src / main / java / com / att / research / datarouter / node / DeliveryTask.java
1 /*******************************************************************************\r
2  * ============LICENSE_START==================================================\r
3  * * org.onap.dmaap\r
4  * * ===========================================================================\r
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * * ===========================================================================\r
7  * * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * * you may not use this file except in compliance with the License.\r
9  * * You may obtain a copy of the License at\r
10  * * \r
11  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * * \r
13  *  * Unless required by applicable law or agreed to in writing, software\r
14  * * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * * See the License for the specific language governing permissions and\r
17  * * limitations under the License.\r
18  * * ============LICENSE_END====================================================\r
19  * *\r
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
21  * *\r
22  ******************************************************************************/\r
23 \r
24 \r
25 package com.att.research.datarouter.node;\r
26 \r
27 import java.io.*;\r
28 import java.net.*;\r
29 import java.util.*;\r
30 import org.apache.log4j.Logger;\r
31 \r
32 /**\r
33  *      A file to be delivered to a destination.\r
34  *      <p>\r
35  *      A Delivery task represents a work item for the data router - a file that\r
36  *      needs to be delivered and provides mechanisms to get information about\r
37  *      the file and its delivery data as well as to attempt delivery.\r
38  */\r
39 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {\r
40         private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.DeliveryTask");\r
41         private DeliveryTaskHelper      dth;\r
42         private String  pubid;\r
43         private DestInfo        di;\r
44         private String  spool;\r
45         private File    datafile;\r
46         private File    metafile;\r
47         private long    length;\r
48         private long    date;\r
49         private String  method;\r
50         private String  fileid;\r
51         private String  ctype;\r
52         private String  url;\r
53         private String  feedid;\r
54         private String  subid;\r
55         private int     attempts;\r
56         private String[][]      hdrs;\r
57         /**\r
58          *      Is the object a DeliveryTask with the same publication ID?\r
59          */\r
60         public boolean equals(Object o) {\r
61                 if (!(o instanceof DeliveryTask)) {\r
62                         return(false);\r
63                 }\r
64                 return(pubid.equals(((DeliveryTask)o).pubid));\r
65         }\r
66         /**\r
67          *      Compare the publication IDs.\r
68          */\r
69         public int compareTo(DeliveryTask o) {\r
70                 return(pubid.compareTo(o.pubid));\r
71         }\r
72         /**\r
73          *      Get the hash code of the publication ID.\r
74          */\r
75         public int hashCode() {\r
76                 return(pubid.hashCode());\r
77         }\r
78         /**\r
79          *      Return the publication ID.\r
80          */\r
81         public String toString() {\r
82                 return(pubid);\r
83         }\r
84         /**\r
85          *      Create a delivery task for a given delivery queue and pub ID\r
86          *      @param  dth     The delivery task helper for the queue this task is in.\r
87          *      @param  pubid   The publish ID for this file.  This is used as\r
88          *      the base for the file name in the spool directory and is of\r
89          *      the form <milliseconds since 1970>.<fqdn of initial data router node>\r
90          */\r
91         public DeliveryTask(DeliveryTaskHelper dth, String pubid) {\r
92                 this.dth = dth;\r
93                 this.pubid = pubid;\r
94                 di = dth.getDestInfo();\r
95                 subid = di.getSubId();\r
96                 feedid = di.getLogData();\r
97                 spool = di.getSpool();\r
98                 String dfn = spool + "/" + pubid;\r
99                 String mfn = dfn + ".M";\r
100                 datafile = new File(spool + "/" + pubid);\r
101                 metafile = new File(mfn);\r
102                 boolean monly = di.isMetaDataOnly();\r
103                 date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));\r
104                 Vector<String[]> hdrv = new Vector<String[]>();\r
105                 try {\r
106                         BufferedReader br = new BufferedReader(new FileReader(metafile));\r
107                         String s = br.readLine();\r
108                         int i = s.indexOf('\t');\r
109                         method = s.substring(0, i);\r
110                         if (!"DELETE".equals(method) && !monly) {\r
111                                 length = datafile.length();\r
112                         }\r
113                         fileid = s.substring(i + 1);\r
114                         while ((s = br.readLine()) != null) {\r
115                                 i = s.indexOf('\t');\r
116                                 String h = s.substring(0, i);\r
117                                 String v = s.substring(i + 1);\r
118                                 if ("x-att-dr-routing".equalsIgnoreCase(h)) {\r
119                                         subid = v.replaceAll("[^ ]*/", "");\r
120                                         feedid = dth.getFeedId(subid.replaceAll(" .*", ""));\r
121                                 }\r
122                                 if (length == 0 && h.toLowerCase().startsWith("content-")) {\r
123                                         continue;\r
124                                 }\r
125                                 if (h.equalsIgnoreCase("content-type")) {\r
126                                         ctype = v;\r
127                                 }\r
128                                 hdrv.add(new String[] {h, v});\r
129                         }\r
130                         br.close();\r
131                 } catch (Exception e) {\r
132                 }\r
133                 hdrs = hdrv.toArray(new String[hdrv.size()][]);\r
134                 url = dth.getDestURL(fileid);\r
135         }\r
136         /**\r
137          *      Get the publish ID\r
138          */\r
139         public String getPublishId() {\r
140                 return(pubid);\r
141         }\r
142         /**\r
143          *      Attempt delivery\r
144          */\r
145         public void run() {\r
146                 attempts++;\r
147                 try {\r
148                         di = dth.getDestInfo();\r
149                         boolean expect100 = di.isUsing100();\r
150                         boolean monly = di.isMetaDataOnly();\r
151                         length = 0;\r
152                         if (!"DELETE".equals(method) && !monly) {\r
153                                 length = datafile.length();\r
154                         }\r
155                         url = dth.getDestURL(fileid);\r
156                         URL u = new URL(url);\r
157                         HttpURLConnection uc = (HttpURLConnection)u.openConnection();\r
158                         uc.setConnectTimeout(60000);\r
159                         uc.setReadTimeout(60000);\r
160                         uc.setInstanceFollowRedirects(false);\r
161                         uc.setRequestMethod(method);\r
162                         uc.setRequestProperty("Content-Length", Long.toString(length));\r
163                         uc.setRequestProperty("Authorization", di.getAuth());\r
164                         uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);\r
165                         for (String[] nv: hdrs) {\r
166                                 uc.addRequestProperty(nv[0], nv[1]);\r
167                         }\r
168                         if (length > 0) {\r
169                                 if (expect100) {\r
170                                         uc.setRequestProperty("Expect", "100-continue");\r
171                                 }\r
172                                 uc.setFixedLengthStreamingMode(length);\r
173                                 uc.setDoOutput(true);\r
174                                 OutputStream os = null;\r
175                                 try {\r
176                                         os = uc.getOutputStream();\r
177                                 } catch (ProtocolException pe) {\r
178                                         dth.reportDeliveryExtra(this, -1L);\r
179                                         // Rcvd error instead of 100-continue\r
180                                 }\r
181                                 if (os != null) {\r
182                                         long sofar = 0;\r
183                                         try {\r
184                                                 byte[] buf = new byte[1024 * 1024];\r
185                                                 InputStream is = new FileInputStream(datafile);\r
186                                                 while (sofar < length) {\r
187                                                         int i = buf.length;\r
188                                                         if (sofar + i > length) {\r
189                                                                 i = (int)(length - sofar);\r
190                                                         }\r
191                                                         i = is.read(buf, 0, i);\r
192                                                         if (i <= 0) {\r
193                                                                 throw new IOException("Unexpected problem reading data file " + datafile);\r
194                                                         }\r
195                                                         sofar += i;\r
196                                                         os.write(buf, 0, i);\r
197                                                 }\r
198                                                 is.close();\r
199                                                 os.close();\r
200                                         } catch (IOException ioe) {\r
201                                                 dth.reportDeliveryExtra(this, sofar);\r
202                                                 throw ioe;\r
203                                         }\r
204                                 }\r
205                         }\r
206                         int rc = uc.getResponseCode();\r
207                         String rmsg = uc.getResponseMessage();\r
208                         if (rmsg == null) {\r
209                                 String h0 = uc.getHeaderField(0);\r
210                                 if (h0 != null) {\r
211                                         int i = h0.indexOf(' ');\r
212                                         int j = h0.indexOf(' ', i + 1);\r
213                                         if (i != -1 && j != -1) {\r
214                                                 rmsg = h0.substring(j + 1);\r
215                                         }\r
216                                 }\r
217                         }\r
218                         String xpubid = null;\r
219                         InputStream is;\r
220                         if (rc >= 200 && rc <= 299) {\r
221                                 is = uc.getInputStream();\r
222                                 xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");\r
223                         } else {\r
224                                 if (rc >= 300 && rc <= 399) {\r
225                                         rmsg = uc.getHeaderField("Location");\r
226                                 }\r
227                                 is = uc.getErrorStream();\r
228                         }\r
229                         byte[] buf = new byte[4096];\r
230                         if (is != null) {\r
231                                 while (is.read(buf) > 0) {\r
232                                 }\r
233                                 is.close();\r
234                         }\r
235                         dth.reportStatus(this, rc, xpubid, rmsg);\r
236                 } catch (Exception e) {\r
237                         dth.reportException(this, e);\r
238                 }\r
239         }\r
240         /**\r
241          *      Remove meta and data files\r
242          */\r
243         public void clean() {\r
244                 datafile.delete();\r
245                 metafile.delete();\r
246                 hdrs = null;\r
247         }\r
248         /**\r
249          *      Has this delivery task been cleaned?\r
250          */\r
251         public boolean isCleaned() {\r
252                 return(hdrs == null);\r
253         }\r
254         /**\r
255          *      Get length of body\r
256          */\r
257         public long     getLength() {\r
258                 return(length);\r
259         }\r
260         /**\r
261          *      Get creation date as encoded in the publish ID.\r
262          */\r
263         public long     getDate() {\r
264                 return(date);\r
265         }\r
266         /**\r
267          *      Get the most recent delivery attempt URL\r
268          */\r
269         public String getURL() {\r
270                 return(url);\r
271         }\r
272         /**\r
273          *      Get the content type\r
274          */\r
275         public String   getCType() {\r
276                 return(ctype);\r
277         }\r
278         /**\r
279          *      Get the method\r
280          */\r
281         public String   getMethod() {\r
282                 return(method);\r
283         }\r
284         /**\r
285          *      Get the file ID\r
286          */\r
287         public String   getFileId() {\r
288                 return(fileid);\r
289         }\r
290         /**\r
291          *      Get the number of delivery attempts\r
292          */\r
293         public int      getAttempts() {\r
294                 return(attempts);\r
295         }\r
296         /**\r
297          *      Get the (space delimited list of) subscription ID for this delivery task\r
298          */\r
299         public String   getSubId() {\r
300                 return(subid);\r
301         }\r
302         /**\r
303          *      Get the feed ID for this delivery task\r
304          */\r
305         public String   getFeedId() {\r
306                 return(feedid);\r
307         }\r
308 }\r