Adding decompression option for user to a feed.
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import java.io.*;
28 import java.net.*;
29 import java.util.*;
30 import java.util.zip.GZIPInputStream;
31
32 import com.att.eelf.configuration.EELFLogger;
33 import com.att.eelf.configuration.EELFManager;
34 import org.apache.log4j.Logger;
35 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
36 import org.slf4j.MDC;
37
38 import static com.att.eelf.configuration.Configuration.*;
39 import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
40
41 /**
42  * A file to be delivered to a destination.
43  * <p>
44  * A Delivery task represents a work item for the data router - a file that
45  * needs to be delivered and provides mechanisms to get information about
46  * the file and its delivery data as well as to attempt delivery.
47  */
48 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
49     private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
50     private static EELFLogger eelflogger = EELFManager.getInstance()
51             .getLogger(DeliveryTask.class);
52     private DeliveryTaskHelper deliveryTaskHelper;
53     private String pubid;
54     private DestInfo destInfo;
55     private String spool;
56     private File datafile;
57     private File metafile;
58     private long length;
59     private long date;
60     private String method;
61     private String fileid;
62     private String ctype;
63     private String url;
64     private String feedid;
65     private String subid;
66     private int attempts;
67     private String[][] hdrs;
68     private String newInvocationId;
69
70
71     /**
72      * Create a delivery task for a given delivery queue and pub ID
73      *
74      * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
75      * @param pubid              The publish ID for this file.  This is used as
76      *                           the base for the file name in the spool directory and is of
77      *                           the form <milliseconds since 1970>.<fqdn of initial data router node>
78      */
79     public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
80         this.deliveryTaskHelper = deliveryTaskHelper;
81         this.pubid = pubid;
82         destInfo = deliveryTaskHelper.getDestinationInfo();
83         subid = destInfo.getSubId();
84         feedid = destInfo.getLogData();
85         spool = destInfo.getSpool();
86         String dfn = spool + "/" + pubid;
87         String mfn = dfn + ".M";
88         datafile = new File(spool + "/" + pubid);
89         metafile = new File(mfn);
90         boolean monly = destInfo.isMetaDataOnly();
91         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
92         Vector<String[]> hdrv = new Vector<>();
93
94         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
95             String s = br.readLine();
96             int i = s.indexOf('\t');
97             method = s.substring(0, i);
98             NodeUtils.setIpAndFqdnForEelf(method);
99             if (!"DELETE".equals(method) && !monly) {
100                 length = datafile.length();
101             }
102             fileid = s.substring(i + 1);
103             while ((s = br.readLine()) != null) {
104                 i = s.indexOf('\t');
105                 String h = s.substring(0, i);
106                 String v = s.substring(i + 1);
107                 if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
108                     subid = v.replaceAll("[^ ]*/", "");
109                     feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
110                 }
111                 if (length == 0 && h.toLowerCase().startsWith("content-")) {
112                     continue;
113                 }
114                 if (h.equalsIgnoreCase("content-type")) {
115                     ctype = v;
116                 }
117                 if (h.equalsIgnoreCase("x-onap-requestid")) {
118                     MDC.put(MDC_KEY_REQUEST_ID, v);
119                 }
120                 if (h.equalsIgnoreCase("x-invocationid")) {
121                     MDC.put("InvocationId", v);
122                     v = UUID.randomUUID().toString();
123                     newInvocationId = v;
124                 }
125                 hdrv.add(new String[]{h, v});
126             }
127         } catch (Exception e) {
128             loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
129         }
130         hdrs = hdrv.toArray(new String[hdrv.size()][]);
131         url = deliveryTaskHelper.getDestURL(fileid);
132     }
133
134     /**
135      * Is the object a DeliveryTask with the same publication ID?
136      */
137     public boolean equals(Object o) {
138         if (!(o instanceof DeliveryTask)) {
139             return (false);
140         }
141         return (pubid.equals(((DeliveryTask) o).pubid));
142     }
143
144     /**
145      * Compare the publication IDs.
146      */
147     public int compareTo(DeliveryTask o) {
148         return (pubid.compareTo(o.pubid));
149     }
150
151     /**
152      * Get the hash code of the publication ID.
153      */
154     public int hashCode() {
155         return (pubid.hashCode());
156     }
157
158     /**
159      * Return the publication ID.
160      */
161     public String toString() {
162         return (pubid);
163     }
164
165     /**
166      * Get the publish ID
167      */
168     public String getPublishId() {
169         return (pubid);
170     }
171
172     /**
173      * Attempt delivery
174      */
175     public void run() {
176         attempts++;
177         try {
178             destInfo = deliveryTaskHelper.getDestinationInfo();
179             boolean expect100 = destInfo.isUsing100();
180             boolean monly = destInfo.isMetaDataOnly();
181             length = 0;
182             if (!"DELETE".equals(method) && !monly) {
183                 length = datafile.length();
184             }
185             if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
186                     fileid = fileid.replace(".gz", "");
187             }
188             url = deliveryTaskHelper.getDestURL(fileid);
189             URL u = new URL(url);
190             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
191             uc.setConnectTimeout(60000);
192             uc.setReadTimeout(60000);
193             uc.setInstanceFollowRedirects(false);
194             uc.setRequestMethod(method);
195             uc.setRequestProperty("Content-Length", Long.toString(length));
196             uc.setRequestProperty("Authorization", destInfo.getAuth());
197             uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
198             for (String[] nv : hdrs) {
199                 uc.addRequestProperty(nv[0], nv[1]);
200             }
201             if (length > 0) {
202                 if (expect100) {
203                     uc.setRequestProperty("Expect", "100-continue");
204                 }
205                 uc.setDoOutput(true);
206                 if (destInfo.isDecompress()) {
207                     if (isFiletypeGzip(datafile)) {
208                         sendDecompressedFile(uc);
209                     } else {
210                         uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
211                         sendFile(uc);
212                     }
213                 } else {
214                     sendFile(uc);
215                 }
216             }
217             int rc = uc.getResponseCode();
218             String rmsg = uc.getResponseMessage();
219             if (rmsg == null) {
220                 String h0 = uc.getHeaderField(0);
221                 if (h0 != null) {
222                     int i = h0.indexOf(' ');
223                     int j = h0.indexOf(' ', i + 1);
224                     if (i != -1 && j != -1) {
225                         rmsg = h0.substring(j + 1);
226                     }
227                 }
228             }
229             String xpubid = null;
230             InputStream is;
231             if (rc >= 200 && rc <= 299) {
232                 is = uc.getInputStream();
233                 xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
234             } else {
235                 if (rc >= 300 && rc <= 399) {
236                     rmsg = uc.getHeaderField("Location");
237                 }
238                 is = uc.getErrorStream();
239             }
240             byte[] buf = new byte[4096];
241             if (is != null) {
242                 while (is.read(buf) > 0) {
243                 }
244                 is.close();
245             }
246             deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
247         } catch (Exception e) {
248             loggerDeliveryTask.error("Exception " + e.getStackTrace(), e);
249             deliveryTaskHelper.reportException(this, e);
250         }
251     }
252
253     /**
254      * To send decompressed gzip to the subscribers
255      *
256      * @param httpURLConnection connection used to make request
257      * @throws IOException
258      */
259     private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
260         byte[] buffer = new byte[8164];
261         httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
262         OutputStream outputStream = getOutputStream(httpURLConnection);
263         if (outputStream != null) {
264             int bytesRead = 0;
265             try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
266                 int bufferLength = buffer.length;
267                 while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
268                     outputStream.write(buffer, 0, bytesRead);
269                 }
270                 outputStream.close();
271             } catch (IOException e) {
272                 httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
273                 loggerDeliveryTask.info("Could not decompress file");
274                 sendFile(httpURLConnection);
275             }
276
277         }
278     }
279
280     /**
281      * To send any file to the subscriber.
282      *
283      * @param httpURLConnection connection used to make request
284      * @throws IOException
285      */
286     private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
287         OutputStream os = getOutputStream(httpURLConnection);
288         if (os != null) {
289             long sofar = 0;
290             try (InputStream is = new FileInputStream(datafile)) {
291                 byte[] buf = new byte[1024 * 1024];
292                 while (sofar < length) {
293                     int i = buf.length;
294                     if (sofar + i > length) {
295                         i = (int) (length - sofar);
296                     }
297                     i = is.read(buf, 0, i);
298                     if (i <= 0) {
299                         throw new IOException("Unexpected problem reading data file " + datafile);
300                     }
301                     sofar += i;
302                     os.write(buf, 0, i);
303                 }
304                 os.close();
305             } catch (IOException ioe) {
306                 deliveryTaskHelper.reportDeliveryExtra(this, sofar);
307                 throw ioe;
308             }
309         }
310     }
311
312     /**
313      * Get the outputstream that will be used to send data
314      *
315      * @param httpURLConnection connection used to make request
316      * @return AN Outpustream that can be used to send your data.
317      * @throws IOException
318      */
319     private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
320         OutputStream outputStream = null;
321
322         try {
323             outputStream = httpURLConnection.getOutputStream();
324         } catch (ProtocolException pe) {
325             deliveryTaskHelper.reportDeliveryExtra(this, -1L);
326             // Rcvd error instead of 100-continue
327             loggerDeliveryTask.error("Exception " + pe.getStackTrace(), pe);
328         }
329         return outputStream;
330     }
331
332     /**
333      * Remove meta and data files
334      */
335     public void clean() {
336         datafile.delete();
337         metafile.delete();
338         eelflogger.info(EelfMsgs.INVOKE, newInvocationId);
339         eelflogger.info(EelfMsgs.EXIT);
340         hdrs = null;
341     }
342
343     /**
344      * Has this delivery task been cleaned?
345      */
346     public boolean isCleaned() {
347         return (hdrs == null);
348     }
349
350     /**
351      * Get length of body
352      */
353     public long getLength() {
354         return (length);
355     }
356
357     /**
358      * Get creation date as encoded in the publish ID.
359      */
360     public long getDate() {
361         return (date);
362     }
363
364     /**
365      * Get the most recent delivery attempt URL
366      */
367     public String getURL() {
368         return (url);
369     }
370
371     /**
372      * Get the content type
373      */
374     public String getCType() {
375         return (ctype);
376     }
377
378     /**
379      * Get the method
380      */
381     public String getMethod() {
382         return (method);
383     }
384
385     /**
386      * Get the file ID
387      */
388     public String getFileId() {
389         return (fileid);
390     }
391
392     /**
393      * Get the number of delivery attempts
394      */
395     public int getAttempts() {
396         return (attempts);
397     }
398
399     /**
400      * Get the (space delimited list of) subscription ID for this delivery task
401      */
402     public String getSubId() {
403         return (subid);
404     }
405
406     /**
407      * Get the feed ID for this delivery task
408      */
409     public String getFeedId() {
410         return (feedid);
411     }
412 }