cca61707898dac5dd45c37e50137d2dce134e749
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import java.io.*;
28 import java.net.*;
29 import java.util.*;
30 import java.util.zip.GZIPInputStream;
31
32 import com.att.eelf.configuration.EELFLogger;
33 import com.att.eelf.configuration.EELFManager;
34 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
35 import org.slf4j.MDC;
36
37 import static com.att.eelf.configuration.Configuration.*;
38 import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
39
40 /**
41  * A file to be delivered to a destination.
42  * <p>
43  * A Delivery task represents a work item for the data router - a file that
44  * needs to be delivered and provides mechanisms to get information about
45  * the file and its delivery data as well as to attempt delivery.
46  */
47 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
48     private static EELFLogger eelfLogger = EELFManager.getInstance()
49             .getLogger(DeliveryTask.class);
50     private DeliveryTaskHelper deliveryTaskHelper;
51     private String pubid;
52     private DestInfo destInfo;
53     private String spool;
54     private File datafile;
55     private File metafile;
56     private long length;
57     private long date;
58     private String method;
59     private String fileid;
60     private String ctype;
61     private String url;
62     private String feedid;
63     private String subid;
64     private int attempts;
65     private boolean followRedirects;
66     private String[][] hdrs;
67     private String newInvocationId;
68     private long resumeTime;
69
70
71     /**
72      * Create a delivery task for a given delivery queue and pub ID
73      *
74      * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
75      * @param pubid              The publish ID for this file.  This is used as
76      *                           the base for the file name in the spool directory and is of
77      *                           the form <milliseconds since 1970>.<fqdn of initial data router node>
78      */
79     DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
80         this.deliveryTaskHelper = deliveryTaskHelper;
81         this.pubid = pubid;
82         destInfo = deliveryTaskHelper.getDestinationInfo();
83         subid = destInfo.getSubId();
84         this.followRedirects = destInfo.isFollowRedirects();
85         feedid = destInfo.getLogData();
86         spool = destInfo.getSpool();
87         String dfn = spool + "/" + pubid;
88         String mfn = dfn + ".M";
89         datafile = new File(spool + "/" + pubid);
90         metafile = new File(mfn);
91         boolean monly = destInfo.isMetaDataOnly();
92         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
93         resumeTime = System.currentTimeMillis();
94         Vector<String[]> hdrv = new Vector<>();
95
96         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
97             String s = br.readLine();
98             int i = s.indexOf('\t');
99             method = s.substring(0, i);
100             NodeUtils.setIpAndFqdnForEelf(method);
101             if (!"DELETE".equals(method) && !monly) {
102                 length = datafile.length();
103             }
104             fileid = s.substring(i + 1);
105             while ((s = br.readLine()) != null) {
106                 i = s.indexOf('\t');
107                 String h = s.substring(0, i);
108                 String v = s.substring(i + 1);
109                 if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
110                     subid = v.replaceAll("[^ ]*/", "");
111                     feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
112                 }
113                 if (length == 0 && h.toLowerCase().startsWith("content-")) {
114                     continue;
115                 }
116                 if (h.equalsIgnoreCase("content-type")) {
117                     ctype = v;
118                 }
119                 if (h.equalsIgnoreCase("x-onap-requestid")) {
120                     MDC.put(MDC_KEY_REQUEST_ID, v);
121                 }
122                 if (h.equalsIgnoreCase("x-invocationid")) {
123                     MDC.put("InvocationId", v);
124                     v = UUID.randomUUID().toString();
125                     newInvocationId = v;
126                 }
127                 hdrv.add(new String[]{h, v});
128             }
129         } catch (Exception e) {
130             eelfLogger.error("Exception "+ Arrays.toString(e.getStackTrace()), e.getMessage());
131         }
132         hdrs = hdrv.toArray(new String[hdrv.size()][]);
133         url = deliveryTaskHelper.getDestURL(fileid);
134     }
135
136     /**
137      * Is the object a DeliveryTask with the same publication ID?
138      */
139     public boolean equals(Object o) {
140         if (!(o instanceof DeliveryTask)) {
141             return (false);
142         }
143         return (pubid.equals(((DeliveryTask) o).pubid));
144     }
145
146     /**
147      * Compare the publication IDs.
148      */
149     public int compareTo(DeliveryTask o) {
150         return (pubid.compareTo(o.pubid));
151     }
152
153     /**
154      * Get the hash code of the publication ID.
155      */
156     public int hashCode() {
157         return (pubid.hashCode());
158     }
159
160     /**
161      * Return the publication ID.
162      */
163     public String toString() {
164         return (pubid);
165     }
166
167     /**
168      * Get the publish ID
169      */
170     String getPublishId() {
171         return (pubid);
172     }
173
174     /**
175      * Attempt delivery
176      */
177     public void run() {
178         attempts++;
179         try {
180             destInfo = deliveryTaskHelper.getDestinationInfo();
181             boolean expect100 = destInfo.isUsing100();
182             boolean monly = destInfo.isMetaDataOnly();
183             length = 0;
184             if (!"DELETE".equals(method) && !monly) {
185                 length = datafile.length();
186             }
187             if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
188                     fileid = fileid.replace(".gz", "");
189             }
190             url = deliveryTaskHelper.getDestURL(fileid);
191             URL u = new URL(url);
192             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
193             uc.setConnectTimeout(60000);
194             uc.setReadTimeout(60000);
195             uc.setInstanceFollowRedirects(false);
196             uc.setRequestMethod(method);
197             uc.setRequestProperty("Content-Length", Long.toString(length));
198             uc.setRequestProperty("Authorization", destInfo.getAuth());
199             uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
200             for (String[] nv : hdrs) {
201                 uc.addRequestProperty(nv[0], nv[1]);
202             }
203             if (length > 0) {
204                 if (expect100) {
205                     uc.setRequestProperty("Expect", "100-continue");
206                 }
207                 uc.setDoOutput(true);
208                 if (destInfo.isDecompress()) {
209                     if (isFiletypeGzip(datafile)) {
210                         sendDecompressedFile(uc);
211                     } else {
212                         uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
213                         sendFile(uc);
214                     }
215                 } else {
216                     sendFile(uc);
217                 }
218             }
219             int rc = uc.getResponseCode();
220             String rmsg = uc.getResponseMessage();
221             if (rmsg == null) {
222                 String h0 = uc.getHeaderField(0);
223                 if (h0 != null) {
224                     int i = h0.indexOf(' ');
225                     int j = h0.indexOf(' ', i + 1);
226                     if (i != -1 && j != -1) {
227                         rmsg = h0.substring(j + 1);
228                     }
229                 }
230             }
231             String xpubid = null;
232             InputStream is;
233             if (rc >= 200 && rc <= 299) {
234                 is = uc.getInputStream();
235                 xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
236             } else {
237                 if (rc >= 300 && rc <= 399) {
238                     rmsg = uc.getHeaderField("Location");
239                 }
240                 is = uc.getErrorStream();
241             }
242             byte[] buf = new byte[4096];
243             if (is != null) {
244                 while (is.read(buf) > 0) {
245                 }
246                 is.close();
247             }
248             deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
249         } catch (Exception e) {
250             eelfLogger.error("Exception "+ Arrays.toString(e.getStackTrace()),e);
251             deliveryTaskHelper.reportException(this, e);
252         }
253     }
254
255     /**
256      * To send decompressed gzip to the subscribers
257      *
258      * @param httpURLConnection connection used to make request
259      * @throws IOException
260      */
261     private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
262         byte[] buffer = new byte[8164];
263         httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
264         OutputStream outputStream = getOutputStream(httpURLConnection);
265         if (outputStream != null) {
266             int bytesRead = 0;
267             try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
268                 int bufferLength = buffer.length;
269                 while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
270                     outputStream.write(buffer, 0, bytesRead);
271                 }
272                 outputStream.close();
273             } catch (IOException e) {
274                 httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
275                 eelfLogger.info("Could not decompress file");
276                 sendFile(httpURLConnection);
277             }
278
279         }
280     }
281
282     /**
283      * To send any file to the subscriber.
284      *
285      * @param httpURLConnection connection used to make request
286      * @throws IOException
287      */
288     private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
289         OutputStream os = getOutputStream(httpURLConnection);
290         if (os != null) {
291             long sofar = 0;
292             try (InputStream is = new FileInputStream(datafile)) {
293                 byte[] buf = new byte[1024 * 1024];
294                 while (sofar < length) {
295                     int i = buf.length;
296                     if (sofar + i > length) {
297                         i = (int) (length - sofar);
298                     }
299                     i = is.read(buf, 0, i);
300                     if (i <= 0) {
301                         throw new IOException("Unexpected problem reading data file " + datafile);
302                     }
303                     sofar += i;
304                     os.write(buf, 0, i);
305                 }
306                 os.close();
307             } catch (IOException ioe) {
308                 deliveryTaskHelper.reportDeliveryExtra(this, sofar);
309                 throw ioe;
310             }
311         }
312     }
313
314     /**
315      * Get the outputstream that will be used to send data
316      *
317      * @param httpURLConnection connection used to make request
318      * @return AN Outpustream that can be used to send your data.
319      * @throws IOException
320      */
321     private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
322         OutputStream outputStream = null;
323
324         try {
325             outputStream = httpURLConnection.getOutputStream();
326         } catch (ProtocolException pe) {
327             deliveryTaskHelper.reportDeliveryExtra(this, -1L);
328             // Rcvd error instead of 100-continue
329             eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
330         }
331         return outputStream;
332     }
333
334     /**
335      * Remove meta and data files
336      */
337     void clean() {
338         datafile.delete();
339         metafile.delete();
340         eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
341         eelfLogger.info(EelfMsgs.EXIT);
342         hdrs = null;
343     }
344
345     /**
346      * Set the resume time for a delivery task.
347      */
348     void setResumeTime(long resumeTime) {
349         this.resumeTime = resumeTime;
350     }
351
352     /**
353      * Get the resume time for a delivery task.
354      */
355     long getResumeTime() {
356         return resumeTime;
357     }
358
359     /**
360      * Has this delivery task been cleaned?
361      */
362     boolean isCleaned() {
363         return (hdrs == null);
364     }
365
366     /**
367      * Get length of body
368      */
369     public long getLength() {
370         return (length);
371     }
372
373     /**
374      * Get creation date as encoded in the publish ID.
375      */
376     long getDate() {
377         return (date);
378     }
379
380     /**
381      * Get the most recent delivery attempt URL
382      */
383     public String getURL() {
384         return (url);
385     }
386
387     /**
388      * Get the content type
389      */
390     String getCType() {
391         return (ctype);
392     }
393
394     /**
395      * Get the method
396      */
397     String getMethod() {
398         return (method);
399     }
400
401     /**
402      * Get the file ID
403      */
404     String getFileId() {
405         return (fileid);
406     }
407
408     /**
409      * Get the number of delivery attempts
410      */
411     int getAttempts() {
412         return (attempts);
413     }
414
415     /**
416      * Get the (space delimited list of) subscription ID for this delivery task
417      */
418     String getSubId() {
419         return (subid);
420     }
421
422     /**
423      * Get the feed ID for this delivery task
424      */
425     String getFeedId() {
426         return (feedid);
427     }
428
429     /**
430      * Get the followRedirects for this delivery task
431      */
432     public boolean getFollowRedirects() {
433         return(followRedirects);
434     }
435 }