X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fdatarouter.git;a=blobdiff_plain;f=datarouter-node%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fnode%2FDeliveryTask.java;h=2093d6d40d401cdb4cd549a70aaaec9ef5f1e167;hp=c07822d2766b1ce8ea39bfd6b8050643f2d9c428;hb=0a440fd3ae3b413cd7de57677aec690f14ec7d53;hpb=e4b20cc6f7c31f48ddd0de5bcd054b09a35cd510 diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java index c07822d2..2093d6d4 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java @@ -1,308 +1,455 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package org.onap.dmaap.datarouter.node; - -import java.io.*; -import java.net.*; -import java.util.*; -import org.apache.log4j.Logger; - -/** - * A file to be delivered to a destination. - *

- * A Delivery task represents a work item for the data router - a file that - * needs to be delivered and provides mechanisms to get information about - * the file and its delivery data as well as to attempt delivery. - */ -public class DeliveryTask implements Runnable, Comparable { - private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask"); - private DeliveryTaskHelper dth; - private String pubid; - private DestInfo di; - private String spool; - private File datafile; - private File metafile; - private long length; - private long date; - private String method; - private String fileid; - private String ctype; - private String url; - private String feedid; - private String subid; - private int attempts; - private String[][] hdrs; - /** - * Is the object a DeliveryTask with the same publication ID? - */ - public boolean equals(Object o) { - if (!(o instanceof DeliveryTask)) { - return(false); - } - return(pubid.equals(((DeliveryTask)o).pubid)); - } - /** - * Compare the publication IDs. - */ - public int compareTo(DeliveryTask o) { - return(pubid.compareTo(o.pubid)); - } - /** - * Get the hash code of the publication ID. - */ - public int hashCode() { - return(pubid.hashCode()); - } - /** - * Return the publication ID. - */ - public String toString() { - return(pubid); - } - /** - * Create a delivery task for a given delivery queue and pub ID - * @param dth The delivery task helper for the queue this task is in. - * @param pubid The publish ID for this file. This is used as - * the base for the file name in the spool directory and is of - * the form . - */ - public DeliveryTask(DeliveryTaskHelper dth, String pubid) { - this.dth = dth; - this.pubid = pubid; - di = dth.getDestInfo(); - subid = di.getSubId(); - feedid = di.getLogData(); - spool = di.getSpool(); - String dfn = spool + "/" + pubid; - String mfn = dfn + ".M"; - datafile = new File(spool + "/" + pubid); - metafile = new File(mfn); - boolean monly = di.isMetaDataOnly(); - date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); - Vector hdrv = new Vector(); - try { - BufferedReader br = new BufferedReader(new FileReader(metafile)); - String s = br.readLine(); - int i = s.indexOf('\t'); - method = s.substring(0, i); - if (!"DELETE".equals(method) && !monly) { - length = datafile.length(); - } - fileid = s.substring(i + 1); - while ((s = br.readLine()) != null) { - i = s.indexOf('\t'); - String h = s.substring(0, i); - String v = s.substring(i + 1); - if ("x-att-dr-routing".equalsIgnoreCase(h)) { - subid = v.replaceAll("[^ ]*/", ""); - feedid = dth.getFeedId(subid.replaceAll(" .*", "")); - } - if (length == 0 && h.toLowerCase().startsWith("content-")) { - continue; - } - if (h.equalsIgnoreCase("content-type")) { - ctype = v; - } - hdrv.add(new String[] {h, v}); - } - br.close(); - } catch (Exception e) { - } - hdrs = hdrv.toArray(new String[hdrv.size()][]); - url = dth.getDestURL(fileid); - } - /** - * Get the publish ID - */ - public String getPublishId() { - return(pubid); - } - /** - * Attempt delivery - */ - public void run() { - attempts++; - try { - di = dth.getDestInfo(); - boolean expect100 = di.isUsing100(); - boolean monly = di.isMetaDataOnly(); - length = 0; - if (!"DELETE".equals(method) && !monly) { - length = datafile.length(); - } - url = dth.getDestURL(fileid); - URL u = new URL(url); - HttpURLConnection uc = (HttpURLConnection)u.openConnection(); - uc.setConnectTimeout(60000); - uc.setReadTimeout(60000); - uc.setInstanceFollowRedirects(false); - uc.setRequestMethod(method); - uc.setRequestProperty("Content-Length", Long.toString(length)); - uc.setRequestProperty("Authorization", di.getAuth()); - uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid); - for (String[] nv: hdrs) { - uc.addRequestProperty(nv[0], nv[1]); - } - if (length > 0) { - if (expect100) { - uc.setRequestProperty("Expect", "100-continue"); - } - uc.setFixedLengthStreamingMode(length); - uc.setDoOutput(true); - OutputStream os = null; - try { - os = uc.getOutputStream(); - } catch (ProtocolException pe) { - dth.reportDeliveryExtra(this, -1L); - // Rcvd error instead of 100-continue - } - if (os != null) { - long sofar = 0; - try { - byte[] buf = new byte[1024 * 1024]; - InputStream is = new FileInputStream(datafile); - while (sofar < length) { - int i = buf.length; - if (sofar + i > length) { - i = (int)(length - sofar); - } - i = is.read(buf, 0, i); - if (i <= 0) { - throw new IOException("Unexpected problem reading data file " + datafile); - } - sofar += i; - os.write(buf, 0, i); - } - is.close(); - os.close(); - } catch (IOException ioe) { - dth.reportDeliveryExtra(this, sofar); - throw ioe; - } - } - } - int rc = uc.getResponseCode(); - String rmsg = uc.getResponseMessage(); - if (rmsg == null) { - String h0 = uc.getHeaderField(0); - if (h0 != null) { - int i = h0.indexOf(' '); - int j = h0.indexOf(' ', i + 1); - if (i != -1 && j != -1) { - rmsg = h0.substring(j + 1); - } - } - } - String xpubid = null; - InputStream is; - if (rc >= 200 && rc <= 299) { - is = uc.getInputStream(); - xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID"); - } else { - if (rc >= 300 && rc <= 399) { - rmsg = uc.getHeaderField("Location"); - } - is = uc.getErrorStream(); - } - byte[] buf = new byte[4096]; - if (is != null) { - while (is.read(buf) > 0) { - } - is.close(); - } - dth.reportStatus(this, rc, xpubid, rmsg); - } catch (Exception e) { - dth.reportException(this, e); - } - } - /** - * Remove meta and data files - */ - public void clean() { - datafile.delete(); - metafile.delete(); - hdrs = null; - } - /** - * Has this delivery task been cleaned? - */ - public boolean isCleaned() { - return(hdrs == null); - } - /** - * Get length of body - */ - public long getLength() { - return(length); - } - /** - * Get creation date as encoded in the publish ID. - */ - public long getDate() { - return(date); - } - /** - * Get the most recent delivery attempt URL - */ - public String getURL() { - return(url); - } - /** - * Get the content type - */ - public String getCType() { - return(ctype); - } - /** - * Get the method - */ - public String getMethod() { - return(method); - } - /** - * Get the file ID - */ - public String getFileId() { - return(fileid); - } - /** - * Get the number of delivery attempts - */ - public int getAttempts() { - return(attempts); - } - /** - * Get the (space delimited list of) subscription ID for this delivery task - */ - public String getSubId() { - return(subid); - } - /** - * Get the feed ID for this delivery task - */ - public String getFeedId() { - return(feedid); - } -} +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.node; + +import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID; +import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.ProtocolException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.UUID; +import java.util.zip.GZIPInputStream; +import org.jetbrains.annotations.Nullable; +import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; +import org.slf4j.MDC; + +/** + * A file to be delivered to a destination. + * + *

A Delivery task represents a work item for the data router - a file that needs to be delivered and provides + * mechanisms to get information about the file and its delivery data as well as to attempt delivery. + */ +public class DeliveryTask implements Runnable, Comparable { + + private static final String DECOMPRESSION_STATUS = "Decompression_Status"; + private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class); + private DeliveryTaskHelper deliveryTaskHelper; + private String pubid; + private DestInfo destInfo; + private String spool; + private File datafile; + private File metafile; + private long length; + private long date; + private String method; + private String fileid; + private String ctype; + private String url; + private String feedid; + private String subid; + private int attempts; + private boolean followRedirects; + private String[][] hdrs; + private String newInvocationId; + private long resumeTime; + + + /** + * Create a delivery task for a given delivery queue and pub ID + * + * @param deliveryTaskHelper The delivery task helper for the queue this task is in. + * @param pubid The publish ID for this file. This is used as the base for the file name in the spool directory and + * is of the form . + */ + DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { + this.deliveryTaskHelper = deliveryTaskHelper; + this.pubid = pubid; + destInfo = deliveryTaskHelper.getDestinationInfo(); + subid = destInfo.getSubId(); + this.followRedirects = destInfo.isFollowRedirects(); + feedid = destInfo.getLogData(); + spool = destInfo.getSpool(); + String dfn = spool + "/" + pubid; + String mfn = dfn + ".M"; + datafile = new File(spool + "/" + pubid); + metafile = new File(mfn); + boolean monly = destInfo.isMetaDataOnly(); + date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); + resumeTime = System.currentTimeMillis(); + ArrayList hdrv = new ArrayList<>(); + + try (BufferedReader br = new BufferedReader(new FileReader(metafile))) { + String line = br.readLine(); + int index = line.indexOf('\t'); + method = line.substring(0, index); + NodeUtils.setIpAndFqdnForEelf(method); + if (!"DELETE".equals(method) && !monly) { + length = datafile.length(); + } + fileid = line.substring(index + 1); + while ((line = br.readLine()) != null) { + index = line.indexOf('\t'); + String header = line.substring(0, index); + String headerValue = line.substring(index + 1); + if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) { + subid = headerValue.replaceAll("[^ ]*/", ""); + feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", "")); + } + if (length == 0 && header.toLowerCase().startsWith("content-")) { + continue; + } + if ("content-type".equalsIgnoreCase(header)) { + ctype = headerValue; + } + if ("x-onap-requestid".equalsIgnoreCase(header)) { + MDC.put(MDC_KEY_REQUEST_ID, headerValue); + } + if ("x-invocationid".equalsIgnoreCase(header)) { + MDC.put("InvocationId", headerValue); + headerValue = UUID.randomUUID().toString(); + newInvocationId = headerValue; + } + hdrv.add(new String[]{header, headerValue}); + } + } catch (Exception e) { + eelfLogger.error("Exception", e); + } + hdrs = hdrv.toArray(new String[hdrv.size()][]); + url = deliveryTaskHelper.getDestURL(fileid); + } + + /** + * Is the object a DeliveryTask with the same publication ID. + */ + public boolean equals(Object object) { + if (!(object instanceof DeliveryTask)) { + return (false); + } + return (pubid.equals(((DeliveryTask) object).pubid)); + } + + /** + * Compare the publication IDs. + */ + public int compareTo(DeliveryTask other) { + return (pubid.compareTo(other.pubid)); + } + + /** + * Get the hash code of the publication ID. + */ + public int hashCode() { + return (pubid.hashCode()); + } + + /** + * Return the publication ID. + */ + public String toString() { + return (pubid); + } + + /** + * Get the publish ID. + */ + String getPublishId() { + return (pubid); + } + + /** + * Attempt delivery. + */ + public void run() { + attempts++; + try { + destInfo = deliveryTaskHelper.getDestinationInfo(); + boolean monly = destInfo.isMetaDataOnly(); + length = 0; + if (!"DELETE".equals(method) && !monly) { + length = datafile.length(); + } + stripSuffixIfIsDecompress(); + url = deliveryTaskHelper.getDestURL(fileid); + URL urlObj = new URL(url); + HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection(); + urlConnection.setConnectTimeout(60000); + urlConnection.setReadTimeout(60000); + urlConnection.setInstanceFollowRedirects(false); + urlConnection.setRequestMethod(method); + urlConnection.setRequestProperty("Content-Length", Long.toString(length)); + urlConnection.setRequestProperty("Authorization", destInfo.getAuth()); + urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid); + boolean expect100 = destInfo.isUsing100(); + int rc = deliverFileToSubscriber(expect100, urlConnection); + String rmsg = urlConnection.getResponseMessage(); + rmsg = getResponseMessage(urlConnection, rmsg); + String xpubid = null; + InputStream is; + if (rc >= 200 && rc <= 299) { + is = urlConnection.getInputStream(); + xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID"); + } else { + if (rc >= 300 && rc <= 399) { + rmsg = urlConnection.getHeaderField("Location"); + } + is = urlConnection.getErrorStream(); + } + byte[] buf = new byte[4096]; + if (is != null) { + while (is.read(buf) > 0) { + } + is.close(); + } + deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg); + } catch (Exception e) { + eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e); + deliveryTaskHelper.reportException(this, e); + } + } + + /** + * To send decompressed gzip to the subscribers. + * + * @param httpURLConnection connection used to make request + */ + private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException { + byte[] buffer = new byte[8164]; + httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS"); + OutputStream outputStream = getOutputStream(httpURLConnection); + if (outputStream != null) { + int bytesRead = 0; + try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) { + int bufferLength = buffer.length; + while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) { + outputStream.write(buffer, 0, bytesRead); + } + outputStream.close(); + } catch (IOException e) { + httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE"); + eelfLogger.info("Could not decompress file", e); + sendFile(httpURLConnection); + } + + } + } + + /** + * To send any file to the subscriber. + * + * @param httpURLConnection connection used to make request + */ + private void sendFile(HttpURLConnection httpURLConnection) throws IOException { + OutputStream os = getOutputStream(httpURLConnection); + if (os == null) { + return; + } + long sofar = 0; + try (InputStream is = new FileInputStream(datafile)) { + byte[] buf = new byte[1024 * 1024]; + while (sofar < length) { + int len = buf.length; + if (sofar + len > length) { + len = (int) (length - sofar); + } + len = is.read(buf, 0, len); + if (len <= 0) { + throw new IOException("Unexpected problem reading data file " + datafile); + } + sofar += len; + os.write(buf, 0, len); + } + os.close(); + } catch (IOException ioe) { + deliveryTaskHelper.reportDeliveryExtra(this, sofar); + throw ioe; + } + } + + /** + * Get the outputstream that will be used to send data. + * + * @param httpURLConnection connection used to make request + * @return AN Outpustream that can be used to send your data. + */ + private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException { + OutputStream outputStream = null; + try { + outputStream = httpURLConnection.getOutputStream(); + } catch (ProtocolException pe) { + deliveryTaskHelper.reportDeliveryExtra(this, -1L); + // Rcvd error instead of 100-continue + eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe); + } + return outputStream; + } + + private void stripSuffixIfIsDecompress() { + if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) { + fileid = fileid.replace(".gz", ""); + } + } + + private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException { + for (String[] nv : hdrs) { + uc.addRequestProperty(nv[0], nv[1]); + } + if (length > 0) { + if (expect100) { + uc.setRequestProperty("Expect", "100-continue"); + } + uc.setDoOutput(true); + if (destInfo.isDecompress()) { + if (isFiletypeGzip(datafile)) { + sendDecompressedFile(uc); + } else { + uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT"); + sendFile(uc); + } + } else { + sendFile(uc); + } + } + return uc.getResponseCode(); + } + + @Nullable + private String getResponseMessage(HttpURLConnection uc, String rmsg) { + if (rmsg == null) { + String h0 = uc.getHeaderField(0); + if (h0 != null) { + int indexOfSpace1 = h0.indexOf(' '); + int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1); + if (indexOfSpace1 != -1 && indexOfSpace2 != -1) { + rmsg = h0.substring(indexOfSpace2 + 1); + } + } + } + return rmsg; + } + + /** + * Remove meta and data files. + */ + void clean() { + datafile.delete(); + metafile.delete(); + eelfLogger.info(EelfMsgs.INVOKE, newInvocationId); + eelfLogger.info(EelfMsgs.EXIT); + hdrs = null; + } + + /** + * Get the resume time for a delivery task. + */ + long getResumeTime() { + return resumeTime; + } + + /** + * Set the resume time for a delivery task. + */ + void setResumeTime(long resumeTime) { + this.resumeTime = resumeTime; + } + + /** + * Has this delivery task been cleaned. + */ + boolean isCleaned() { + return (hdrs == null); + } + + /** + * Get length of body. + */ + public long getLength() { + return (length); + } + + /** + * Get creation date as encoded in the publish ID. + */ + long getDate() { + return (date); + } + + /** + * Get the most recent delivery attempt URL. + */ + public String getURL() { + return (url); + } + + /** + * Get the content type. + */ + String getCType() { + return (ctype); + } + + /** + * Get the method. + */ + String getMethod() { + return (method); + } + + /** + * Get the file ID. + */ + String getFileId() { + return (fileid); + } + + /** + * Get the number of delivery attempts. + */ + int getAttempts() { + return (attempts); + } + + /** + * Get the (space delimited list of) subscription ID for this delivery task. + */ + String getSubId() { + return (subid); + } + + /** + * Get the feed ID for this delivery task. + */ + String getFeedId() { + return (feedid); + } + + /** + * Get the followRedirects for this delivery task. + */ + public boolean getFollowRedirects() { + return (followRedirects); + } +}