Removing code smells
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
28 import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
29
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import java.io.BufferedReader;
33 import java.io.File;
34 import java.io.FileInputStream;
35 import java.io.FileReader;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.io.OutputStream;
39 import java.net.HttpURLConnection;
40 import java.net.ProtocolException;
41 import java.net.URL;
42 import java.nio.file.Files;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.UUID;
46 import java.util.zip.GZIPInputStream;
47 import org.jetbrains.annotations.Nullable;
48 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
49 import org.slf4j.MDC;
50
51 /**
52  * A file to be delivered to a destination.
53  *
54  * <p>A Delivery task represents a work item for the data router - a file that needs to be delivered and provides
55  * mechanisms to get information about the file and its delivery data as well as to attempt delivery.
56  */
57 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
58
59     private static final String DECOMPRESSION_STATUS = "Decompression_Status";
60     private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class);
61     private DeliveryTaskHelper deliveryTaskHelper;
62     private String pubid;
63     private DestInfo destInfo;
64     private String spool;
65     private File datafile;
66     private File metafile;
67     private long length;
68     private long date;
69     private String method;
70     private String fileid;
71     private String ctype;
72     private String url;
73     private String feedid;
74     private String subid;
75     private int attempts;
76     private boolean followRedirects;
77     private String[][] hdrs;
78     private String newInvocationId;
79     private long resumeTime;
80
81
82     /**
83      * Create a delivery task for a given delivery queue and pub ID.
84      *
85      * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
86      * @param pubid The publish ID for this file.  This is used as the base for the file name in the spool directory and
87      *      is of the form (milliseconds since 1970).(fqdn of initial data router node)
88      */
89     DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
90         this.deliveryTaskHelper = deliveryTaskHelper;
91         this.pubid = pubid;
92         destInfo = deliveryTaskHelper.getDestinationInfo();
93         subid = destInfo.getSubId();
94         this.followRedirects = destInfo.isFollowRedirects();
95         feedid = destInfo.getLogData();
96         spool = destInfo.getSpool();
97         String dfn = spool + File.separator + pubid;
98         String mfn = dfn + ".M";
99         datafile = new File(spool + File.separator + pubid);
100         metafile = new File(mfn);
101         boolean monly = destInfo.isMetaDataOnly();
102         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
103         resumeTime = System.currentTimeMillis();
104         ArrayList<String[]> hdrv = new ArrayList<>();
105
106         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
107             String line = br.readLine();
108             int index = line.indexOf('\t');
109             method = line.substring(0, index);
110             NodeUtils.setIpAndFqdnForEelf(method);
111             if (!"DELETE".equals(method) && !monly) {
112                 length = datafile.length();
113             }
114             fileid = line.substring(index + 1);
115             while ((line = br.readLine()) != null) {
116                 index = line.indexOf('\t');
117                 String header = line.substring(0, index);
118                 String headerValue = line.substring(index + 1);
119                 if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) {
120                     subid = headerValue.replaceAll("[^ ]*/", "");
121                     feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
122                 }
123                 if (length == 0 && header.toLowerCase().startsWith("content-")) {
124                     continue;
125                 }
126                 if ("content-type".equalsIgnoreCase(header)) {
127                     ctype = headerValue;
128                 }
129                 if ("x-onap-requestid".equalsIgnoreCase(header)) {
130                     MDC.put(MDC_KEY_REQUEST_ID, headerValue);
131                 }
132                 if ("x-invocationid".equalsIgnoreCase(header)) {
133                     MDC.put("InvocationId", headerValue);
134                     headerValue = UUID.randomUUID().toString();
135                     newInvocationId = headerValue;
136                 }
137                 hdrv.add(new String[]{header, headerValue});
138             }
139         } catch (Exception e) {
140             eelfLogger.error("Exception", e);
141         }
142         hdrs = hdrv.toArray(new String[hdrv.size()][]);
143         url = deliveryTaskHelper.getDestURL(fileid);
144     }
145
146     /**
147      * Is the object a DeliveryTask with the same publication ID.
148      */
149     public boolean equals(Object object) {
150         if (!(object instanceof DeliveryTask)) {
151             return (false);
152         }
153         return (pubid.equals(((DeliveryTask) object).pubid));
154     }
155
156     /**
157      * Compare the publication IDs.
158      */
159     public int compareTo(DeliveryTask other) {
160         return (pubid.compareTo(other.pubid));
161     }
162
163     /**
164      * Get the hash code of the publication ID.
165      */
166     public int hashCode() {
167         return (pubid.hashCode());
168     }
169
170     /**
171      * Return the publication ID.
172      */
173     public String toString() {
174         return (pubid);
175     }
176
177     /**
178      * Get the publish ID.
179      */
180     String getPublishId() {
181         return (pubid);
182     }
183
184     /**
185      * Attempt delivery.
186      */
187     public void run() {
188         attempts++;
189         try {
190             destInfo = deliveryTaskHelper.getDestinationInfo();
191             boolean monly = destInfo.isMetaDataOnly();
192             length = 0;
193             if (!"DELETE".equals(method) && !monly) {
194                 length = datafile.length();
195             }
196             stripSuffixIfIsDecompress();
197             url = deliveryTaskHelper.getDestURL(fileid);
198             URL urlObj = new URL(url);
199             HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection();
200             urlConnection.setConnectTimeout(60000);
201             urlConnection.setReadTimeout(60000);
202             urlConnection.setInstanceFollowRedirects(false);
203             urlConnection.setRequestMethod(method);
204             urlConnection.setRequestProperty("Content-Length", Long.toString(length));
205             urlConnection.setRequestProperty("Authorization", destInfo.getAuth());
206             urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
207             boolean expect100 = destInfo.isUsing100();
208             int rc = deliverFileToSubscriber(expect100, urlConnection);
209             String rmsg = urlConnection.getResponseMessage();
210             rmsg = getResponseMessage(urlConnection, rmsg);
211             String xpubid = null;
212             InputStream is;
213             if (rc >= 200 && rc <= 299) {
214                 is = urlConnection.getInputStream();
215                 xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
216             } else {
217                 if (rc >= 300 && rc <= 399) {
218                     rmsg = urlConnection.getHeaderField("Location");
219                 }
220                 is = urlConnection.getErrorStream();
221             }
222             byte[] buf = new byte[4096];
223             if (is != null) {
224                 while (is.read(buf) > 0) {
225                     //flush the buffer
226                 }
227                 is.close();
228             }
229             deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
230         } catch (Exception e) {
231             eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e);
232             deliveryTaskHelper.reportException(this, e);
233         }
234     }
235
236     /**
237      * To send decompressed gzip to the subscribers.
238      *
239      * @param httpURLConnection connection used to make request
240      */
241     private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
242         byte[] buffer = new byte[8164];
243         httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS");
244         OutputStream outputStream = getOutputStream(httpURLConnection);
245         if (outputStream != null) {
246             int bytesRead;
247             try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
248                 int bufferLength = buffer.length;
249                 while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
250                     outputStream.write(buffer, 0, bytesRead);
251                 }
252                 outputStream.close();
253             } catch (IOException e) {
254                 httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE");
255                 eelfLogger.info("Could not decompress file", e);
256                 sendFile(httpURLConnection);
257             }
258
259         }
260     }
261
262     /**
263      * To send any file to the subscriber.
264      *
265      * @param httpURLConnection connection used to make request
266      */
267     private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
268         OutputStream os = getOutputStream(httpURLConnection);
269         if (os == null) {
270             return;
271         }
272         long sofar = 0;
273         try (InputStream is = new FileInputStream(datafile)) {
274             byte[] buf = new byte[1024 * 1024];
275             while (sofar < length) {
276                 int len = buf.length;
277                 if (sofar + len > length) {
278                     len = (int) (length - sofar);
279                 }
280                 len = is.read(buf, 0, len);
281                 if (len <= 0) {
282                     throw new IOException("Unexpected problem reading data file " + datafile);
283                 }
284                 sofar += len;
285                 os.write(buf, 0, len);
286             }
287             os.close();
288         } catch (IOException ioe) {
289             deliveryTaskHelper.reportDeliveryExtra(this, sofar);
290             throw ioe;
291         }
292     }
293
294     /**
295      * Get the outputstream that will be used to send data.
296      *
297      * @param httpURLConnection connection used to make request
298      * @return AN Outpustream that can be used to send your data.
299      */
300     OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
301         OutputStream outputStream = null;
302         try {
303             outputStream = httpURLConnection.getOutputStream();
304         } catch (ProtocolException pe) {
305             deliveryTaskHelper.reportDeliveryExtra(this, -1L);
306             // Rcvd error instead of 100-continue
307             eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
308         }
309         return outputStream;
310     }
311
312     private void stripSuffixIfIsDecompress() {
313         if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) {
314             fileid = fileid.replace(".gz", "");
315         }
316     }
317
318     private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException {
319         for (String[] nv : hdrs) {
320             uc.addRequestProperty(nv[0], nv[1]);
321         }
322         if (length > 0) {
323             if (expect100) {
324                 uc.setRequestProperty("Expect", "100-continue");
325             }
326             uc.setDoOutput(true);
327             if (destInfo.isDecompress()) {
328                 if (isFiletypeGzip(datafile)) {
329                     sendDecompressedFile(uc);
330                 } else {
331                     uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT");
332                     sendFile(uc);
333                 }
334             } else {
335                 sendFile(uc);
336             }
337         }
338         return uc.getResponseCode();
339     }
340
341     @Nullable
342     private String getResponseMessage(HttpURLConnection uc, String rmsg) {
343         if (rmsg == null) {
344             String h0 = uc.getHeaderField(0);
345             if (h0 != null) {
346                 int indexOfSpace1 = h0.indexOf(' ');
347                 int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1);
348                 if (indexOfSpace1 != -1 && indexOfSpace2 != -1) {
349                     rmsg = h0.substring(indexOfSpace2 + 1);
350                 }
351             }
352         }
353         return rmsg;
354     }
355
356     /**
357      * Remove meta and data files.
358      */
359     void clean() {
360         deleteWithRetry(datafile);
361         deleteWithRetry(metafile);
362         eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
363         eelfLogger.info(EelfMsgs.EXIT);
364         hdrs = null;
365     }
366
367     private void deleteWithRetry(File file) {
368         int maxTries = 3;
369         int tryCount = 1;
370         while (tryCount <= maxTries) {
371             try {
372                 Files.deleteIfExists(file.toPath());
373                 break;
374             } catch (IOException e) {
375                 eelfLogger.error("IOException : Failed to delete file :"
376                                          + file.getName() + " on attempt " + tryCount, e);
377             }
378             tryCount++;
379         }
380     }
381
382     /**
383      * Get the resume time for a delivery task.
384      */
385     long getResumeTime() {
386         return resumeTime;
387     }
388
389     /**
390      * Set the resume time for a delivery task.
391      */
392     void setResumeTime(long resumeTime) {
393         this.resumeTime = resumeTime;
394     }
395
396     /**
397      * Has this delivery task been cleaned.
398      */
399     boolean isCleaned() {
400         return (hdrs == null);
401     }
402
403     /**
404      * Get length of body.
405      */
406     public long getLength() {
407         return (length);
408     }
409
410     /**
411      * Get creation date as encoded in the publish ID.
412      */
413     long getDate() {
414         return (date);
415     }
416
417     /**
418      * Get the most recent delivery attempt URL.
419      */
420     public String getURL() {
421         return (url);
422     }
423
424     /**
425      * Get the content type.
426      */
427     String getCType() {
428         return (ctype);
429     }
430
431     /**
432      * Get the method.
433      */
434     String getMethod() {
435         return (method);
436     }
437
438     /**
439      * Get the file ID.
440      */
441     String getFileId() {
442         return (fileid);
443     }
444
445     /**
446      * Get the number of delivery attempts.
447      */
448     int getAttempts() {
449         return (attempts);
450     }
451
452     /**
453      * Get the (space delimited list of) subscription ID for this delivery task.
454      */
455     String getSubId() {
456         return (subid);
457     }
458
459     /**
460      * Get the feed ID for this delivery task.
461      */
462     String getFeedId() {
463         return (feedid);
464     }
465
466     /**
467      * Get the followRedirects for this delivery task.
468      */
469     boolean getFollowRedirects() {
470         return (followRedirects);
471     }
472 }