Add optional API for PM Mapper
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import java.io.*;
28 import java.net.*;
29 import java.util.*;
30
31 import com.att.eelf.configuration.EELFLogger;
32 import com.att.eelf.configuration.EELFManager;
33 import org.apache.log4j.Logger;
34 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
35 import org.slf4j.MDC;
36
37 import static com.att.eelf.configuration.Configuration.*;
38
39 /**
40  * A file to be delivered to a destination.
41  * <p>
42  * A Delivery task represents a work item for the data router - a file that
43  * needs to be delivered and provides mechanisms to get information about
44  * the file and its delivery data as well as to attempt delivery.
45  */
46 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
47     private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
48     private static EELFLogger eelflogger = EELFManager.getInstance()
49             .getLogger(DeliveryTask.class);
50     private DeliveryTaskHelper deliveryTaskHelper;
51     private String pubid;
52     private DestInfo destInfo;
53     private String spool;
54     private File datafile;
55     private File metafile;
56     private long length;
57     private long date;
58     private String method;
59     private String fileid;
60     private String ctype;
61     private String url;
62     private String feedid;
63     private String subid;
64     private int attempts;
65     private String[][] hdrs;
66     private String newInvocationId;
67
68
69     /**
70      * Create a delivery task for a given delivery queue and pub ID
71      *
72      * @param    deliveryTaskHelper    The delivery task helper for the queue this task is in.
73      * @param    pubid    The publish ID for this file.  This is used as
74      * the base for the file name in the spool directory and is of
75      * the form <milliseconds since 1970>.<fqdn of initial data router node>
76      */
77     public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
78         this.deliveryTaskHelper = deliveryTaskHelper;
79         this.pubid = pubid;
80         destInfo = deliveryTaskHelper.getDestinationInfo();
81         subid = destInfo.getSubId();
82         feedid = destInfo.getLogData();
83         spool = destInfo.getSpool();
84         String dfn = spool + "/" + pubid;
85         String mfn = dfn + ".M";
86         datafile = new File(spool + "/" + pubid);
87         metafile = new File(mfn);
88         boolean monly = destInfo.isMetaDataOnly();
89         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
90         Vector<String[]> hdrv = new Vector<>();
91
92         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
93             String s = br.readLine();
94             int i = s.indexOf('\t');
95             method = s.substring(0, i);
96             NodeUtils.setIpAndFqdnForEelf(method);
97             if (!"DELETE".equals(method) && !monly) {
98                 length = datafile.length();
99             }
100             fileid = s.substring(i + 1);
101             while ((s = br.readLine()) != null) {
102                 i = s.indexOf('\t');
103                 String h = s.substring(0, i);
104                 String v = s.substring(i + 1);
105                 if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
106                     subid = v.replaceAll("[^ ]*/", "");
107                     feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
108                 }
109                 if (length == 0 && h.toLowerCase().startsWith("content-")) {
110                     continue;
111                 }
112                 if (h.equalsIgnoreCase("content-type")) {
113                     ctype = v;
114                 }
115                 if (h.equalsIgnoreCase("x-onap-requestid")) {
116                     MDC.put(MDC_KEY_REQUEST_ID, v);
117                 }
118                 if (h.equalsIgnoreCase("x-invocationid")) {
119                     MDC.put("InvocationId", v);
120                     v = UUID.randomUUID().toString();
121                     newInvocationId = v;
122                 }
123                 hdrv.add(new String[]{h, v});
124             }
125         } catch (Exception e) {
126             loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
127         }
128         hdrs = hdrv.toArray(new String[hdrv.size()][]);
129         url = deliveryTaskHelper.getDestURL(fileid);
130     }
131     /**
132      * Is the object a DeliveryTask with the same publication ID?
133      */
134     public boolean equals(Object o) {
135         if (!(o instanceof DeliveryTask)) {
136             return (false);
137         }
138         return (pubid.equals(((DeliveryTask) o).pubid));
139     }
140
141     /**
142      * Compare the publication IDs.
143      */
144     public int compareTo(DeliveryTask o) {
145         return (pubid.compareTo(o.pubid));
146     }
147
148     /**
149      * Get the hash code of the publication ID.
150      */
151     public int hashCode() {
152         return (pubid.hashCode());
153     }
154
155     /**
156      * Return the publication ID.
157      */
158     public String toString() {
159         return (pubid);
160     }
161     /**
162      * Get the publish ID
163      */
164     public String getPublishId() {
165         return (pubid);
166     }
167
168     /**
169      * Attempt delivery
170      */
171     public void run() {
172         attempts++;
173         try {
174             destInfo = deliveryTaskHelper.getDestinationInfo();
175             boolean expect100 = destInfo.isUsing100();
176             boolean monly = destInfo.isMetaDataOnly();
177             length = 0;
178             if (!"DELETE".equals(method) && !monly) {
179                 length = datafile.length();
180             }
181             url = deliveryTaskHelper.getDestURL(fileid);
182             URL u = new URL(url);
183             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
184             uc.setConnectTimeout(60000);
185             uc.setReadTimeout(60000);
186             uc.setInstanceFollowRedirects(false);
187             uc.setRequestMethod(method);
188             uc.setRequestProperty("Content-Length", Long.toString(length));
189             uc.setRequestProperty("Authorization", destInfo.getAuth());
190             uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
191             for (String[] nv : hdrs) {
192                 uc.addRequestProperty(nv[0], nv[1]);
193             }
194             if (length > 0) {
195                 if (expect100) {
196                     uc.setRequestProperty("Expect", "100-continue");
197                 }
198                 uc.setFixedLengthStreamingMode(length);
199                 uc.setDoOutput(true);
200                 OutputStream os = null;
201                 try {
202                     os = uc.getOutputStream();
203                 } catch (ProtocolException pe) {
204                     deliveryTaskHelper.reportDeliveryExtra(this, -1L);
205                     // Rcvd error instead of 100-continue
206                     loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
207                 }
208                 if (os != null) {
209                     long sofar = 0;
210                     try (InputStream is = new FileInputStream(datafile)) {
211                         byte[] buf = new byte[1024 * 1024];
212                         while (sofar < length) {
213                             int i = buf.length;
214                             if (sofar + i > length) {
215                                 i = (int) (length - sofar);
216                             }
217                             i = is.read(buf, 0, i);
218                             if (i <= 0) {
219                                 throw new IOException("Unexpected problem reading data file " + datafile);
220                             }
221                             sofar += i;
222                             os.write(buf, 0, i);
223                         }
224                         os.close();
225                     } catch (IOException ioe) {
226                         deliveryTaskHelper.reportDeliveryExtra(this, sofar);
227                         throw ioe;
228                     }
229                 }
230             }
231             int rc = uc.getResponseCode();
232             String rmsg = uc.getResponseMessage();
233             if (rmsg == null) {
234                 String h0 = uc.getHeaderField(0);
235                 if (h0 != null) {
236                     int i = h0.indexOf(' ');
237                     int j = h0.indexOf(' ', i + 1);
238                     if (i != -1 && j != -1) {
239                         rmsg = h0.substring(j + 1);
240                     }
241                 }
242             }
243             String xpubid = null;
244             InputStream is;
245             if (rc >= 200 && rc <= 299) {
246                 is = uc.getInputStream();
247                 xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
248             } else {
249                 if (rc >= 300 && rc <= 399) {
250                     rmsg = uc.getHeaderField("Location");
251                 }
252                 is = uc.getErrorStream();
253             }
254             byte[] buf = new byte[4096];
255             if (is != null) {
256                 while (is.read(buf) > 0) {
257                 }
258                 is.close();
259             }
260             deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
261         } catch (Exception e) {
262             loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
263             deliveryTaskHelper.reportException(this, e);
264         }
265     }
266
267     /**
268      * Remove meta and data files
269      */
270     public void clean() {
271         datafile.delete();
272         metafile.delete();
273         eelflogger.info(EelfMsgs.INVOKE, newInvocationId);
274         eelflogger.info(EelfMsgs.EXIT);
275         hdrs = null;
276     }
277
278     /**
279      * Has this delivery task been cleaned?
280      */
281     public boolean isCleaned() {
282         return (hdrs == null);
283     }
284
285     /**
286      * Get length of body
287      */
288     public long getLength() {
289         return (length);
290     }
291
292     /**
293      * Get creation date as encoded in the publish ID.
294      */
295     public long getDate() {
296         return (date);
297     }
298
299     /**
300      * Get the most recent delivery attempt URL
301      */
302     public String getURL() {
303         return (url);
304     }
305
306     /**
307      * Get the content type
308      */
309     public String getCType() {
310         return (ctype);
311     }
312
313     /**
314      * Get the method
315      */
316     public String getMethod() {
317         return (method);
318     }
319
320     /**
321      * Get the file ID
322      */
323     public String getFileId() {
324         return (fileid);
325     }
326
327     /**
328      * Get the number of delivery attempts
329      */
330     public int getAttempts() {
331         return (attempts);
332     }
333
334     /**
335      * Get the (space delimited list of) subscription ID for this delivery task
336      */
337     public String getSubId() {
338         return (subid);
339     }
340
341     /**
342      * Get the feed ID for this delivery task
343      */
344     public String getFeedId() {
345         return (feedid);
346     }
347 }