1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
25 package org.onap.dmaap.datarouter.node;
30 import java.util.zip.GZIPInputStream;
32 import com.att.eelf.configuration.EELFLogger;
33 import com.att.eelf.configuration.EELFManager;
34 import org.apache.log4j.Logger;
35 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
38 import static com.att.eelf.configuration.Configuration.*;
39 import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
42 * A file to be delivered to a destination.
44 * A Delivery task represents a work item for the data router - a file that
45 * needs to be delivered and provides mechanisms to get information about
46 * the file and its delivery data as well as to attempt delivery.
48 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
// log4j logger used for the error/info messages emitted by this class.
49 private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
// EELF logger; used in this class for the INVOKE / EXIT messages.
50 private static EELFLogger eelflogger = EELFManager.getInstance()
51 .getLogger(DeliveryTask.class);
// Helper supplied to the constructor; provides destination info, the destination URL
// and the status/exception reporting callbacks used by this task.
52 private DeliveryTaskHelper deliveryTaskHelper;
// Destination settings, fetched from the helper in the constructor and fetched again
// at the start of each delivery attempt.
54 private DestInfo destInfo;
// <spool>/<pubid>
56 private File datafile;
// <spool>/<pubid>.M
57 private File metafile;
// Text before the first tab on the first line of the metadata file.
60 private String method;
// Text after the first tab on the first line of the metadata file.
61 private String fileid;
// Starts as destInfo.getLogData(); replaced when an x-dmaap-dr-routing header is read.
64 private String feedid;
// Copied from destInfo.isFollowRedirects() in the constructor.
67 private boolean followRedirects;
// Header name/value pairs read from the metadata file; isCleaned() reports true when this is null.
68 private String[][] hdrs;
// Logged with the EELF INVOKE message; the assignment is on a line not shown in this extract.
69 private String newInvocationId;
// Initialised to System.currentTimeMillis() in the constructor; changed via setResumeTime().
70 private long resumeTime;
74 * Create a delivery task for a given delivery queue and pub ID
76 * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
77 * @param pubid The publish ID for this file. This is used as
78 * the base for the file name in the spool directory and is of
79 * the form <milliseconds since 1970>.<fqdn of initial data router node>
// Builds the task: copies destination settings from the helper, derives the spool file
// names from pubid, then reads the metadata file to obtain the HTTP method, the file ID
// and the headers that accompany the delivery.
// NOTE(review): several original lines (closing braces and a few statements) are not
// present in this extract; the comments below describe only what is visible.
81 DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
82 this.deliveryTaskHelper = deliveryTaskHelper;
84 destInfo = deliveryTaskHelper.getDestinationInfo();
85 subid = destInfo.getSubId();
86 this.followRedirects = destInfo.isFollowRedirects();
87 feedid = destInfo.getLogData();
88 spool = destInfo.getSpool();
// Data file path is <spool>/<pubid>; the metadata file is that path plus ".M".
89 String dfn = spool + "/" + pubid;
90 String mfn = dfn + ".M";
// Same path as dfn, rebuilt inline.
91 datafile = new File(spool + "/" + pubid);
92 metafile = new File(mfn);
93 boolean monly = destInfo.isMetaDataOnly();
// The text before the first '.' in pubid is parsed as a long. This runs before the try
// block below, so a pubid without a '.' or with a non-numeric prefix makes the
// constructor throw.
94 date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
95 resumeTime = System.currentTimeMillis();
// Accumulates {name, value} header pairs while the metadata file is read.
96 Vector<String[]> hdrv = new Vector<>();
// FileReader is used without an explicit charset.
98 try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
// First line: <method> TAB <fileid>.
99 String s = br.readLine();
100 int i = s.indexOf('\t');
101 method = s.substring(0, i);
102 NodeUtils.setIpAndFqdnForEelf(method);
// The payload length is taken from the data file unless this is a DELETE or the
// destination is metadata-only.
103 if (!"DELETE".equals(method) && !monly) {
104 length = datafile.length();
106 fileid = s.substring(i + 1);
// Every remaining line is split at index i into header name h and value v.
// NOTE(review): the statement that recomputes i for each header line is not visible
// in this extract - confirm it exists in the full file.
107 while ((s = br.readLine()) != null) {
109 String h = s.substring(0, i);
110 String v = s.substring(i + 1);
111 if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
// Strip, from each space-separated entry, everything up to and including its last '/'.
112 subid = v.replaceAll("[^ ]*/", "");
// The feed ID is looked up using only the text before the first space of subid.
113 feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
// Body of this branch is not visible here; presumably content-* headers are skipped
// when there is no payload - TODO confirm.
115 if (length == 0 && h.toLowerCase().startsWith("content-")) {
// Body of this branch is not visible in this extract.
118 if (h.equalsIgnoreCase("content-type")) {
121 if (h.equalsIgnoreCase("x-onap-requestid")) {
122 MDC.put(MDC_KEY_REQUEST_ID, v);
// The received invocation ID goes into the MDC; v is then replaced by a new random UUID.
124 if (h.equalsIgnoreCase("x-invocationid")) {
125 MDC.put("InvocationId", v);
126 v = UUID.randomUUID().toString();
129 hdrv.add(new String[]{h, v});
// Any failure while reading the metadata file is logged and construction continues
// with whatever was parsed before the failure.
131 } catch (Exception e) {
132 loggerDeliveryTask.error("Exception "+ Arrays.toString(e.getStackTrace()), e);
134 hdrs = hdrv.toArray(new String[hdrv.size()][]);
// fileid is assigned only inside the try above, so it may be unset here after a read failure.
135 url = deliveryTaskHelper.getDestURL(fileid);
139 * Is the object a DeliveryTask with the same publication ID?
141 public boolean equals(Object o) {
142 if (!(o instanceof DeliveryTask)) {
145 return (pubid.equals(((DeliveryTask) o).pubid));
149 * Compare the publication IDs.
151 public int compareTo(DeliveryTask o) {
152 return (pubid.compareTo(o.pubid));
156 * Get the hash code of the publication ID.
158 public int hashCode() {
159 return (pubid.hashCode());
163 * Return the publication ID.
165 public String toString() {
172 String getPublishId() {
// Body of a single delivery attempt (presumably run(); the method header is not visible
// in this extract). It builds the HTTP request, sends the payload, reads the response
// and reports the outcome to the helper.
// NOTE(review): a number of original lines (braces, else branches, some conditions) are
// missing from this extract; comments describe only what is visible.
// Re-read the destination settings for this attempt.
182 destInfo = deliveryTaskHelper.getDestinationInfo();
183 boolean expect100 = destInfo.isUsing100();
184 boolean monly = destInfo.isMetaDataOnly();
// Refresh the payload length unless this is a DELETE or a metadata-only destination.
186 if (!"DELETE".equals(method) && !monly) {
187 length = datafile.length();
// When the destination wants decompression and the file really is gzip, drop ".gz" from
// the file ID.
// NOTE(review): String.replace removes every ".gz" occurrence, not only the suffix the
// condition tests for - confirm that is intended.
189 if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
190 fileid = fileid.replace(".gz", "");
192 url = deliveryTaskHelper.getDestURL(fileid);
193 URL u = new URL(url);
194 HttpURLConnection uc = (HttpURLConnection) u.openConnection();
// 60 second connect and read timeouts.
195 uc.setConnectTimeout(60000);
196 uc.setReadTimeout(60000);
// Automatic redirect following is switched off on the connection itself.
197 uc.setInstanceFollowRedirects(false);
198 uc.setRequestMethod(method);
199 uc.setRequestProperty("Content-Length", Long.toString(length));
200 uc.setRequestProperty("Authorization", destInfo.getAuth());
201 uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
// Add the headers that were read from the metadata file.
202 for (String[] nv : hdrs) {
203 uc.addRequestProperty(nv[0], nv[1]);
// The guarding condition is not visible here; presumably tied to expect100 - TODO confirm.
207 uc.setRequestProperty("Expect", "100-continue");
209 uc.setDoOutput(true);
// Decompressing destinations: gzip files are sent decompressed; the status header below
// marks an unsupported format (the enclosing else line is not visible in this extract).
210 if (destInfo.isDecompress()) {
211 if (isFiletypeGzip(datafile)) {
212 sendDecompressedFile(uc);
214 uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
221 int rc = uc.getResponseCode();
222 String rmsg = uc.getResponseMessage();
// Header field 0 is read and, when it contains two spaces, the text after the second
// space replaces the response message.
224 String h0 = uc.getHeaderField(0);
226 int i = h0.indexOf(' ');
227 int j = h0.indexOf(' ', i + 1);
228 if (i != -1 && j != -1) {
229 rmsg = h0.substring(j + 1);
233 String xpubid = null;
// 2xx: read the normal response stream and pick up the publish ID echoed by the receiver.
235 if (rc >= 200 && rc <= 299) {
236 is = uc.getInputStream();
237 xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
// 3xx: the Location header is reported as the message.
239 if (rc >= 300 && rc <= 399) {
240 rmsg = uc.getHeaderField("Location");
// The condition selecting the error stream is not visible in this extract.
242 is = uc.getErrorStream();
// Read and discard the response body.
244 byte[] buf = new byte[4096];
246 while (is.read(buf) > 0) {
250 deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
// Any exception during the attempt is logged and handed to the helper.
251 } catch (Exception e) {
252 loggerDeliveryTask.error("Exception " + Arrays.toString(e.getStackTrace()), e);
253 deliveryTaskHelper.reportException(this, e);
258 * To send decompressed gzip to the subscribers
260 * @param httpURLConnection connection used to make request
261 * @throws IOException
263 private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
264 byte[] buffer = new byte[8164];
265 httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
266 OutputStream outputStream = getOutputStream(httpURLConnection);
267 if (outputStream != null) {
269 try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
270 int bufferLength = buffer.length;
271 while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
272 outputStream.write(buffer, 0, bytesRead);
274 outputStream.close();
275 } catch (IOException e) {
276 httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
277 loggerDeliveryTask.info("Could not decompress file");
278 sendFile(httpURLConnection);
285 * To send any file to the subscriber.
287 * @param httpURLConnection connection used to make request
288 * @throws IOException
// Copies the data file to the connection's output stream.
// NOTE(review): the declarations of sofar and i, the write step and the closing lines
// are not present in this extract; comments describe only what is visible.
290 private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
291 OutputStream os = getOutputStream(httpURLConnection);
294 try (InputStream is = new FileInputStream(datafile)) {
// 1 MiB read buffer.
295 byte[] buf = new byte[1024 * 1024];
// Continue until sofar reaches the recorded payload length.
296 while (sofar < length) {
// Shrink the request so that sofar + i never exceeds length.
298 if (sofar + i > length) {
299 i = (int) (length - sofar);
301 i = is.read(buf, 0, i);
303 throw new IOException("Unexpected problem reading data file " + datafile);
// On an I/O failure the current value of sofar is reported to the helper.
309 } catch (IOException ioe) {
310 deliveryTaskHelper.reportDeliveryExtra(this, sofar);
317 * Get the outputstream that will be used to send data
319 * @param httpURLConnection connection used to make request
320 * @return AN Outpustream that can be used to send your data.
321 * @throws IOException
323 private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
324 OutputStream outputStream = null;
327 outputStream = httpURLConnection.getOutputStream();
328 } catch (ProtocolException pe) {
329 deliveryTaskHelper.reportDeliveryExtra(this, -1L);
330 // Rcvd error instead of 100-continue
331 loggerDeliveryTask.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
337 * Remove meta and data files
// Fragment of the clean-up method (its header and remaining statements are not visible
// in this extract): logs the EELF INVOKE message with newInvocationId, then EXIT.
342 eelflogger.info(EelfMsgs.INVOKE, newInvocationId);
343 eelflogger.info(EelfMsgs.EXIT);
348 * Set the resume time for a delivery task.
350 void setResumeTime(long resumeTime) {
351 this.resumeTime = resumeTime;
355 * Get the resume time for a delivery task.
357 long getResumeTime() {
362 * Has this delivery task been cleaned?
364 boolean isCleaned() {
365 return (hdrs == null);
371 public long getLength() {
376 * Get creation date as encoded in the publish ID.
383 * Get the most recent delivery attempt URL
385 public String getURL() {
390 * Get the content type
411 * Get the number of delivery attempts
418 * Get the (space delimited list of) subscription ID for this delivery task
425 * Get the feed ID for this delivery task
432 * Get the followRedirects for this delivery task
434 public boolean getFollowRedirects() {
435 return(followRedirects);