/******************************************************************************* * ============LICENSE_START================================================== * * org.onap.dmaap * * =========================================================================== * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * * =========================================================================== * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at * * * * http://www.apache.org/licenses/LICENSE-2.0 * * * * Unless required by applicable law or agreed to in writing, software * * distributed under the License is distributed on an "AS IS" BASIS, * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * * See the License for the specific language governing permissions and * * limitations under the License. * * ============LICENSE_END==================================================== * * * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * * ******************************************************************************/ package org.onap.dmaap.datarouter.node; import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID; import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.ProtocolException; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.UUID; import java.util.zip.GZIPInputStream; import org.jetbrains.annotations.Nullable; import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; import org.slf4j.MDC; /** * A file to be delivered to a destination. * *

A Delivery task represents a work item for the data router - a file that needs to be delivered and provides * mechanisms to get information about the file and its delivery data as well as to attempt delivery. */ public class DeliveryTask implements Runnable, Comparable { private static final String DECOMPRESSION_STATUS = "Decompression_Status"; private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class); private DeliveryTaskHelper deliveryTaskHelper; private String pubid; private DestInfo destInfo; private String spool; private File datafile; private File metafile; private long length; private long date; private String method; private String fileid; private String ctype; private String url; private String feedid; private String subid; private int attempts; private boolean followRedirects; private String[][] hdrs; private String newInvocationId; private long resumeTime; /** * Create a delivery task for a given delivery queue and pub ID. * * @param deliveryTaskHelper The delivery task helper for the queue this task is in. * @param pubid The publish ID for this file. This is used as the base for the file name in the spool directory and * is of the form (milliseconds since 1970).(fqdn of initial data router node) */ DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { this.deliveryTaskHelper = deliveryTaskHelper; this.pubid = pubid; destInfo = deliveryTaskHelper.getDestinationInfo(); subid = destInfo.getSubId(); this.followRedirects = destInfo.isFollowRedirects(); feedid = destInfo.getLogData(); spool = destInfo.getSpool(); String dfn = spool + "/" + pubid; String mfn = dfn + ".M"; datafile = new File(spool + "/" + pubid); metafile = new File(mfn); boolean monly = destInfo.isMetaDataOnly(); date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); resumeTime = System.currentTimeMillis(); ArrayList hdrv = new ArrayList<>(); try (BufferedReader br = new BufferedReader(new FileReader(metafile))) { String line = br.readLine(); int index = line.indexOf('\t'); method = line.substring(0, index); NodeUtils.setIpAndFqdnForEelf(method); if (!"DELETE".equals(method) && !monly) { length = datafile.length(); } fileid = line.substring(index + 1); while ((line = br.readLine()) != null) { index = line.indexOf('\t'); String header = line.substring(0, index); String headerValue = line.substring(index + 1); if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) { subid = headerValue.replaceAll("[^ ]*/", ""); feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", "")); } if (length == 0 && header.toLowerCase().startsWith("content-")) { continue; } if ("content-type".equalsIgnoreCase(header)) { ctype = headerValue; } if ("x-onap-requestid".equalsIgnoreCase(header)) { MDC.put(MDC_KEY_REQUEST_ID, headerValue); } if ("x-invocationid".equalsIgnoreCase(header)) { MDC.put("InvocationId", headerValue); headerValue = UUID.randomUUID().toString(); newInvocationId = headerValue; } hdrv.add(new String[]{header, headerValue}); } } catch (Exception e) { eelfLogger.error("Exception", e); } hdrs = hdrv.toArray(new String[hdrv.size()][]); url = deliveryTaskHelper.getDestURL(fileid); } /** * Is the object a DeliveryTask with the same publication ID. */ public boolean equals(Object object) { if (!(object instanceof DeliveryTask)) { return (false); } return (pubid.equals(((DeliveryTask) object).pubid)); } /** * Compare the publication IDs. */ public int compareTo(DeliveryTask other) { return (pubid.compareTo(other.pubid)); } /** * Get the hash code of the publication ID. */ public int hashCode() { return (pubid.hashCode()); } /** * Return the publication ID. */ public String toString() { return (pubid); } /** * Get the publish ID. */ String getPublishId() { return (pubid); } /** * Attempt delivery. */ public void run() { attempts++; try { destInfo = deliveryTaskHelper.getDestinationInfo(); boolean monly = destInfo.isMetaDataOnly(); length = 0; if (!"DELETE".equals(method) && !monly) { length = datafile.length(); } stripSuffixIfIsDecompress(); url = deliveryTaskHelper.getDestURL(fileid); URL urlObj = new URL(url); HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection(); urlConnection.setConnectTimeout(60000); urlConnection.setReadTimeout(60000); urlConnection.setInstanceFollowRedirects(false); urlConnection.setRequestMethod(method); urlConnection.setRequestProperty("Content-Length", Long.toString(length)); urlConnection.setRequestProperty("Authorization", destInfo.getAuth()); urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid); boolean expect100 = destInfo.isUsing100(); int rc = deliverFileToSubscriber(expect100, urlConnection); String rmsg = urlConnection.getResponseMessage(); rmsg = getResponseMessage(urlConnection, rmsg); String xpubid = null; InputStream is; if (rc >= 200 && rc <= 299) { is = urlConnection.getInputStream(); xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID"); } else { if (rc >= 300 && rc <= 399) { rmsg = urlConnection.getHeaderField("Location"); } is = urlConnection.getErrorStream(); } byte[] buf = new byte[4096]; if (is != null) { while (is.read(buf) > 0) { } is.close(); } deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg); } catch (Exception e) { eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e); deliveryTaskHelper.reportException(this, e); } } /** * To send decompressed gzip to the subscribers. * * @param httpURLConnection connection used to make request */ private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException { byte[] buffer = new byte[8164]; httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS"); OutputStream outputStream = getOutputStream(httpURLConnection); if (outputStream != null) { int bytesRead = 0; try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) { int bufferLength = buffer.length; while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) { outputStream.write(buffer, 0, bytesRead); } outputStream.close(); } catch (IOException e) { httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE"); eelfLogger.info("Could not decompress file", e); sendFile(httpURLConnection); } } } /** * To send any file to the subscriber. * * @param httpURLConnection connection used to make request */ private void sendFile(HttpURLConnection httpURLConnection) throws IOException { OutputStream os = getOutputStream(httpURLConnection); if (os == null) { return; } long sofar = 0; try (InputStream is = new FileInputStream(datafile)) { byte[] buf = new byte[1024 * 1024]; while (sofar < length) { int len = buf.length; if (sofar + len > length) { len = (int) (length - sofar); } len = is.read(buf, 0, len); if (len <= 0) { throw new IOException("Unexpected problem reading data file " + datafile); } sofar += len; os.write(buf, 0, len); } os.close(); } catch (IOException ioe) { deliveryTaskHelper.reportDeliveryExtra(this, sofar); throw ioe; } } /** * Get the outputstream that will be used to send data. * * @param httpURLConnection connection used to make request * @return AN Outpustream that can be used to send your data. */ private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException { OutputStream outputStream = null; try { outputStream = httpURLConnection.getOutputStream(); } catch (ProtocolException pe) { deliveryTaskHelper.reportDeliveryExtra(this, -1L); // Rcvd error instead of 100-continue eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe); } return outputStream; } private void stripSuffixIfIsDecompress() { if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) { fileid = fileid.replace(".gz", ""); } } private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException { for (String[] nv : hdrs) { uc.addRequestProperty(nv[0], nv[1]); } if (length > 0) { if (expect100) { uc.setRequestProperty("Expect", "100-continue"); } uc.setDoOutput(true); if (destInfo.isDecompress()) { if (isFiletypeGzip(datafile)) { sendDecompressedFile(uc); } else { uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT"); sendFile(uc); } } else { sendFile(uc); } } return uc.getResponseCode(); } @Nullable private String getResponseMessage(HttpURLConnection uc, String rmsg) { if (rmsg == null) { String h0 = uc.getHeaderField(0); if (h0 != null) { int indexOfSpace1 = h0.indexOf(' '); int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1); if (indexOfSpace1 != -1 && indexOfSpace2 != -1) { rmsg = h0.substring(indexOfSpace2 + 1); } } } return rmsg; } /** * Remove meta and data files. */ void clean() { datafile.delete(); metafile.delete(); eelfLogger.info(EelfMsgs.INVOKE, newInvocationId); eelfLogger.info(EelfMsgs.EXIT); hdrs = null; } /** * Get the resume time for a delivery task. */ long getResumeTime() { return resumeTime; } /** * Set the resume time for a delivery task. */ void setResumeTime(long resumeTime) { this.resumeTime = resumeTime; } /** * Has this delivery task been cleaned. */ boolean isCleaned() { return (hdrs == null); } /** * Get length of body. */ public long getLength() { return (length); } /** * Get creation date as encoded in the publish ID. */ long getDate() { return (date); } /** * Get the most recent delivery attempt URL. */ public String getURL() { return (url); } /** * Get the content type. */ String getCType() { return (ctype); } /** * Get the method. */ String getMethod() { return (method); } /** * Get the file ID. */ String getFileId() { return (fileid); } /** * Get the number of delivery attempts. */ int getAttempts() { return (attempts); } /** * Get the (space delimited list of) subscription ID for this delivery task. */ String getSubId() { return (subid); } /** * Get the feed ID for this delivery task. */ String getFeedId() { return (feedid); } /** * Get the followRedirects for this delivery task. */ public boolean getFollowRedirects() { return (followRedirects); } }