Merge "DR AAF CADI integration"
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import java.io.*;
28 import java.net.*;
29 import java.util.*;
30 import java.util.zip.GZIPInputStream;
31
32 import com.att.eelf.configuration.EELFLogger;
33 import com.att.eelf.configuration.EELFManager;
34 import org.apache.log4j.Logger;
35 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
36 import org.slf4j.MDC;
37
38 import static com.att.eelf.configuration.Configuration.*;
39 import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
40
41 /**
42  * A file to be delivered to a destination.
43  * <p>
44  * A Delivery task represents a work item for the data router - a file that
45  * needs to be delivered and provides mechanisms to get information about
46  * the file and its delivery data as well as to attempt delivery.
47  */
48 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
49     private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
50     private static EELFLogger eelflogger = EELFManager.getInstance()
51             .getLogger(DeliveryTask.class);
52     private DeliveryTaskHelper deliveryTaskHelper;
53     private String pubid;
54     private DestInfo destInfo;
55     private String spool;
56     private File datafile;
57     private File metafile;
58     private long length;
59     private long date;
60     private String method;
61     private String fileid;
62     private String ctype;
63     private String url;
64     private String feedid;
65     private String subid;
66     private int attempts;
67     private boolean followRedirects;
68     private String[][] hdrs;
69     private String newInvocationId;
70     private long resumeTime;
71
72
73     /**
74      * Create a delivery task for a given delivery queue and pub ID
75      *
76      * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
77      * @param pubid              The publish ID for this file.  This is used as
78      *                           the base for the file name in the spool directory and is of
79      *                           the form <milliseconds since 1970>.<fqdn of initial data router node>
80      */
81     DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
82         this.deliveryTaskHelper = deliveryTaskHelper;
83         this.pubid = pubid;
84         destInfo = deliveryTaskHelper.getDestinationInfo();
85         subid = destInfo.getSubId();
86         this.followRedirects = destInfo.isFollowRedirects();
87         feedid = destInfo.getLogData();
88         spool = destInfo.getSpool();
89         String dfn = spool + "/" + pubid;
90         String mfn = dfn + ".M";
91         datafile = new File(spool + "/" + pubid);
92         metafile = new File(mfn);
93         boolean monly = destInfo.isMetaDataOnly();
94         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
95         resumeTime = System.currentTimeMillis();
96         Vector<String[]> hdrv = new Vector<>();
97
98         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
99             String s = br.readLine();
100             int i = s.indexOf('\t');
101             method = s.substring(0, i);
102             NodeUtils.setIpAndFqdnForEelf(method);
103             if (!"DELETE".equals(method) && !monly) {
104                 length = datafile.length();
105             }
106             fileid = s.substring(i + 1);
107             while ((s = br.readLine()) != null) {
108                 i = s.indexOf('\t');
109                 String h = s.substring(0, i);
110                 String v = s.substring(i + 1);
111                 if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
112                     subid = v.replaceAll("[^ ]*/", "");
113                     feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
114                 }
115                 if (length == 0 && h.toLowerCase().startsWith("content-")) {
116                     continue;
117                 }
118                 if (h.equalsIgnoreCase("content-type")) {
119                     ctype = v;
120                 }
121                 if (h.equalsIgnoreCase("x-onap-requestid")) {
122                     MDC.put(MDC_KEY_REQUEST_ID, v);
123                 }
124                 if (h.equalsIgnoreCase("x-invocationid")) {
125                     MDC.put("InvocationId", v);
126                     v = UUID.randomUUID().toString();
127                     newInvocationId = v;
128                 }
129                 hdrv.add(new String[]{h, v});
130             }
131         } catch (Exception e) {
132             loggerDeliveryTask.error("Exception "+ Arrays.toString(e.getStackTrace()), e);
133         }
134         hdrs = hdrv.toArray(new String[hdrv.size()][]);
135         url = deliveryTaskHelper.getDestURL(fileid);
136     }
137
138     /**
139      * Is the object a DeliveryTask with the same publication ID?
140      */
141     public boolean equals(Object o) {
142         if (!(o instanceof DeliveryTask)) {
143             return (false);
144         }
145         return (pubid.equals(((DeliveryTask) o).pubid));
146     }
147
148     /**
149      * Compare the publication IDs.
150      */
151     public int compareTo(DeliveryTask o) {
152         return (pubid.compareTo(o.pubid));
153     }
154
155     /**
156      * Get the hash code of the publication ID.
157      */
158     public int hashCode() {
159         return (pubid.hashCode());
160     }
161
162     /**
163      * Return the publication ID.
164      */
165     public String toString() {
166         return (pubid);
167     }
168
169     /**
170      * Get the publish ID
171      */
172     String getPublishId() {
173         return (pubid);
174     }
175
176     /**
177      * Attempt delivery
178      */
179     public void run() {
180         attempts++;
181         try {
182             destInfo = deliveryTaskHelper.getDestinationInfo();
183             boolean expect100 = destInfo.isUsing100();
184             boolean monly = destInfo.isMetaDataOnly();
185             length = 0;
186             if (!"DELETE".equals(method) && !monly) {
187                 length = datafile.length();
188             }
189             if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
190                     fileid = fileid.replace(".gz", "");
191             }
192             url = deliveryTaskHelper.getDestURL(fileid);
193             URL u = new URL(url);
194             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
195             uc.setConnectTimeout(60000);
196             uc.setReadTimeout(60000);
197             uc.setInstanceFollowRedirects(false);
198             uc.setRequestMethod(method);
199             uc.setRequestProperty("Content-Length", Long.toString(length));
200             uc.setRequestProperty("Authorization", destInfo.getAuth());
201             uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
202             for (String[] nv : hdrs) {
203                 uc.addRequestProperty(nv[0], nv[1]);
204             }
205             if (length > 0) {
206                 if (expect100) {
207                     uc.setRequestProperty("Expect", "100-continue");
208                 }
209                 uc.setDoOutput(true);
210                 if (destInfo.isDecompress()) {
211                     if (isFiletypeGzip(datafile)) {
212                         sendDecompressedFile(uc);
213                     } else {
214                         uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
215                         sendFile(uc);
216                     }
217                 } else {
218                     sendFile(uc);
219                 }
220             }
221             int rc = uc.getResponseCode();
222             String rmsg = uc.getResponseMessage();
223             if (rmsg == null) {
224                 String h0 = uc.getHeaderField(0);
225                 if (h0 != null) {
226                     int i = h0.indexOf(' ');
227                     int j = h0.indexOf(' ', i + 1);
228                     if (i != -1 && j != -1) {
229                         rmsg = h0.substring(j + 1);
230                     }
231                 }
232             }
233             String xpubid = null;
234             InputStream is;
235             if (rc >= 200 && rc <= 299) {
236                 is = uc.getInputStream();
237                 xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
238             } else {
239                 if (rc >= 300 && rc <= 399) {
240                     rmsg = uc.getHeaderField("Location");
241                 }
242                 is = uc.getErrorStream();
243             }
244             byte[] buf = new byte[4096];
245             if (is != null) {
246                 while (is.read(buf) > 0) {
247                 }
248                 is.close();
249             }
250             deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
251         } catch (Exception e) {
252             loggerDeliveryTask.error("Exception " + Arrays.toString(e.getStackTrace()), e);
253             deliveryTaskHelper.reportException(this, e);
254         }
255     }
256
257     /**
258      * To send decompressed gzip to the subscribers
259      *
260      * @param httpURLConnection connection used to make request
261      * @throws IOException
262      */
263     private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
264         byte[] buffer = new byte[8164];
265         httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
266         OutputStream outputStream = getOutputStream(httpURLConnection);
267         if (outputStream != null) {
268             int bytesRead = 0;
269             try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
270                 int bufferLength = buffer.length;
271                 while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
272                     outputStream.write(buffer, 0, bytesRead);
273                 }
274                 outputStream.close();
275             } catch (IOException e) {
276                 httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
277                 loggerDeliveryTask.info("Could not decompress file");
278                 sendFile(httpURLConnection);
279             }
280
281         }
282     }
283
284     /**
285      * To send any file to the subscriber.
286      *
287      * @param httpURLConnection connection used to make request
288      * @throws IOException
289      */
290     private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
291         OutputStream os = getOutputStream(httpURLConnection);
292         if (os != null) {
293             long sofar = 0;
294             try (InputStream is = new FileInputStream(datafile)) {
295                 byte[] buf = new byte[1024 * 1024];
296                 while (sofar < length) {
297                     int i = buf.length;
298                     if (sofar + i > length) {
299                         i = (int) (length - sofar);
300                     }
301                     i = is.read(buf, 0, i);
302                     if (i <= 0) {
303                         throw new IOException("Unexpected problem reading data file " + datafile);
304                     }
305                     sofar += i;
306                     os.write(buf, 0, i);
307                 }
308                 os.close();
309             } catch (IOException ioe) {
310                 deliveryTaskHelper.reportDeliveryExtra(this, sofar);
311                 throw ioe;
312             }
313         }
314     }
315
316     /**
317      * Get the outputstream that will be used to send data
318      *
319      * @param httpURLConnection connection used to make request
320      * @return AN Outpustream that can be used to send your data.
321      * @throws IOException
322      */
323     private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
324         OutputStream outputStream = null;
325
326         try {
327             outputStream = httpURLConnection.getOutputStream();
328         } catch (ProtocolException pe) {
329             deliveryTaskHelper.reportDeliveryExtra(this, -1L);
330             // Rcvd error instead of 100-continue
331             loggerDeliveryTask.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
332         }
333         return outputStream;
334     }
335
336     /**
337      * Remove meta and data files
338      */
339     void clean() {
340         datafile.delete();
341         metafile.delete();
342         eelflogger.info(EelfMsgs.INVOKE, newInvocationId);
343         eelflogger.info(EelfMsgs.EXIT);
344         hdrs = null;
345     }
346
347     /**
348      * Set the resume time for a delivery task.
349      */
350     void setResumeTime(long resumeTime) {
351         this.resumeTime = resumeTime;
352     }
353
354     /**
355      * Get the resume time for a delivery task.
356      */
357     long getResumeTime() {
358         return resumeTime;
359     }
360
361     /**
362      * Has this delivery task been cleaned?
363      */
364     boolean isCleaned() {
365         return (hdrs == null);
366     }
367
368     /**
369      * Get length of body
370      */
371     public long getLength() {
372         return (length);
373     }
374
375     /**
376      * Get creation date as encoded in the publish ID.
377      */
378     long getDate() {
379         return (date);
380     }
381
382     /**
383      * Get the most recent delivery attempt URL
384      */
385     public String getURL() {
386         return (url);
387     }
388
389     /**
390      * Get the content type
391      */
392     String getCType() {
393         return (ctype);
394     }
395
396     /**
397      * Get the method
398      */
399     String getMethod() {
400         return (method);
401     }
402
403     /**
404      * Get the file ID
405      */
406     String getFileId() {
407         return (fileid);
408     }
409
410     /**
411      * Get the number of delivery attempts
412      */
413     int getAttempts() {
414         return (attempts);
415     }
416
417     /**
418      * Get the (space delimited list of) subscription ID for this delivery task
419      */
420     String getSubId() {
421         return (subid);
422     }
423
424     /**
425      * Get the feed ID for this delivery task
426      */
427     String getFeedId() {
428         return (feedid);
429     }
430
431     /**
432      * Get the followRedirects for this delivery task
433      */
434     public boolean getFollowRedirects() {
435         return(followRedirects);
436     }
437 }