-/*******************************************************************************
- * ============LICENSE_START==================================================
- * * org.onap.dmaap
- * * ===========================================================================
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * * ===========================================================================
- * * Licensed under the Apache License, Version 2.0 (the "License");
- * * you may not use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- * * ============LICENSE_END====================================================
- * *
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * *
- ******************************************************************************/
-
-
-package org.onap.dmaap.datarouter.node;
-
-import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
-import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.ProtocolException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.UUID;
-import java.util.zip.GZIPInputStream;
-import org.jetbrains.annotations.Nullable;
-import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
-import org.slf4j.MDC;
-
-/**
- * A file to be delivered to a destination.
- *
- * <p>A Delivery task represents a work item for the data router - a file that needs to be delivered and provides
- * mechanisms to get information about the file and its delivery data as well as to attempt delivery.
- */
-public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
-
- private static final String DECOMPRESSION_STATUS = "Decompression_Status";
- private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class);
- private DeliveryTaskHelper deliveryTaskHelper;
- private String pubid;
- private DestInfo destInfo;
- private String spool;
- private File datafile;
- private File metafile;
- private long length;
- private long date;
- private String method;
- private String fileid;
- private String ctype;
- private String url;
- private String feedid;
- private String subid;
- private int attempts;
- private boolean followRedirects;
- private String[][] hdrs;
- private String newInvocationId;
- private long resumeTime;
-
-
- /**
- * Create a delivery task for a given delivery queue and pub ID.
- *
- * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
- * @param pubid The publish ID for this file. This is used as the base for the file name in the spool directory and
- * is of the form (milliseconds since 1970).(fqdn of initial data router node)
- */
- DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
- this.deliveryTaskHelper = deliveryTaskHelper;
- this.pubid = pubid;
- destInfo = deliveryTaskHelper.getDestinationInfo();
- subid = destInfo.getSubId();
- this.followRedirects = destInfo.isFollowRedirects();
- feedid = destInfo.getLogData();
- spool = destInfo.getSpool();
- String dfn = spool + File.separator + pubid;
- String mfn = dfn + ".M";
- datafile = new File(spool + File.separator + pubid);
- metafile = new File(mfn);
- boolean monly = destInfo.isMetaDataOnly();
- date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
- resumeTime = System.currentTimeMillis();
- ArrayList<String[]> hdrv = new ArrayList<>();
-
- try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
- String line = br.readLine();
- int index = line.indexOf('\t');
- method = line.substring(0, index);
- NodeUtils.setIpAndFqdnForEelf(method);
- if (!"DELETE".equals(method) && !monly) {
- length = datafile.length();
- }
- fileid = line.substring(index + 1);
- while ((line = br.readLine()) != null) {
- index = line.indexOf('\t');
- String header = line.substring(0, index);
- String headerValue = line.substring(index + 1);
- if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) {
- subid = headerValue.replaceAll("[^ ]*/+", "");
- feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
- }
- if (length == 0 && header.toLowerCase().startsWith("content-")) {
- continue;
- }
- if ("content-type".equalsIgnoreCase(header)) {
- ctype = headerValue;
- }
- if ("x-onap-requestid".equalsIgnoreCase(header)) {
- MDC.put(MDC_KEY_REQUEST_ID, headerValue);
- }
- if ("x-invocationid".equalsIgnoreCase(header)) {
- MDC.put("InvocationId", headerValue);
- headerValue = UUID.randomUUID().toString();
- newInvocationId = headerValue;
- }
- hdrv.add(new String[]{header, headerValue});
- }
- } catch (Exception e) {
- eelfLogger.error("Exception", e);
- }
- hdrs = hdrv.toArray(new String[hdrv.size()][]);
- url = deliveryTaskHelper.getDestURL(fileid);
- }
-
- /**
- * Is the object a DeliveryTask with the same publication ID.
- */
- public boolean equals(Object object) {
- if (!(object instanceof DeliveryTask)) {
- return (false);
- }
- return (pubid.equals(((DeliveryTask) object).pubid));
- }
-
- /**
- * Compare the publication IDs.
- */
- public int compareTo(DeliveryTask other) {
- return (pubid.compareTo(other.pubid));
- }
-
- /**
- * Get the hash code of the publication ID.
- */
- public int hashCode() {
- return (pubid.hashCode());
- }
-
- /**
- * Return the publication ID.
- */
- public String toString() {
- return (pubid);
- }
-
- /**
- * Get the publish ID.
- */
- String getPublishId() {
- return (pubid);
- }
-
- /**
- * Attempt delivery.
- */
- public void run() {
- attempts++;
- try {
- destInfo = deliveryTaskHelper.getDestinationInfo();
- boolean monly = destInfo.isMetaDataOnly();
- length = 0;
- if (!"DELETE".equals(method) && !monly) {
- length = datafile.length();
- }
- stripSuffixIfIsDecompress();
- url = deliveryTaskHelper.getDestURL(fileid);
- URL urlObj = new URL(url);
- HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection();
- urlConnection.setConnectTimeout(60000);
- urlConnection.setReadTimeout(60000);
- urlConnection.setInstanceFollowRedirects(false);
- urlConnection.setRequestMethod(method);
- urlConnection.setRequestProperty("Content-Length", Long.toString(length));
- urlConnection.setRequestProperty("Authorization", destInfo.getAuth());
- urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
- boolean expect100 = destInfo.isUsing100();
- int rc = deliverFileToSubscriber(expect100, urlConnection);
- String rmsg = urlConnection.getResponseMessage();
- rmsg = getResponseMessage(urlConnection, rmsg);
- String xpubid = null;
- InputStream is;
- if (rc >= 200 && rc <= 299) {
- is = urlConnection.getInputStream();
- xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
- } else {
- if (rc >= 300 && rc <= 399) {
- rmsg = urlConnection.getHeaderField("Location");
- }
- is = urlConnection.getErrorStream();
- }
- byte[] buf = new byte[4096];
- if (is != null) {
- while (is.read(buf) > 0) {
- //flush the buffer
- }
- is.close();
- }
- deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
- } catch (Exception e) {
- eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e);
- deliveryTaskHelper.reportException(this, e);
- }
- }
-
- /**
- * To send decompressed gzip to the subscribers.
- *
- * @param httpURLConnection connection used to make request
- */
- private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
- byte[] buffer = new byte[8164];
- httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS");
- OutputStream outputStream = getOutputStream(httpURLConnection);
- if (outputStream != null) {
- int bytesRead;
- try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
- int bufferLength = buffer.length;
- while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
- outputStream.write(buffer, 0, bytesRead);
- }
- outputStream.close();
- } catch (IOException e) {
- httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE");
- eelfLogger.info("Could not decompress file", e);
- sendFile(httpURLConnection);
- }
-
- }
- }
-
- /**
- * To send any file to the subscriber.
- *
- * @param httpURLConnection connection used to make request
- */
- private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
- OutputStream os = getOutputStream(httpURLConnection);
- if (os == null) {
- return;
- }
- long sofar = 0;
- try (InputStream is = new FileInputStream(datafile)) {
- byte[] buf = new byte[1024 * 1024];
- while (sofar < length) {
- int len = buf.length;
- if (sofar + len > length) {
- len = (int) (length - sofar);
- }
- len = is.read(buf, 0, len);
- if (len <= 0) {
- throw new IOException("Unexpected problem reading data file " + datafile);
- }
- sofar += len;
- os.write(buf, 0, len);
- }
- os.close();
- } catch (IOException ioe) {
- deliveryTaskHelper.reportDeliveryExtra(this, sofar);
- throw ioe;
- }
- }
-
- /**
- * Get the outputstream that will be used to send data.
- *
- * @param httpURLConnection connection used to make request
- * @return AN Outpustream that can be used to send your data.
- */
- OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
- OutputStream outputStream = null;
- try {
- outputStream = httpURLConnection.getOutputStream();
- } catch (ProtocolException pe) {
- deliveryTaskHelper.reportDeliveryExtra(this, -1L);
- // Rcvd error instead of 100-continue
- eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
- }
- return outputStream;
- }
-
- private void stripSuffixIfIsDecompress() {
- if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) {
- fileid = fileid.replace(".gz", "");
- }
- }
-
- private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException {
- for (String[] nv : hdrs) {
- uc.addRequestProperty(nv[0], nv[1]);
- }
- if (length > 0) {
- if (expect100) {
- uc.setRequestProperty("Expect", "100-continue");
- }
- uc.setDoOutput(true);
- if (destInfo.isDecompress()) {
- if (isFiletypeGzip(datafile)) {
- sendDecompressedFile(uc);
- } else {
- uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT");
- sendFile(uc);
- }
- } else {
- sendFile(uc);
- }
- }
- return uc.getResponseCode();
- }
-
- @Nullable
- private String getResponseMessage(HttpURLConnection uc, String rmsg) {
- if (rmsg == null) {
- String h0 = uc.getHeaderField(0);
- if (h0 != null) {
- int indexOfSpace1 = h0.indexOf(' ');
- int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1);
- if (indexOfSpace1 != -1 && indexOfSpace2 != -1) {
- rmsg = h0.substring(indexOfSpace2 + 1);
- }
- }
- }
- return rmsg;
- }
-
- /**
- * Remove meta and data files.
- */
- void clean() {
- deleteWithRetry(datafile);
- deleteWithRetry(metafile);
- eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
- eelfLogger.info(EelfMsgs.EXIT);
- hdrs = null;
- }
-
- private void deleteWithRetry(File file) {
- int maxTries = 3;
- int tryCount = 1;
- while (tryCount <= maxTries) {
- try {
- Files.deleteIfExists(file.toPath());
- break;
- } catch (IOException e) {
- eelfLogger.error("IOException : Failed to delete file :"
- + file.getName() + " on attempt " + tryCount, e);
- }
- tryCount++;
- }
- }
-
- /**
- * Get the resume time for a delivery task.
- */
- long getResumeTime() {
- return resumeTime;
- }
-
- /**
- * Set the resume time for a delivery task.
- */
- void setResumeTime(long resumeTime) {
- this.resumeTime = resumeTime;
- }
-
- /**
- * Has this delivery task been cleaned.
- */
- boolean isCleaned() {
- return (hdrs == null);
- }
-
- /**
- * Get length of body.
- */
- public long getLength() {
- return (length);
- }
-
- /**
- * Get creation date as encoded in the publish ID.
- */
- long getDate() {
- return (date);
- }
-
- /**
- * Get the most recent delivery attempt URL.
- */
- public String getURL() {
- return (url);
- }
-
- /**
- * Get the content type.
- */
- String getCType() {
- return (ctype);
- }
-
- /**
- * Get the method.
- */
- String getMethod() {
- return (method);
- }
-
- /**
- * Get the file ID.
- */
- String getFileId() {
- return (fileid);
- }
-
- /**
- * Get the number of delivery attempts.
- */
- int getAttempts() {
- return (attempts);
- }
-
- /**
- * Get the (space delimited list of) subscription ID for this delivery task.
- */
- String getSubId() {
- return (subid);
- }
-
- /**
- * Get the feed ID for this delivery task.
- */
- String getFeedId() {
- return (feedid);
- }
-
- /**
- * Get the followRedirects for this delivery task.
- */
- boolean getFollowRedirects() {
- return (followRedirects);
- }
-}