Remove major and minor code smells in dr-node
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
28 import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
29
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import java.io.BufferedReader;
33 import java.io.File;
34 import java.io.FileInputStream;
35 import java.io.FileReader;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.io.OutputStream;
39 import java.net.HttpURLConnection;
40 import java.net.ProtocolException;
41 import java.net.URL;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.UUID;
45 import java.util.zip.GZIPInputStream;
46 import org.jetbrains.annotations.Nullable;
47 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
48 import org.slf4j.MDC;
49
50 /**
51  * A file to be delivered to a destination.
52  *
53  * <p>A Delivery task represents a work item for the data router - a file that needs to be delivered and provides
54  * mechanisms to get information about the file and its delivery data as well as to attempt delivery.
55  */
56 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
57
58     private static final String DECOMPRESSION_STATUS = "Decompression_Status";
59     private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class);
60     private DeliveryTaskHelper deliveryTaskHelper;
61     private String pubid;
62     private DestInfo destInfo;
63     private String spool;
64     private File datafile;
65     private File metafile;
66     private long length;
67     private long date;
68     private String method;
69     private String fileid;
70     private String ctype;
71     private String url;
72     private String feedid;
73     private String subid;
74     private int attempts;
75     private boolean followRedirects;
76     private String[][] hdrs;
77     private String newInvocationId;
78     private long resumeTime;
79
80
81     /**
82      * Create a delivery task for a given delivery queue and pub ID
83      *
84      * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
85      * @param pubid The publish ID for this file.  This is used as the base for the file name in the spool directory and
86      * is of the form <milliseconds since 1970>.<fqdn of initial data router node>
87      */
88     DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
89         this.deliveryTaskHelper = deliveryTaskHelper;
90         this.pubid = pubid;
91         destInfo = deliveryTaskHelper.getDestinationInfo();
92         subid = destInfo.getSubId();
93         this.followRedirects = destInfo.isFollowRedirects();
94         feedid = destInfo.getLogData();
95         spool = destInfo.getSpool();
96         String dfn = spool + "/" + pubid;
97         String mfn = dfn + ".M";
98         datafile = new File(spool + "/" + pubid);
99         metafile = new File(mfn);
100         boolean monly = destInfo.isMetaDataOnly();
101         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
102         resumeTime = System.currentTimeMillis();
103         ArrayList<String[]> hdrv = new ArrayList<>();
104
105         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
106             String line = br.readLine();
107             int index = line.indexOf('\t');
108             method = line.substring(0, index);
109             NodeUtils.setIpAndFqdnForEelf(method);
110             if (!"DELETE".equals(method) && !monly) {
111                 length = datafile.length();
112             }
113             fileid = line.substring(index + 1);
114             while ((line = br.readLine()) != null) {
115                 index = line.indexOf('\t');
116                 String header = line.substring(0, index);
117                 String headerValue = line.substring(index + 1);
118                 if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) {
119                     subid = headerValue.replaceAll("[^ ]*/", "");
120                     feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
121                 }
122                 if (length == 0 && header.toLowerCase().startsWith("content-")) {
123                     continue;
124                 }
125                 if ("content-type".equalsIgnoreCase(header)) {
126                     ctype = headerValue;
127                 }
128                 if ("x-onap-requestid".equalsIgnoreCase(header)) {
129                     MDC.put(MDC_KEY_REQUEST_ID, headerValue);
130                 }
131                 if ("x-invocationid".equalsIgnoreCase(header)) {
132                     MDC.put("InvocationId", headerValue);
133                     headerValue = UUID.randomUUID().toString();
134                     newInvocationId = headerValue;
135                 }
136                 hdrv.add(new String[]{header, headerValue});
137             }
138         } catch (Exception e) {
139             eelfLogger.error("Exception", e);
140         }
141         hdrs = hdrv.toArray(new String[hdrv.size()][]);
142         url = deliveryTaskHelper.getDestURL(fileid);
143     }
144
145     /**
146      * Is the object a DeliveryTask with the same publication ID.
147      */
148     public boolean equals(Object object) {
149         if (!(object instanceof DeliveryTask)) {
150             return (false);
151         }
152         return (pubid.equals(((DeliveryTask) object).pubid));
153     }
154
155     /**
156      * Compare the publication IDs.
157      */
158     public int compareTo(DeliveryTask other) {
159         return (pubid.compareTo(other.pubid));
160     }
161
162     /**
163      * Get the hash code of the publication ID.
164      */
165     public int hashCode() {
166         return (pubid.hashCode());
167     }
168
169     /**
170      * Return the publication ID.
171      */
172     public String toString() {
173         return (pubid);
174     }
175
176     /**
177      * Get the publish ID.
178      */
179     String getPublishId() {
180         return (pubid);
181     }
182
183     /**
184      * Attempt delivery.
185      */
186     public void run() {
187         attempts++;
188         try {
189             destInfo = deliveryTaskHelper.getDestinationInfo();
190             boolean monly = destInfo.isMetaDataOnly();
191             length = 0;
192             if (!"DELETE".equals(method) && !monly) {
193                 length = datafile.length();
194             }
195             stripSuffixIfIsDecompress();
196             url = deliveryTaskHelper.getDestURL(fileid);
197             URL urlObj = new URL(url);
198             HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection();
199             urlConnection.setConnectTimeout(60000);
200             urlConnection.setReadTimeout(60000);
201             urlConnection.setInstanceFollowRedirects(false);
202             urlConnection.setRequestMethod(method);
203             urlConnection.setRequestProperty("Content-Length", Long.toString(length));
204             urlConnection.setRequestProperty("Authorization", destInfo.getAuth());
205             urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
206             boolean expect100 = destInfo.isUsing100();
207             int rc = deliverFileToSubscriber(expect100, urlConnection);
208             String rmsg = urlConnection.getResponseMessage();
209             rmsg = getResponseMessage(urlConnection, rmsg);
210             String xpubid = null;
211             InputStream is;
212             if (rc >= 200 && rc <= 299) {
213                 is = urlConnection.getInputStream();
214                 xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
215             } else {
216                 if (rc >= 300 && rc <= 399) {
217                     rmsg = urlConnection.getHeaderField("Location");
218                 }
219                 is = urlConnection.getErrorStream();
220             }
221             byte[] buf = new byte[4096];
222             if (is != null) {
223                 while (is.read(buf) > 0) {
224                 }
225                 is.close();
226             }
227             deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
228         } catch (Exception e) {
229             eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e);
230             deliveryTaskHelper.reportException(this, e);
231         }
232     }
233
234     /**
235      * To send decompressed gzip to the subscribers.
236      *
237      * @param httpURLConnection connection used to make request
238      */
239     private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
240         byte[] buffer = new byte[8164];
241         httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS");
242         OutputStream outputStream = getOutputStream(httpURLConnection);
243         if (outputStream != null) {
244             int bytesRead = 0;
245             try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
246                 int bufferLength = buffer.length;
247                 while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
248                     outputStream.write(buffer, 0, bytesRead);
249                 }
250                 outputStream.close();
251             } catch (IOException e) {
252                 httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE");
253                 eelfLogger.info("Could not decompress file", e);
254                 sendFile(httpURLConnection);
255             }
256
257         }
258     }
259
260     /**
261      * To send any file to the subscriber.
262      *
263      * @param httpURLConnection connection used to make request
264      */
265     private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
266         OutputStream os = getOutputStream(httpURLConnection);
267         if (os == null) {
268             return;
269         }
270         long sofar = 0;
271         try (InputStream is = new FileInputStream(datafile)) {
272             byte[] buf = new byte[1024 * 1024];
273             while (sofar < length) {
274                 int len = buf.length;
275                 if (sofar + len > length) {
276                     len = (int) (length - sofar);
277                 }
278                 len = is.read(buf, 0, len);
279                 if (len <= 0) {
280                     throw new IOException("Unexpected problem reading data file " + datafile);
281                 }
282                 sofar += len;
283                 os.write(buf, 0, len);
284             }
285             os.close();
286         } catch (IOException ioe) {
287             deliveryTaskHelper.reportDeliveryExtra(this, sofar);
288             throw ioe;
289         }
290     }
291
292     /**
293      * Get the outputstream that will be used to send data.
294      *
295      * @param httpURLConnection connection used to make request
296      * @return AN Outpustream that can be used to send your data.
297      */
298     private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
299         OutputStream outputStream = null;
300         try {
301             outputStream = httpURLConnection.getOutputStream();
302         } catch (ProtocolException pe) {
303             deliveryTaskHelper.reportDeliveryExtra(this, -1L);
304             // Rcvd error instead of 100-continue
305             eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
306         }
307         return outputStream;
308     }
309
310     private void stripSuffixIfIsDecompress() {
311         if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) {
312             fileid = fileid.replace(".gz", "");
313         }
314     }
315
316     private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException {
317         for (String[] nv : hdrs) {
318             uc.addRequestProperty(nv[0], nv[1]);
319         }
320         if (length > 0) {
321             if (expect100) {
322                 uc.setRequestProperty("Expect", "100-continue");
323             }
324             uc.setDoOutput(true);
325             if (destInfo.isDecompress()) {
326                 if (isFiletypeGzip(datafile)) {
327                     sendDecompressedFile(uc);
328                 } else {
329                     uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT");
330                     sendFile(uc);
331                 }
332             } else {
333                 sendFile(uc);
334             }
335         }
336         return uc.getResponseCode();
337     }
338
339     @Nullable
340     private String getResponseMessage(HttpURLConnection uc, String rmsg) {
341         if (rmsg == null) {
342             String h0 = uc.getHeaderField(0);
343             if (h0 != null) {
344                 int indexOfSpace1 = h0.indexOf(' ');
345                 int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1);
346                 if (indexOfSpace1 != -1 && indexOfSpace2 != -1) {
347                     rmsg = h0.substring(indexOfSpace2 + 1);
348                 }
349             }
350         }
351         return rmsg;
352     }
353
354     /**
355      * Remove meta and data files.
356      */
357     void clean() {
358         datafile.delete();
359         metafile.delete();
360         eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
361         eelfLogger.info(EelfMsgs.EXIT);
362         hdrs = null;
363     }
364
365     /**
366      * Get the resume time for a delivery task.
367      */
368     long getResumeTime() {
369         return resumeTime;
370     }
371
372     /**
373      * Set the resume time for a delivery task.
374      */
375     void setResumeTime(long resumeTime) {
376         this.resumeTime = resumeTime;
377     }
378
379     /**
380      * Has this delivery task been cleaned.
381      */
382     boolean isCleaned() {
383         return (hdrs == null);
384     }
385
386     /**
387      * Get length of body.
388      */
389     public long getLength() {
390         return (length);
391     }
392
393     /**
394      * Get creation date as encoded in the publish ID.
395      */
396     long getDate() {
397         return (date);
398     }
399
400     /**
401      * Get the most recent delivery attempt URL.
402      */
403     public String getURL() {
404         return (url);
405     }
406
407     /**
408      * Get the content type.
409      */
410     String getCType() {
411         return (ctype);
412     }
413
414     /**
415      * Get the method.
416      */
417     String getMethod() {
418         return (method);
419     }
420
421     /**
422      * Get the file ID.
423      */
424     String getFileId() {
425         return (fileid);
426     }
427
428     /**
429      * Get the number of delivery attempts.
430      */
431     int getAttempts() {
432         return (attempts);
433     }
434
435     /**
436      * Get the (space delimited list of) subscription ID for this delivery task.
437      */
438     String getSubId() {
439         return (subid);
440     }
441
442     /**
443      * Get the feed ID for this delivery task.
444      */
445     String getFeedId() {
446         return (feedid);
447     }
448
449     /**
450      * Get the followRedirects for this delivery task.
451      */
452     public boolean getFollowRedirects() {
453         return (followRedirects);
454     }
455 }