import java.io.*;
import java.net.*;
import java.util.*;
+import java.util.zip.GZIPInputStream;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import org.apache.log4j.Logger;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
+import org.slf4j.MDC;
+
+import static com.att.eelf.configuration.Configuration.*;
+import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
/**
* A file to be delivered to a destination.
*/
public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
- private DeliveryTaskHelper dth;
+ private static EELFLogger eelflogger = EELFManager.getInstance()
+ .getLogger(DeliveryTask.class);
+ private DeliveryTaskHelper deliveryTaskHelper;
private String pubid;
- private DestInfo di;
+ private DestInfo destInfo;
private String spool;
private File datafile;
private File metafile;
private String feedid;
private String subid;
private int attempts;
+ private boolean followRedirects;
private String[][] hdrs;
+ private String newInvocationId;
+ private long resumeTime;
/**
* Create a delivery task for a given delivery queue and pub ID
*
- * @param dth The delivery task helper for the queue this task is in.
- * @param pubid The publish ID for this file. This is used as
- * the base for the file name in the spool directory and is of
- * the form <milliseconds since 1970>.<fqdn of initial data router node>
+ * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
+ * @param pubid The publish ID for this file. This is used as
+ * the base for the file name in the spool directory and is of
+ * the form <milliseconds since 1970>.<fqdn of initial data router node>
*/
- public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
- this.dth = dth;
+ DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
+ this.deliveryTaskHelper = deliveryTaskHelper;
this.pubid = pubid;
- di = dth.getDestInfo();
- subid = di.getSubId();
- feedid = di.getLogData();
- spool = di.getSpool();
+ destInfo = deliveryTaskHelper.getDestinationInfo();
+ subid = destInfo.getSubId();
+ this.followRedirects = destInfo.isFollowRedirects();
+ feedid = destInfo.getLogData();
+ spool = destInfo.getSpool();
String dfn = spool + "/" + pubid;
String mfn = dfn + ".M";
datafile = new File(spool + "/" + pubid);
metafile = new File(mfn);
- boolean monly = di.isMetaDataOnly();
+ boolean monly = destInfo.isMetaDataOnly();
date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
- Vector<String[]> hdrv = new Vector<String[]>();
+ resumeTime = System.currentTimeMillis();
+ Vector<String[]> hdrv = new Vector<>();
try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
String s = br.readLine();
int i = s.indexOf('\t');
method = s.substring(0, i);
+ NodeUtils.setIpAndFqdnForEelf(method);
if (!"DELETE".equals(method) && !monly) {
length = datafile.length();
}
i = s.indexOf('\t');
String h = s.substring(0, i);
String v = s.substring(i + 1);
- if ("x-att-dr-routing".equalsIgnoreCase(h)) {
+ if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
subid = v.replaceAll("[^ ]*/", "");
- feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
+ feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
}
if (length == 0 && h.toLowerCase().startsWith("content-")) {
continue;
if (h.equalsIgnoreCase("content-type")) {
ctype = v;
}
+ if (h.equalsIgnoreCase("x-onap-requestid")) {
+ MDC.put(MDC_KEY_REQUEST_ID, v);
+ }
+ if (h.equalsIgnoreCase("x-invocationid")) {
+ MDC.put("InvocationId", v);
+ v = UUID.randomUUID().toString();
+ newInvocationId = v;
+ }
hdrv.add(new String[]{h, v});
}
} catch (Exception e) {
- loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
+ loggerDeliveryTask.error("Exception "+ Arrays.toString(e.getStackTrace()), e);
}
hdrs = hdrv.toArray(new String[hdrv.size()][]);
- url = dth.getDestURL(fileid);
+ url = deliveryTaskHelper.getDestURL(fileid);
}
+
/**
* Is the object a DeliveryTask with the same publication ID?
*/
public String toString() {
return (pubid);
}
+
/**
* Get the publish ID
*/
- public String getPublishId() {
+ String getPublishId() {
return (pubid);
}
public void run() {
attempts++;
try {
- di = dth.getDestInfo();
- boolean expect100 = di.isUsing100();
- boolean monly = di.isMetaDataOnly();
+ destInfo = deliveryTaskHelper.getDestinationInfo();
+ boolean expect100 = destInfo.isUsing100();
+ boolean monly = destInfo.isMetaDataOnly();
length = 0;
if (!"DELETE".equals(method) && !monly) {
length = datafile.length();
}
- url = dth.getDestURL(fileid);
+ if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
+ fileid = fileid.replace(".gz", "");
+ }
+ url = deliveryTaskHelper.getDestURL(fileid);
URL u = new URL(url);
HttpURLConnection uc = (HttpURLConnection) u.openConnection();
uc.setConnectTimeout(60000);
uc.setInstanceFollowRedirects(false);
uc.setRequestMethod(method);
uc.setRequestProperty("Content-Length", Long.toString(length));
- uc.setRequestProperty("Authorization", di.getAuth());
- uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);
+ uc.setRequestProperty("Authorization", destInfo.getAuth());
+ uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
for (String[] nv : hdrs) {
uc.addRequestProperty(nv[0], nv[1]);
}
if (expect100) {
uc.setRequestProperty("Expect", "100-continue");
}
- uc.setFixedLengthStreamingMode(length);
uc.setDoOutput(true);
- OutputStream os = null;
- try {
- os = uc.getOutputStream();
- } catch (ProtocolException pe) {
- dth.reportDeliveryExtra(this, -1L);
- // Rcvd error instead of 100-continue
- loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
- }
- if (os != null) {
- long sofar = 0;
- try (InputStream is = new FileInputStream(datafile)) {
- byte[] buf = new byte[1024 * 1024];
- while (sofar < length) {
- int i = buf.length;
- if (sofar + i > length) {
- i = (int) (length - sofar);
- }
- i = is.read(buf, 0, i);
- if (i <= 0) {
- throw new IOException("Unexpected problem reading data file " + datafile);
- }
- sofar += i;
- os.write(buf, 0, i);
- }
- os.close();
- } catch (IOException ioe) {
- dth.reportDeliveryExtra(this, sofar);
- throw ioe;
+ if (destInfo.isDecompress()) {
+ if (isFiletypeGzip(datafile)) {
+ sendDecompressedFile(uc);
+ } else {
+ uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
+ sendFile(uc);
}
+ } else {
+ sendFile(uc);
}
}
int rc = uc.getResponseCode();
InputStream is;
if (rc >= 200 && rc <= 299) {
is = uc.getInputStream();
- xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");
+ xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
} else {
if (rc >= 300 && rc <= 399) {
rmsg = uc.getHeaderField("Location");
}
is.close();
}
- dth.reportStatus(this, rc, xpubid, rmsg);
+ deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
} catch (Exception e) {
- loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
- dth.reportException(this, e);
+ loggerDeliveryTask.error("Exception " + Arrays.toString(e.getStackTrace()), e);
+ deliveryTaskHelper.reportException(this, e);
+ }
+ }
+
+ /**
+ * To send decompressed gzip to the subscribers
+ *
+ * @param httpURLConnection connection used to make request
+ * @throws IOException
+ */
+ private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
+ byte[] buffer = new byte[8164];
+ httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
+ OutputStream outputStream = getOutputStream(httpURLConnection);
+ if (outputStream != null) {
+ int bytesRead = 0;
+ try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
+ int bufferLength = buffer.length;
+ while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
+ outputStream.write(buffer, 0, bytesRead);
+ }
+ outputStream.close();
+ } catch (IOException e) {
+ httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
+ loggerDeliveryTask.info("Could not decompress file");
+ sendFile(httpURLConnection);
+ }
+
+ }
+ }
+
+ /**
+ * To send any file to the subscriber.
+ *
+ * @param httpURLConnection connection used to make request
+ * @throws IOException
+ */
+ private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
+ OutputStream os = getOutputStream(httpURLConnection);
+ if (os != null) {
+ long sofar = 0;
+ try (InputStream is = new FileInputStream(datafile)) {
+ byte[] buf = new byte[1024 * 1024];
+ while (sofar < length) {
+ int i = buf.length;
+ if (sofar + i > length) {
+ i = (int) (length - sofar);
+ }
+ i = is.read(buf, 0, i);
+ if (i <= 0) {
+ throw new IOException("Unexpected problem reading data file " + datafile);
+ }
+ sofar += i;
+ os.write(buf, 0, i);
+ }
+ os.close();
+ } catch (IOException ioe) {
+ deliveryTaskHelper.reportDeliveryExtra(this, sofar);
+ throw ioe;
+ }
+ }
+ }
+
+ /**
+ * Get the outputstream that will be used to send data
+ *
+ * @param httpURLConnection connection used to make request
+ * @return AN Outpustream that can be used to send your data.
+ * @throws IOException
+ */
+ private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
+ OutputStream outputStream = null;
+
+ try {
+ outputStream = httpURLConnection.getOutputStream();
+ } catch (ProtocolException pe) {
+ deliveryTaskHelper.reportDeliveryExtra(this, -1L);
+ // Rcvd error instead of 100-continue
+ loggerDeliveryTask.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
}
+ return outputStream;
}
/**
* Remove meta and data files
*/
- public void clean() {
+ void clean() {
datafile.delete();
metafile.delete();
+ eelflogger.info(EelfMsgs.INVOKE, newInvocationId);
+ eelflogger.info(EelfMsgs.EXIT);
hdrs = null;
}
+ /**
+ * Set the resume time for a delivery task.
+ */
+ void setResumeTime(long resumeTime) {
+ this.resumeTime = resumeTime;
+ }
+
+ /**
+ * Get the resume time for a delivery task.
+ */
+ long getResumeTime() {
+ return resumeTime;
+ }
+
/**
* Has this delivery task been cleaned?
*/
- public boolean isCleaned() {
+ boolean isCleaned() {
return (hdrs == null);
}
/**
* Get creation date as encoded in the publish ID.
*/
- public long getDate() {
+ long getDate() {
return (date);
}
/**
* Get the content type
*/
- public String getCType() {
+ String getCType() {
return (ctype);
}
/**
* Get the method
*/
- public String getMethod() {
+ String getMethod() {
return (method);
}
/**
* Get the file ID
*/
- public String getFileId() {
+ String getFileId() {
return (fileid);
}
/**
* Get the number of delivery attempts
*/
- public int getAttempts() {
+ int getAttempts() {
return (attempts);
}
/**
* Get the (space delimited list of) subscription ID for this delivery task
*/
- public String getSubId() {
+ String getSubId() {
return (subid);
}
/**
* Get the feed ID for this delivery task
*/
- public String getFeedId() {
+ String getFeedId() {
return (feedid);
}
+
+ /**
+ * Get the followRedirects for this delivery task
+ */
+ public boolean getFollowRedirects() {
+ return(followRedirects);
+ }
}