[DMAAP-DR] Remove AAF/TLS phase 1
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / delivery / DeliveryTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node.delivery;
26
27 import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
28 import static org.onap.dmaap.datarouter.node.utils.NodeUtils.isFiletypeGzip;
29
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import java.io.BufferedReader;
33 import java.io.File;
34 import java.io.FileInputStream;
35 import java.io.FileReader;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.io.OutputStream;
39 import java.net.HttpURLConnection;
40 import java.net.ProtocolException;
41 import java.net.URL;
42 import java.nio.file.Files;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.UUID;
46 import java.util.zip.GZIPInputStream;
47 import org.jetbrains.annotations.Nullable;
48 import org.onap.dmaap.datarouter.node.DestInfo;
49 import org.onap.dmaap.datarouter.node.utils.NodeUtils;
50 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
51 import org.slf4j.MDC;
52
53 /**
54  * A file to be delivered to a destination.
55  *
56  * <p>A Delivery task represents a work item for the data router - a file that needs to be delivered and provides
57  * mechanisms to get information about the file and its delivery data as well as to attempt delivery.
58  */
59 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
60
61     private static final String DECOMPRESSION_STATUS = "Decompression_Status";
62     private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class);
63     private DeliveryTaskHelper deliveryTaskHelper;
64     private String pubid;
65     private DestInfo destInfo;
66     private String spool;
67     private File datafile;
68     private File metafile;
69     private long length;
70     private long date;
71     private String method;
72     private String fileid;
73     private String ctype;
74     private String url;
75     private String feedid;
76     private String subid;
77     private int attempts;
78     private boolean followRedirects;
79     private String[][] hdrs;
80     private String newInvocationId;
81     private long resumeTime;
82
83
84     /**
85      * Create a delivery task for a given delivery queue and pub ID.
86      *
87      * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
88      * @param pubid The publish ID for this file.  This is used as the base for the file name in the spool directory and
89      *      is of the form (milliseconds since 1970).(fqdn of initial data router node)
90      */
91     public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
92         this.deliveryTaskHelper = deliveryTaskHelper;
93         this.pubid = pubid;
94         destInfo = deliveryTaskHelper.getDestinationInfo();
95         subid = destInfo.getSubId();
96         this.followRedirects = destInfo.isFollowRedirects();
97         feedid = destInfo.getLogData();
98         spool = destInfo.getSpool();
99         String dfn = spool + File.separator + pubid;
100         String mfn = dfn + ".M";
101         datafile = new File(spool + File.separator + pubid);
102         metafile = new File(mfn);
103         boolean monly = destInfo.isMetaDataOnly();
104         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
105         resumeTime = System.currentTimeMillis();
106         ArrayList<String[]> hdrv = new ArrayList<>();
107
108         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
109             String line = br.readLine();
110             int index = line.indexOf('\t');
111             method = line.substring(0, index);
112             NodeUtils.setIpAndFqdnForEelf(method);
113             if (!"DELETE".equals(method) && !monly) {
114                 length = datafile.length();
115             }
116             fileid = line.substring(index + 1);
117             while ((line = br.readLine()) != null) {
118                 index = line.indexOf('\t');
119                 String header = line.substring(0, index);
120                 String headerValue = line.substring(index + 1);
121                 if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) {
122                     subid = headerValue.replaceAll("[^ ]*/+", "");
123                     feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
124                 }
125                 if (length == 0 && header.toLowerCase().startsWith("content-")) {
126                     continue;
127                 }
128                 if ("content-type".equalsIgnoreCase(header)) {
129                     ctype = headerValue;
130                 }
131                 if ("x-onap-requestid".equalsIgnoreCase(header)) {
132                     MDC.put(MDC_KEY_REQUEST_ID, headerValue);
133                 }
134                 if ("x-invocationid".equalsIgnoreCase(header)) {
135                     MDC.put("InvocationId", headerValue);
136                     headerValue = UUID.randomUUID().toString();
137                     newInvocationId = headerValue;
138                 }
139                 hdrv.add(new String[]{header, headerValue});
140             }
141         } catch (Exception e) {
142             eelfLogger.error("Exception", e);
143         }
144         hdrs = hdrv.toArray(new String[hdrv.size()][]);
145         url = deliveryTaskHelper.getDestURL(fileid);
146     }
147
148     /**
149      * Is the object a DeliveryTask with the same publication ID.
150      */
151     public boolean equals(Object object) {
152         if (!(object instanceof DeliveryTask)) {
153             return (false);
154         }
155         return (pubid.equals(((DeliveryTask) object).pubid));
156     }
157
158     /**
159      * Compare the publication IDs.
160      */
161     public int compareTo(DeliveryTask other) {
162         return (pubid.compareTo(other.pubid));
163     }
164
165     /**
166      * Get the hash code of the publication ID.
167      */
168     public int hashCode() {
169         return (pubid.hashCode());
170     }
171
172     /**
173      * Return the publication ID.
174      */
175     public String toString() {
176         return (pubid);
177     }
178
179     /**
180      * Get the publish ID.
181      */
182     public String getPublishId() {
183         return (pubid);
184     }
185
186     /**
187      * Attempt delivery.
188      */
189     public void run() {
190         attempts++;
191         try {
192             destInfo = deliveryTaskHelper.getDestinationInfo();
193             boolean monly = destInfo.isMetaDataOnly();
194             length = 0;
195             if (!"DELETE".equals(method) && !monly) {
196                 length = datafile.length();
197             }
198             stripSuffixIfIsDecompress();
199             url = deliveryTaskHelper.getDestURL(fileid);
200             URL urlObj = new URL(url);
201             HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection();
202             urlConnection.setConnectTimeout(60000);
203             urlConnection.setReadTimeout(60000);
204             urlConnection.setInstanceFollowRedirects(false);
205             urlConnection.setRequestMethod(method);
206             urlConnection.setRequestProperty("Content-Length", Long.toString(length));
207             urlConnection.setRequestProperty("Authorization", destInfo.getAuth());
208             urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
209             boolean expect100 = destInfo.isUsing100();
210             int rc = deliverFileToSubscriber(expect100, urlConnection);
211             String rmsg = urlConnection.getResponseMessage();
212             rmsg = getResponseMessage(urlConnection, rmsg);
213             String xpubid = null;
214             InputStream is;
215             if (rc >= 200 && rc <= 299) {
216                 is = urlConnection.getInputStream();
217                 xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
218             } else {
219                 if (rc >= 300 && rc <= 399) {
220                     rmsg = urlConnection.getHeaderField("Location");
221                 }
222                 is = urlConnection.getErrorStream();
223             }
224             byte[] buf = new byte[4096];
225             if (is != null) {
226                 while (is.read(buf) > 0) {
227                     //flush the buffer
228                 }
229                 is.close();
230             }
231             deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
232         } catch (Exception e) {
233             eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e);
234             deliveryTaskHelper.reportException(this, e);
235         }
236     }
237
238     /**
239      * To send decompressed gzip to the subscribers.
240      *
241      * @param httpURLConnection connection used to make request
242      */
243     private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
244         byte[] buffer = new byte[8164];
245         httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS");
246         OutputStream outputStream = getOutputStream(httpURLConnection);
247         if (outputStream != null) {
248             int bytesRead;
249             try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
250                 int bufferLength = buffer.length;
251                 while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
252                     outputStream.write(buffer, 0, bytesRead);
253                 }
254                 outputStream.close();
255             } catch (IOException e) {
256                 httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE");
257                 eelfLogger.info("Could not decompress file", e);
258                 sendFile(httpURLConnection);
259             }
260
261         }
262     }
263
264     /**
265      * To send any file to the subscriber.
266      *
267      * @param httpURLConnection connection used to make request
268      */
269     private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
270         OutputStream os = getOutputStream(httpURLConnection);
271         if (os == null) {
272             return;
273         }
274         long sofar = 0;
275         try (InputStream is = new FileInputStream(datafile)) {
276             byte[] buf = new byte[1024 * 1024];
277             while (sofar < length) {
278                 int len = buf.length;
279                 if (sofar + len > length) {
280                     len = (int) (length - sofar);
281                 }
282                 len = is.read(buf, 0, len);
283                 if (len <= 0) {
284                     throw new IOException("Unexpected problem reading data file " + datafile);
285                 }
286                 sofar += len;
287                 os.write(buf, 0, len);
288             }
289             os.close();
290         } catch (IOException ioe) {
291             deliveryTaskHelper.reportDeliveryExtra(this, sofar);
292             throw ioe;
293         }
294     }
295
296     /**
297      * Get the outputstream that will be used to send data.
298      *
299      * @param httpURLConnection connection used to make request
300      * @return AN Outpustream that can be used to send your data.
301      */
302     OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
303         OutputStream outputStream = null;
304         try {
305             outputStream = httpURLConnection.getOutputStream();
306         } catch (ProtocolException pe) {
307             deliveryTaskHelper.reportDeliveryExtra(this, -1L);
308             // Rcvd error instead of 100-continue
309             eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
310         }
311         return outputStream;
312     }
313
314     private void stripSuffixIfIsDecompress() {
315         if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) {
316             fileid = fileid.replace(".gz", "");
317         }
318     }
319
320     private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException {
321         for (String[] nv : hdrs) {
322             uc.addRequestProperty(nv[0], nv[1]);
323         }
324         if (length > 0) {
325             if (expect100) {
326                 uc.setRequestProperty("Expect", "100-continue");
327             }
328             uc.setDoOutput(true);
329             if (destInfo.isDecompress()) {
330                 if (isFiletypeGzip(datafile)) {
331                     sendDecompressedFile(uc);
332                 } else {
333                     uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT");
334                     sendFile(uc);
335                 }
336             } else {
337                 sendFile(uc);
338             }
339         }
340         return uc.getResponseCode();
341     }
342
343     @Nullable
344     private String getResponseMessage(HttpURLConnection uc, String rmsg) {
345         if (rmsg == null) {
346             String h0 = uc.getHeaderField(0);
347             if (h0 != null) {
348                 int indexOfSpace1 = h0.indexOf(' ');
349                 int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1);
350                 if (indexOfSpace1 != -1 && indexOfSpace2 != -1) {
351                     rmsg = h0.substring(indexOfSpace2 + 1);
352                 }
353             }
354         }
355         return rmsg;
356     }
357
358     /**
359      * Remove meta and data files.
360      */
361     public void clean() {
362         deleteWithRetry(datafile);
363         deleteWithRetry(metafile);
364         eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
365         eelfLogger.info(EelfMsgs.EXIT);
366         hdrs = null;
367     }
368
369     private void deleteWithRetry(File file) {
370         int maxTries = 3;
371         int tryCount = 1;
372         while (tryCount <= maxTries) {
373             try {
374                 Files.deleteIfExists(file.toPath());
375                 break;
376             } catch (IOException e) {
377                 eelfLogger.error("IOException : Failed to delete file :"
378                                          + file.getName() + " on attempt " + tryCount, e);
379             }
380             tryCount++;
381         }
382     }
383
384     /**
385      * Get the resume time for a delivery task.
386      */
387     long getResumeTime() {
388         return resumeTime;
389     }
390
391     /**
392      * Set the resume time for a delivery task.
393      */
394     public void setResumeTime(long resumeTime) {
395         this.resumeTime = resumeTime;
396     }
397
398     /**
399      * Has this delivery task been cleaned.
400      */
401     public boolean isCleaned() {
402         return (hdrs == null);
403     }
404
405     /**
406      * Get length of body.
407      */
408     public long getLength() {
409         return (length);
410     }
411
412     /**
413      * Get creation date as encoded in the publish ID.
414      */
415     public long getDate() {
416         return (date);
417     }
418
419     /**
420      * Get the most recent delivery attempt URL.
421      */
422     public String getURL() {
423         return (url);
424     }
425
426     /**
427      * Get the content type.
428      */
429     public String getCType() {
430         return (ctype);
431     }
432
433     /**
434      * Get the method.
435      */
436     public String getMethod() {
437         return (method);
438     }
439
440     /**
441      * Get the file ID.
442      */
443     public String getFileId() {
444         return (fileid);
445     }
446
447     /**
448      * Get the number of delivery attempts.
449      */
450     public int getAttempts() {
451         return (attempts);
452     }
453
454     /**
455      * Get the (space delimited list of) subscription ID for this delivery task.
456      */
457     public String getSubId() {
458         return (subid);
459     }
460
461     /**
462      * Get the feed ID for this delivery task.
463      */
464     public String getFeedId() {
465         return (feedid);
466     }
467
468     /**
469      * Get the followRedirects for this delivery task.
470      */
471     public boolean getFollowRedirects() {
472         return (followRedirects);
473     }
474 }