193fa65e86c0ffb1de8091b2a62d4b4f6cfe32f7
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
28 import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
29
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import java.io.BufferedReader;
33 import java.io.File;
34 import java.io.FileInputStream;
35 import java.io.FileReader;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.io.OutputStream;
39 import java.net.HttpURLConnection;
40 import java.net.ProtocolException;
41 import java.net.URL;
42 import java.nio.file.Files;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.UUID;
46 import java.util.zip.GZIPInputStream;
47 import org.jetbrains.annotations.Nullable;
48 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
49 import org.slf4j.MDC;
50
51 /**
52  * A file to be delivered to a destination.
53  *
54  * <p>A Delivery task represents a work item for the data router - a file that needs to be delivered and provides
55  * mechanisms to get information about the file and its delivery data as well as to attempt delivery.
56  */
57 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
58
59     private static final String DECOMPRESSION_STATUS = "Decompression_Status";
60     private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class);
61     private DeliveryTaskHelper deliveryTaskHelper;
62     private String pubid;
63     private DestInfo destInfo;
64     private String spool;
65     private File datafile;
66     private File metafile;
67     private long length;
68     private long date;
69     private String method;
70     private String fileid;
71     private String ctype;
72     private String url;
73     private String feedid;
74     private String subid;
75     private int attempts;
76     private boolean followRedirects;
77     private String[][] hdrs;
78     private String newInvocationId;
79     private long resumeTime;
80
81
82     /**
83      * Create a delivery task for a given delivery queue and pub ID.
84      *
85      * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
86      * @param pubid The publish ID for this file.  This is used as the base for the file name in the spool directory and
87      *      is of the form (milliseconds since 1970).(fqdn of initial data router node)
88      */
89     DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
90         this.deliveryTaskHelper = deliveryTaskHelper;
91         this.pubid = pubid;
92         destInfo = deliveryTaskHelper.getDestinationInfo();
93         subid = destInfo.getSubId();
94         this.followRedirects = destInfo.isFollowRedirects();
95         feedid = destInfo.getLogData();
96         spool = destInfo.getSpool();
97         String dfn = spool + File.separator + pubid;
98         String mfn = dfn + ".M";
99         datafile = new File(spool + File.separator + pubid);
100         metafile = new File(mfn);
101         boolean monly = destInfo.isMetaDataOnly();
102         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
103         resumeTime = System.currentTimeMillis();
104         ArrayList<String[]> hdrv = new ArrayList<>();
105
106         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
107             String line = br.readLine();
108             int index = line.indexOf('\t');
109             method = line.substring(0, index);
110             NodeUtils.setIpAndFqdnForEelf(method);
111             if (!"DELETE".equals(method) && !monly) {
112                 length = datafile.length();
113             }
114             fileid = line.substring(index + 1);
115             while ((line = br.readLine()) != null) {
116                 index = line.indexOf('\t');
117                 String header = line.substring(0, index);
118                 String headerValue = line.substring(index + 1);
119                 if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) {
120                     subid = headerValue.replaceAll("[^ ]*/", "");
121                     feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
122                 }
123                 if (length == 0 && header.toLowerCase().startsWith("content-")) {
124                     continue;
125                 }
126                 if ("content-type".equalsIgnoreCase(header)) {
127                     ctype = headerValue;
128                 }
129                 if ("x-onap-requestid".equalsIgnoreCase(header)) {
130                     MDC.put(MDC_KEY_REQUEST_ID, headerValue);
131                 }
132                 if ("x-invocationid".equalsIgnoreCase(header)) {
133                     MDC.put("InvocationId", headerValue);
134                     headerValue = UUID.randomUUID().toString();
135                     newInvocationId = headerValue;
136                 }
137                 hdrv.add(new String[]{header, headerValue});
138             }
139         } catch (Exception e) {
140             eelfLogger.error("Exception", e);
141         }
142         hdrs = hdrv.toArray(new String[hdrv.size()][]);
143         url = deliveryTaskHelper.getDestURL(fileid);
144     }
145
146     /**
147      * Is the object a DeliveryTask with the same publication ID.
148      */
149     public boolean equals(Object object) {
150         if (!(object instanceof DeliveryTask)) {
151             return (false);
152         }
153         return (pubid.equals(((DeliveryTask) object).pubid));
154     }
155
156     /**
157      * Compare the publication IDs.
158      */
159     public int compareTo(DeliveryTask other) {
160         return (pubid.compareTo(other.pubid));
161     }
162
163     /**
164      * Get the hash code of the publication ID.
165      */
166     public int hashCode() {
167         return (pubid.hashCode());
168     }
169
170     /**
171      * Return the publication ID.
172      */
173     public String toString() {
174         return (pubid);
175     }
176
177     /**
178      * Get the publish ID.
179      */
180     String getPublishId() {
181         return (pubid);
182     }
183
184     /**
185      * Attempt delivery.
186      */
187     public void run() {
188         attempts++;
189         try {
190             destInfo = deliveryTaskHelper.getDestinationInfo();
191             boolean monly = destInfo.isMetaDataOnly();
192             length = 0;
193             if (!"DELETE".equals(method) && !monly) {
194                 length = datafile.length();
195             }
196             stripSuffixIfIsDecompress();
197             url = deliveryTaskHelper.getDestURL(fileid);
198             URL urlObj = new URL(url);
199             HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection();
200             urlConnection.setConnectTimeout(60000);
201             urlConnection.setReadTimeout(60000);
202             urlConnection.setInstanceFollowRedirects(false);
203             urlConnection.setRequestMethod(method);
204             urlConnection.setRequestProperty("Content-Length", Long.toString(length));
205             urlConnection.setRequestProperty("Authorization", destInfo.getAuth());
206             urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
207             boolean expect100 = destInfo.isUsing100();
208             int rc = deliverFileToSubscriber(expect100, urlConnection);
209             String rmsg = urlConnection.getResponseMessage();
210             rmsg = getResponseMessage(urlConnection, rmsg);
211             String xpubid = null;
212             InputStream is;
213             if (rc >= 200 && rc <= 299) {
214                 is = urlConnection.getInputStream();
215                 xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
216             } else {
217                 if (rc >= 300 && rc <= 399) {
218                     rmsg = urlConnection.getHeaderField("Location");
219                 }
220                 is = urlConnection.getErrorStream();
221             }
222             byte[] buf = new byte[4096];
223             if (is != null) {
224                 while (is.read(buf) > 0) {
225                 }
226                 is.close();
227             }
228             deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
229         } catch (Exception e) {
230             eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e);
231             deliveryTaskHelper.reportException(this, e);
232         }
233     }
234
235     /**
236      * To send decompressed gzip to the subscribers.
237      *
238      * @param httpURLConnection connection used to make request
239      */
240     private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
241         byte[] buffer = new byte[8164];
242         httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS");
243         OutputStream outputStream = getOutputStream(httpURLConnection);
244         if (outputStream != null) {
245             int bytesRead;
246             try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
247                 int bufferLength = buffer.length;
248                 while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
249                     outputStream.write(buffer, 0, bytesRead);
250                 }
251                 outputStream.close();
252             } catch (IOException e) {
253                 httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE");
254                 eelfLogger.info("Could not decompress file", e);
255                 sendFile(httpURLConnection);
256             }
257
258         }
259     }
260
261     /**
262      * To send any file to the subscriber.
263      *
264      * @param httpURLConnection connection used to make request
265      */
266     private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
267         OutputStream os = getOutputStream(httpURLConnection);
268         if (os == null) {
269             return;
270         }
271         long sofar = 0;
272         try (InputStream is = new FileInputStream(datafile)) {
273             byte[] buf = new byte[1024 * 1024];
274             while (sofar < length) {
275                 int len = buf.length;
276                 if (sofar + len > length) {
277                     len = (int) (length - sofar);
278                 }
279                 len = is.read(buf, 0, len);
280                 if (len <= 0) {
281                     throw new IOException("Unexpected problem reading data file " + datafile);
282                 }
283                 sofar += len;
284                 os.write(buf, 0, len);
285             }
286             os.close();
287         } catch (IOException ioe) {
288             deliveryTaskHelper.reportDeliveryExtra(this, sofar);
289             throw ioe;
290         }
291     }
292
293     /**
294      * Get the outputstream that will be used to send data.
295      *
296      * @param httpURLConnection connection used to make request
297      * @return AN Outpustream that can be used to send your data.
298      */
299     OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
300         OutputStream outputStream = null;
301         try {
302             outputStream = httpURLConnection.getOutputStream();
303         } catch (ProtocolException pe) {
304             deliveryTaskHelper.reportDeliveryExtra(this, -1L);
305             // Rcvd error instead of 100-continue
306             eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
307         }
308         return outputStream;
309     }
310
311     private void stripSuffixIfIsDecompress() {
312         if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) {
313             fileid = fileid.replace(".gz", "");
314         }
315     }
316
317     private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException {
318         for (String[] nv : hdrs) {
319             uc.addRequestProperty(nv[0], nv[1]);
320         }
321         if (length > 0) {
322             if (expect100) {
323                 uc.setRequestProperty("Expect", "100-continue");
324             }
325             uc.setDoOutput(true);
326             if (destInfo.isDecompress()) {
327                 if (isFiletypeGzip(datafile)) {
328                     sendDecompressedFile(uc);
329                 } else {
330                     uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT");
331                     sendFile(uc);
332                 }
333             } else {
334                 sendFile(uc);
335             }
336         }
337         return uc.getResponseCode();
338     }
339
340     @Nullable
341     private String getResponseMessage(HttpURLConnection uc, String rmsg) {
342         if (rmsg == null) {
343             String h0 = uc.getHeaderField(0);
344             if (h0 != null) {
345                 int indexOfSpace1 = h0.indexOf(' ');
346                 int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1);
347                 if (indexOfSpace1 != -1 && indexOfSpace2 != -1) {
348                     rmsg = h0.substring(indexOfSpace2 + 1);
349                 }
350             }
351         }
352         return rmsg;
353     }
354
355     /**
356      * Remove meta and data files.
357      */
358     void clean() {
359         deleteWithRetry(datafile);
360         deleteWithRetry(metafile);
361         eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
362         eelfLogger.info(EelfMsgs.EXIT);
363         hdrs = null;
364     }
365
366     private void deleteWithRetry(File file) {
367         int maxTries = 3;
368         int tryCount = 1;
369         while (tryCount <= maxTries) {
370             try {
371                 Files.deleteIfExists(file.toPath());
372                 break;
373             } catch (IOException e) {
374                 eelfLogger.error("IOException : Failed to delete file :"
375                                          + file.getName() + " on attempt " + tryCount, e);
376             }
377             tryCount++;
378         }
379     }
380
381     /**
382      * Get the resume time for a delivery task.
383      */
384     long getResumeTime() {
385         return resumeTime;
386     }
387
388     /**
389      * Set the resume time for a delivery task.
390      */
391     void setResumeTime(long resumeTime) {
392         this.resumeTime = resumeTime;
393     }
394
395     /**
396      * Has this delivery task been cleaned.
397      */
398     boolean isCleaned() {
399         return (hdrs == null);
400     }
401
402     /**
403      * Get length of body.
404      */
405     public long getLength() {
406         return (length);
407     }
408
409     /**
410      * Get creation date as encoded in the publish ID.
411      */
412     long getDate() {
413         return (date);
414     }
415
416     /**
417      * Get the most recent delivery attempt URL.
418      */
419     public String getURL() {
420         return (url);
421     }
422
423     /**
424      * Get the content type.
425      */
426     String getCType() {
427         return (ctype);
428     }
429
430     /**
431      * Get the method.
432      */
433     String getMethod() {
434         return (method);
435     }
436
437     /**
438      * Get the file ID.
439      */
440     String getFileId() {
441         return (fileid);
442     }
443
444     /**
445      * Get the number of delivery attempts.
446      */
447     int getAttempts() {
448         return (attempts);
449     }
450
451     /**
452      * Get the (space delimited list of) subscription ID for this delivery task.
453      */
454     String getSubId() {
455         return (subid);
456     }
457
458     /**
459      * Get the feed ID for this delivery task.
460      */
461     String getFeedId() {
462         return (feedid);
463     }
464
465     /**
466      * Get the followRedirects for this delivery task.
467      */
468     boolean getFollowRedirects() {
469         return (followRedirects);
470     }
471 }