X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=datarouter-node%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fnode%2FDeliveryTask.java;h=bbd1391273ce920565d5f9e81d1089ae14d369d2;hb=cf55456bbe6ce4da723f9dfa64b573f02908ab24;hp=c07822d2766b1ce8ea39bfd6b8050643f2d9c428;hpb=e4b20cc6f7c31f48ddd0de5bcd054b09a35cd510;p=dmaap%2Fdatarouter.git
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
index c07822d2..bbd13912 100644
--- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
+++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
@@ -1,308 +1,455 @@
-/*******************************************************************************
- * ============LICENSE_START==================================================
- * * org.onap.dmaap
- * * ===========================================================================
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * * ===========================================================================
- * * Licensed under the Apache License, Version 2.0 (the "License");
- * * you may not use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- * * ============LICENSE_END====================================================
- * *
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * *
- ******************************************************************************/
-
-
-package org.onap.dmaap.datarouter.node;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import org.apache.log4j.Logger;
-
-/**
- * A file to be delivered to a destination.
- *
- * A Delivery task represents a work item for the data router - a file that
- * needs to be delivered and provides mechanisms to get information about
- * the file and its delivery data as well as to attempt delivery.
- */
-public class DeliveryTask implements Runnable, Comparable {
- private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
- private DeliveryTaskHelper dth;
- private String pubid;
- private DestInfo di;
- private String spool;
- private File datafile;
- private File metafile;
- private long length;
- private long date;
- private String method;
- private String fileid;
- private String ctype;
- private String url;
- private String feedid;
- private String subid;
- private int attempts;
- private String[][] hdrs;
- /**
- * Is the object a DeliveryTask with the same publication ID?
- */
- public boolean equals(Object o) {
- if (!(o instanceof DeliveryTask)) {
- return(false);
- }
- return(pubid.equals(((DeliveryTask)o).pubid));
- }
- /**
- * Compare the publication IDs.
- */
- public int compareTo(DeliveryTask o) {
- return(pubid.compareTo(o.pubid));
- }
- /**
- * Get the hash code of the publication ID.
- */
- public int hashCode() {
- return(pubid.hashCode());
- }
- /**
- * Return the publication ID.
- */
- public String toString() {
- return(pubid);
- }
- /**
- * Create a delivery task for a given delivery queue and pub ID
- * @param dth The delivery task helper for the queue this task is in.
- * @param pubid The publish ID for this file. This is used as
- * the base for the file name in the spool directory and is of
- * the form .
- */
- public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
- this.dth = dth;
- this.pubid = pubid;
- di = dth.getDestInfo();
- subid = di.getSubId();
- feedid = di.getLogData();
- spool = di.getSpool();
- String dfn = spool + "/" + pubid;
- String mfn = dfn + ".M";
- datafile = new File(spool + "/" + pubid);
- metafile = new File(mfn);
- boolean monly = di.isMetaDataOnly();
- date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
- Vector hdrv = new Vector();
- try {
- BufferedReader br = new BufferedReader(new FileReader(metafile));
- String s = br.readLine();
- int i = s.indexOf('\t');
- method = s.substring(0, i);
- if (!"DELETE".equals(method) && !monly) {
- length = datafile.length();
- }
- fileid = s.substring(i + 1);
- while ((s = br.readLine()) != null) {
- i = s.indexOf('\t');
- String h = s.substring(0, i);
- String v = s.substring(i + 1);
- if ("x-att-dr-routing".equalsIgnoreCase(h)) {
- subid = v.replaceAll("[^ ]*/", "");
- feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
- }
- if (length == 0 && h.toLowerCase().startsWith("content-")) {
- continue;
- }
- if (h.equalsIgnoreCase("content-type")) {
- ctype = v;
- }
- hdrv.add(new String[] {h, v});
- }
- br.close();
- } catch (Exception e) {
- }
- hdrs = hdrv.toArray(new String[hdrv.size()][]);
- url = dth.getDestURL(fileid);
- }
- /**
- * Get the publish ID
- */
- public String getPublishId() {
- return(pubid);
- }
- /**
- * Attempt delivery
- */
- public void run() {
- attempts++;
- try {
- di = dth.getDestInfo();
- boolean expect100 = di.isUsing100();
- boolean monly = di.isMetaDataOnly();
- length = 0;
- if (!"DELETE".equals(method) && !monly) {
- length = datafile.length();
- }
- url = dth.getDestURL(fileid);
- URL u = new URL(url);
- HttpURLConnection uc = (HttpURLConnection)u.openConnection();
- uc.setConnectTimeout(60000);
- uc.setReadTimeout(60000);
- uc.setInstanceFollowRedirects(false);
- uc.setRequestMethod(method);
- uc.setRequestProperty("Content-Length", Long.toString(length));
- uc.setRequestProperty("Authorization", di.getAuth());
- uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);
- for (String[] nv: hdrs) {
- uc.addRequestProperty(nv[0], nv[1]);
- }
- if (length > 0) {
- if (expect100) {
- uc.setRequestProperty("Expect", "100-continue");
- }
- uc.setFixedLengthStreamingMode(length);
- uc.setDoOutput(true);
- OutputStream os = null;
- try {
- os = uc.getOutputStream();
- } catch (ProtocolException pe) {
- dth.reportDeliveryExtra(this, -1L);
- // Rcvd error instead of 100-continue
- }
- if (os != null) {
- long sofar = 0;
- try {
- byte[] buf = new byte[1024 * 1024];
- InputStream is = new FileInputStream(datafile);
- while (sofar < length) {
- int i = buf.length;
- if (sofar + i > length) {
- i = (int)(length - sofar);
- }
- i = is.read(buf, 0, i);
- if (i <= 0) {
- throw new IOException("Unexpected problem reading data file " + datafile);
- }
- sofar += i;
- os.write(buf, 0, i);
- }
- is.close();
- os.close();
- } catch (IOException ioe) {
- dth.reportDeliveryExtra(this, sofar);
- throw ioe;
- }
- }
- }
- int rc = uc.getResponseCode();
- String rmsg = uc.getResponseMessage();
- if (rmsg == null) {
- String h0 = uc.getHeaderField(0);
- if (h0 != null) {
- int i = h0.indexOf(' ');
- int j = h0.indexOf(' ', i + 1);
- if (i != -1 && j != -1) {
- rmsg = h0.substring(j + 1);
- }
- }
- }
- String xpubid = null;
- InputStream is;
- if (rc >= 200 && rc <= 299) {
- is = uc.getInputStream();
- xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");
- } else {
- if (rc >= 300 && rc <= 399) {
- rmsg = uc.getHeaderField("Location");
- }
- is = uc.getErrorStream();
- }
- byte[] buf = new byte[4096];
- if (is != null) {
- while (is.read(buf) > 0) {
- }
- is.close();
- }
- dth.reportStatus(this, rc, xpubid, rmsg);
- } catch (Exception e) {
- dth.reportException(this, e);
- }
- }
- /**
- * Remove meta and data files
- */
- public void clean() {
- datafile.delete();
- metafile.delete();
- hdrs = null;
- }
- /**
- * Has this delivery task been cleaned?
- */
- public boolean isCleaned() {
- return(hdrs == null);
- }
- /**
- * Get length of body
- */
- public long getLength() {
- return(length);
- }
- /**
- * Get creation date as encoded in the publish ID.
- */
- public long getDate() {
- return(date);
- }
- /**
- * Get the most recent delivery attempt URL
- */
- public String getURL() {
- return(url);
- }
- /**
- * Get the content type
- */
- public String getCType() {
- return(ctype);
- }
- /**
- * Get the method
- */
- public String getMethod() {
- return(method);
- }
- /**
- * Get the file ID
- */
- public String getFileId() {
- return(fileid);
- }
- /**
- * Get the number of delivery attempts
- */
- public int getAttempts() {
- return(attempts);
- }
- /**
- * Get the (space delimited list of) subscription ID for this delivery task
- */
- public String getSubId() {
- return(subid);
- }
- /**
- * Get the feed ID for this delivery task
- */
- public String getFeedId() {
- return(feedid);
- }
-}
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+
+package org.onap.dmaap.datarouter.node;
+
+import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
+import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.zip.GZIPInputStream;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
+import org.slf4j.MDC;
+
+/**
+ * A file to be delivered to a destination.
+ *
+ * A Delivery task represents a work item for the data router - a file that needs to be delivered and provides
+ * mechanisms to get information about the file and its delivery data as well as to attempt delivery.
+ */
+public class DeliveryTask implements Runnable, Comparable {
+
+ private static final String DECOMPRESSION_STATUS = "Decompression_Status";
+ private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class);
+ private DeliveryTaskHelper deliveryTaskHelper;
+ private String pubid;
+ private DestInfo destInfo;
+ private String spool;
+ private File datafile;
+ private File metafile;
+ private long length;
+ private long date;
+ private String method;
+ private String fileid;
+ private String ctype;
+ private String url;
+ private String feedid;
+ private String subid;
+ private int attempts;
+ private boolean followRedirects;
+ private String[][] hdrs;
+ private String newInvocationId;
+ private long resumeTime;
+
+
+ /**
+ * Create a delivery task for a given delivery queue and pub ID.
+ *
+ * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
+ * @param pubid The publish ID for this file. This is used as the base for the file name in the spool directory and
+ * is of the form (milliseconds since 1970).(fqdn of initial data router node)
+ */
+ DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
+ this.deliveryTaskHelper = deliveryTaskHelper;
+ this.pubid = pubid;
+ destInfo = deliveryTaskHelper.getDestinationInfo();
+ subid = destInfo.getSubId();
+ this.followRedirects = destInfo.isFollowRedirects();
+ feedid = destInfo.getLogData();
+ spool = destInfo.getSpool();
+ String dfn = spool + "/" + pubid;
+ String mfn = dfn + ".M";
+ datafile = new File(spool + "/" + pubid);
+ metafile = new File(mfn);
+ boolean monly = destInfo.isMetaDataOnly();
+ date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
+ resumeTime = System.currentTimeMillis();
+ ArrayList hdrv = new ArrayList<>();
+
+ try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
+ String line = br.readLine();
+ int index = line.indexOf('\t');
+ method = line.substring(0, index);
+ NodeUtils.setIpAndFqdnForEelf(method);
+ if (!"DELETE".equals(method) && !monly) {
+ length = datafile.length();
+ }
+ fileid = line.substring(index + 1);
+ while ((line = br.readLine()) != null) {
+ index = line.indexOf('\t');
+ String header = line.substring(0, index);
+ String headerValue = line.substring(index + 1);
+ if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) {
+ subid = headerValue.replaceAll("[^ ]*/", "");
+ feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
+ }
+ if (length == 0 && header.toLowerCase().startsWith("content-")) {
+ continue;
+ }
+ if ("content-type".equalsIgnoreCase(header)) {
+ ctype = headerValue;
+ }
+ if ("x-onap-requestid".equalsIgnoreCase(header)) {
+ MDC.put(MDC_KEY_REQUEST_ID, headerValue);
+ }
+ if ("x-invocationid".equalsIgnoreCase(header)) {
+ MDC.put("InvocationId", headerValue);
+ headerValue = UUID.randomUUID().toString();
+ newInvocationId = headerValue;
+ }
+ hdrv.add(new String[]{header, headerValue});
+ }
+ } catch (Exception e) {
+ eelfLogger.error("Exception", e);
+ }
+ hdrs = hdrv.toArray(new String[hdrv.size()][]);
+ url = deliveryTaskHelper.getDestURL(fileid);
+ }
+
+ /**
+ * Is the object a DeliveryTask with the same publication ID.
+ */
+ public boolean equals(Object object) {
+ if (!(object instanceof DeliveryTask)) {
+ return (false);
+ }
+ return (pubid.equals(((DeliveryTask) object).pubid));
+ }
+
+ /**
+ * Compare the publication IDs.
+ */
+ public int compareTo(DeliveryTask other) {
+ return (pubid.compareTo(other.pubid));
+ }
+
+ /**
+ * Get the hash code of the publication ID.
+ */
+ public int hashCode() {
+ return (pubid.hashCode());
+ }
+
+ /**
+ * Return the publication ID.
+ */
+ public String toString() {
+ return (pubid);
+ }
+
+ /**
+ * Get the publish ID.
+ */
+ String getPublishId() {
+ return (pubid);
+ }
+
+ /**
+ * Attempt delivery.
+ */
+ public void run() {
+ attempts++;
+ try {
+ destInfo = deliveryTaskHelper.getDestinationInfo();
+ boolean monly = destInfo.isMetaDataOnly();
+ length = 0;
+ if (!"DELETE".equals(method) && !monly) {
+ length = datafile.length();
+ }
+ stripSuffixIfIsDecompress();
+ url = deliveryTaskHelper.getDestURL(fileid);
+ URL urlObj = new URL(url);
+ HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection();
+ urlConnection.setConnectTimeout(60000);
+ urlConnection.setReadTimeout(60000);
+ urlConnection.setInstanceFollowRedirects(false);
+ urlConnection.setRequestMethod(method);
+ urlConnection.setRequestProperty("Content-Length", Long.toString(length));
+ urlConnection.setRequestProperty("Authorization", destInfo.getAuth());
+ urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
+ boolean expect100 = destInfo.isUsing100();
+ int rc = deliverFileToSubscriber(expect100, urlConnection);
+ String rmsg = urlConnection.getResponseMessage();
+ rmsg = getResponseMessage(urlConnection, rmsg);
+ String xpubid = null;
+ InputStream is;
+ if (rc >= 200 && rc <= 299) {
+ is = urlConnection.getInputStream();
+ xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
+ } else {
+ if (rc >= 300 && rc <= 399) {
+ rmsg = urlConnection.getHeaderField("Location");
+ }
+ is = urlConnection.getErrorStream();
+ }
+ byte[] buf = new byte[4096];
+ if (is != null) {
+ while (is.read(buf) > 0) {
+ }
+ is.close();
+ }
+ deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
+ } catch (Exception e) {
+ eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e);
+ deliveryTaskHelper.reportException(this, e);
+ }
+ }
+
+ /**
+ * To send decompressed gzip to the subscribers.
+ *
+ * @param httpURLConnection connection used to make request
+ */
+ private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
+ byte[] buffer = new byte[8164];
+ httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS");
+ OutputStream outputStream = getOutputStream(httpURLConnection);
+ if (outputStream != null) {
+ int bytesRead = 0;
+ try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
+ int bufferLength = buffer.length;
+ while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
+ outputStream.write(buffer, 0, bytesRead);
+ }
+ outputStream.close();
+ } catch (IOException e) {
+ httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE");
+ eelfLogger.info("Could not decompress file", e);
+ sendFile(httpURLConnection);
+ }
+
+ }
+ }
+
+ /**
+ * To send any file to the subscriber.
+ *
+ * @param httpURLConnection connection used to make request
+ */
+ private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
+ OutputStream os = getOutputStream(httpURLConnection);
+ if (os == null) {
+ return;
+ }
+ long sofar = 0;
+ try (InputStream is = new FileInputStream(datafile)) {
+ byte[] buf = new byte[1024 * 1024];
+ while (sofar < length) {
+ int len = buf.length;
+ if (sofar + len > length) {
+ len = (int) (length - sofar);
+ }
+ len = is.read(buf, 0, len);
+ if (len <= 0) {
+ throw new IOException("Unexpected problem reading data file " + datafile);
+ }
+ sofar += len;
+ os.write(buf, 0, len);
+ }
+ os.close();
+ } catch (IOException ioe) {
+ deliveryTaskHelper.reportDeliveryExtra(this, sofar);
+ throw ioe;
+ }
+ }
+
+ /**
+ * Get the outputstream that will be used to send data.
+ *
+ * @param httpURLConnection connection used to make request
+ * @return AN Outpustream that can be used to send your data.
+ */
+ private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
+ OutputStream outputStream = null;
+ try {
+ outputStream = httpURLConnection.getOutputStream();
+ } catch (ProtocolException pe) {
+ deliveryTaskHelper.reportDeliveryExtra(this, -1L);
+ // Rcvd error instead of 100-continue
+ eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
+ }
+ return outputStream;
+ }
+
+ private void stripSuffixIfIsDecompress() {
+ if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) {
+ fileid = fileid.replace(".gz", "");
+ }
+ }
+
+ private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException {
+ for (String[] nv : hdrs) {
+ uc.addRequestProperty(nv[0], nv[1]);
+ }
+ if (length > 0) {
+ if (expect100) {
+ uc.setRequestProperty("Expect", "100-continue");
+ }
+ uc.setDoOutput(true);
+ if (destInfo.isDecompress()) {
+ if (isFiletypeGzip(datafile)) {
+ sendDecompressedFile(uc);
+ } else {
+ uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT");
+ sendFile(uc);
+ }
+ } else {
+ sendFile(uc);
+ }
+ }
+ return uc.getResponseCode();
+ }
+
+ @Nullable
+ private String getResponseMessage(HttpURLConnection uc, String rmsg) {
+ if (rmsg == null) {
+ String h0 = uc.getHeaderField(0);
+ if (h0 != null) {
+ int indexOfSpace1 = h0.indexOf(' ');
+ int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1);
+ if (indexOfSpace1 != -1 && indexOfSpace2 != -1) {
+ rmsg = h0.substring(indexOfSpace2 + 1);
+ }
+ }
+ }
+ return rmsg;
+ }
+
+ /**
+ * Remove meta and data files.
+ */
+ void clean() {
+ datafile.delete();
+ metafile.delete();
+ eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
+ eelfLogger.info(EelfMsgs.EXIT);
+ hdrs = null;
+ }
+
+ /**
+ * Get the resume time for a delivery task.
+ */
+ long getResumeTime() {
+ return resumeTime;
+ }
+
+ /**
+ * Set the resume time for a delivery task.
+ */
+ void setResumeTime(long resumeTime) {
+ this.resumeTime = resumeTime;
+ }
+
+ /**
+ * Has this delivery task been cleaned.
+ */
+ boolean isCleaned() {
+ return (hdrs == null);
+ }
+
+ /**
+ * Get length of body.
+ */
+ public long getLength() {
+ return (length);
+ }
+
+ /**
+ * Get creation date as encoded in the publish ID.
+ */
+ long getDate() {
+ return (date);
+ }
+
+ /**
+ * Get the most recent delivery attempt URL.
+ */
+ public String getURL() {
+ return (url);
+ }
+
+ /**
+ * Get the content type.
+ */
+ String getCType() {
+ return (ctype);
+ }
+
+ /**
+ * Get the method.
+ */
+ String getMethod() {
+ return (method);
+ }
+
+ /**
+ * Get the file ID.
+ */
+ String getFileId() {
+ return (fileid);
+ }
+
+ /**
+ * Get the number of delivery attempts.
+ */
+ int getAttempts() {
+ return (attempts);
+ }
+
+ /**
+ * Get the (space delimited list of) subscription ID for this delivery task.
+ */
+ String getSubId() {
+ return (subid);
+ }
+
+ /**
+ * Get the feed ID for this delivery task.
+ */
+ String getFeedId() {
+ return (feedid);
+ }
+
+ /**
+ * Get the followRedirects for this delivery task.
+ */
+ public boolean getFollowRedirects() {
+ return (followRedirects);
+ }
+}