[DMAAP-DR] Remove cadi/aaf from dr-node
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryTask.java
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
deleted file mode 100644 (file)
index 55ad6aa..0000000
+++ /dev/null
@@ -1,472 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START==================================================
- * * org.onap.dmaap
- * * ===========================================================================
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * * ===========================================================================
- * * Licensed under the Apache License, Version 2.0 (the "License");
- * * you may not use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- *  *      http://www.apache.org/licenses/LICENSE-2.0
- * *
- *  * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- * * ============LICENSE_END====================================================
- * *
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * *
- ******************************************************************************/
-
-
-package org.onap.dmaap.datarouter.node;
-
-import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
-import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.ProtocolException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.UUID;
-import java.util.zip.GZIPInputStream;
-import org.jetbrains.annotations.Nullable;
-import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
-import org.slf4j.MDC;
-
-/**
- * A file to be delivered to a destination.
- *
- * <p>A Delivery task represents a work item for the data router - a file that needs to be delivered and provides
- * mechanisms to get information about the file and its delivery data as well as to attempt delivery.
- */
-public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
-
-    private static final String DECOMPRESSION_STATUS = "Decompression_Status";
-    private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class);
-    private DeliveryTaskHelper deliveryTaskHelper;
-    private String pubid;
-    private DestInfo destInfo;
-    private String spool;
-    private File datafile;
-    private File metafile;
-    private long length;
-    private long date;
-    private String method;
-    private String fileid;
-    private String ctype;
-    private String url;
-    private String feedid;
-    private String subid;
-    private int attempts;
-    private boolean followRedirects;
-    private String[][] hdrs;
-    private String newInvocationId;
-    private long resumeTime;
-
-
-    /**
-     * Create a delivery task for a given delivery queue and pub ID.
-     *
-     * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
-     * @param pubid The publish ID for this file.  This is used as the base for the file name in the spool directory and
-     *      is of the form (milliseconds since 1970).(fqdn of initial data router node)
-     */
-    DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
-        this.deliveryTaskHelper = deliveryTaskHelper;
-        this.pubid = pubid;
-        destInfo = deliveryTaskHelper.getDestinationInfo();
-        subid = destInfo.getSubId();
-        this.followRedirects = destInfo.isFollowRedirects();
-        feedid = destInfo.getLogData();
-        spool = destInfo.getSpool();
-        String dfn = spool + File.separator + pubid;
-        String mfn = dfn + ".M";
-        datafile = new File(spool + File.separator + pubid);
-        metafile = new File(mfn);
-        boolean monly = destInfo.isMetaDataOnly();
-        date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
-        resumeTime = System.currentTimeMillis();
-        ArrayList<String[]> hdrv = new ArrayList<>();
-
-        try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
-            String line = br.readLine();
-            int index = line.indexOf('\t');
-            method = line.substring(0, index);
-            NodeUtils.setIpAndFqdnForEelf(method);
-            if (!"DELETE".equals(method) && !monly) {
-                length = datafile.length();
-            }
-            fileid = line.substring(index + 1);
-            while ((line = br.readLine()) != null) {
-                index = line.indexOf('\t');
-                String header = line.substring(0, index);
-                String headerValue = line.substring(index + 1);
-                if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) {
-                    subid = headerValue.replaceAll("[^ ]*/+", "");
-                    feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
-                }
-                if (length == 0 && header.toLowerCase().startsWith("content-")) {
-                    continue;
-                }
-                if ("content-type".equalsIgnoreCase(header)) {
-                    ctype = headerValue;
-                }
-                if ("x-onap-requestid".equalsIgnoreCase(header)) {
-                    MDC.put(MDC_KEY_REQUEST_ID, headerValue);
-                }
-                if ("x-invocationid".equalsIgnoreCase(header)) {
-                    MDC.put("InvocationId", headerValue);
-                    headerValue = UUID.randomUUID().toString();
-                    newInvocationId = headerValue;
-                }
-                hdrv.add(new String[]{header, headerValue});
-            }
-        } catch (Exception e) {
-            eelfLogger.error("Exception", e);
-        }
-        hdrs = hdrv.toArray(new String[hdrv.size()][]);
-        url = deliveryTaskHelper.getDestURL(fileid);
-    }
-
-    /**
-     * Is the object a DeliveryTask with the same publication ID.
-     */
-    public boolean equals(Object object) {
-        if (!(object instanceof DeliveryTask)) {
-            return (false);
-        }
-        return (pubid.equals(((DeliveryTask) object).pubid));
-    }
-
-    /**
-     * Compare the publication IDs.
-     */
-    public int compareTo(DeliveryTask other) {
-        return (pubid.compareTo(other.pubid));
-    }
-
-    /**
-     * Get the hash code of the publication ID.
-     */
-    public int hashCode() {
-        return (pubid.hashCode());
-    }
-
-    /**
-     * Return the publication ID.
-     */
-    public String toString() {
-        return (pubid);
-    }
-
-    /**
-     * Get the publish ID.
-     */
-    String getPublishId() {
-        return (pubid);
-    }
-
-    /**
-     * Attempt delivery.
-     */
-    public void run() {
-        attempts++;
-        try {
-            destInfo = deliveryTaskHelper.getDestinationInfo();
-            boolean monly = destInfo.isMetaDataOnly();
-            length = 0;
-            if (!"DELETE".equals(method) && !monly) {
-                length = datafile.length();
-            }
-            stripSuffixIfIsDecompress();
-            url = deliveryTaskHelper.getDestURL(fileid);
-            URL urlObj = new URL(url);
-            HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection();
-            urlConnection.setConnectTimeout(60000);
-            urlConnection.setReadTimeout(60000);
-            urlConnection.setInstanceFollowRedirects(false);
-            urlConnection.setRequestMethod(method);
-            urlConnection.setRequestProperty("Content-Length", Long.toString(length));
-            urlConnection.setRequestProperty("Authorization", destInfo.getAuth());
-            urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
-            boolean expect100 = destInfo.isUsing100();
-            int rc = deliverFileToSubscriber(expect100, urlConnection);
-            String rmsg = urlConnection.getResponseMessage();
-            rmsg = getResponseMessage(urlConnection, rmsg);
-            String xpubid = null;
-            InputStream is;
-            if (rc >= 200 && rc <= 299) {
-                is = urlConnection.getInputStream();
-                xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
-            } else {
-                if (rc >= 300 && rc <= 399) {
-                    rmsg = urlConnection.getHeaderField("Location");
-                }
-                is = urlConnection.getErrorStream();
-            }
-            byte[] buf = new byte[4096];
-            if (is != null) {
-                while (is.read(buf) > 0) {
-                    //flush the buffer
-                }
-                is.close();
-            }
-            deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
-        } catch (Exception e) {
-            eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e);
-            deliveryTaskHelper.reportException(this, e);
-        }
-    }
-
-    /**
-     * To send decompressed gzip to the subscribers.
-     *
-     * @param httpURLConnection connection used to make request
-     */
-    private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
-        byte[] buffer = new byte[8164];
-        httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS");
-        OutputStream outputStream = getOutputStream(httpURLConnection);
-        if (outputStream != null) {
-            int bytesRead;
-            try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
-                int bufferLength = buffer.length;
-                while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
-                    outputStream.write(buffer, 0, bytesRead);
-                }
-                outputStream.close();
-            } catch (IOException e) {
-                httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE");
-                eelfLogger.info("Could not decompress file", e);
-                sendFile(httpURLConnection);
-            }
-
-        }
-    }
-
-    /**
-     * To send any file to the subscriber.
-     *
-     * @param httpURLConnection connection used to make request
-     */
-    private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
-        OutputStream os = getOutputStream(httpURLConnection);
-        if (os == null) {
-            return;
-        }
-        long sofar = 0;
-        try (InputStream is = new FileInputStream(datafile)) {
-            byte[] buf = new byte[1024 * 1024];
-            while (sofar < length) {
-                int len = buf.length;
-                if (sofar + len > length) {
-                    len = (int) (length - sofar);
-                }
-                len = is.read(buf, 0, len);
-                if (len <= 0) {
-                    throw new IOException("Unexpected problem reading data file " + datafile);
-                }
-                sofar += len;
-                os.write(buf, 0, len);
-            }
-            os.close();
-        } catch (IOException ioe) {
-            deliveryTaskHelper.reportDeliveryExtra(this, sofar);
-            throw ioe;
-        }
-    }
-
-    /**
-     * Get the outputstream that will be used to send data.
-     *
-     * @param httpURLConnection connection used to make request
-     * @return AN Outpustream that can be used to send your data.
-     */
-    OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
-        OutputStream outputStream = null;
-        try {
-            outputStream = httpURLConnection.getOutputStream();
-        } catch (ProtocolException pe) {
-            deliveryTaskHelper.reportDeliveryExtra(this, -1L);
-            // Rcvd error instead of 100-continue
-            eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
-        }
-        return outputStream;
-    }
-
-    private void stripSuffixIfIsDecompress() {
-        if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) {
-            fileid = fileid.replace(".gz", "");
-        }
-    }
-
-    private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException {
-        for (String[] nv : hdrs) {
-            uc.addRequestProperty(nv[0], nv[1]);
-        }
-        if (length > 0) {
-            if (expect100) {
-                uc.setRequestProperty("Expect", "100-continue");
-            }
-            uc.setDoOutput(true);
-            if (destInfo.isDecompress()) {
-                if (isFiletypeGzip(datafile)) {
-                    sendDecompressedFile(uc);
-                } else {
-                    uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT");
-                    sendFile(uc);
-                }
-            } else {
-                sendFile(uc);
-            }
-        }
-        return uc.getResponseCode();
-    }
-
-    @Nullable
-    private String getResponseMessage(HttpURLConnection uc, String rmsg) {
-        if (rmsg == null) {
-            String h0 = uc.getHeaderField(0);
-            if (h0 != null) {
-                int indexOfSpace1 = h0.indexOf(' ');
-                int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1);
-                if (indexOfSpace1 != -1 && indexOfSpace2 != -1) {
-                    rmsg = h0.substring(indexOfSpace2 + 1);
-                }
-            }
-        }
-        return rmsg;
-    }
-
-    /**
-     * Remove meta and data files.
-     */
-    void clean() {
-        deleteWithRetry(datafile);
-        deleteWithRetry(metafile);
-        eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
-        eelfLogger.info(EelfMsgs.EXIT);
-        hdrs = null;
-    }
-
-    private void deleteWithRetry(File file) {
-        int maxTries = 3;
-        int tryCount = 1;
-        while (tryCount <= maxTries) {
-            try {
-                Files.deleteIfExists(file.toPath());
-                break;
-            } catch (IOException e) {
-                eelfLogger.error("IOException : Failed to delete file :"
-                                         + file.getName() + " on attempt " + tryCount, e);
-            }
-            tryCount++;
-        }
-    }
-
-    /**
-     * Get the resume time for a delivery task.
-     */
-    long getResumeTime() {
-        return resumeTime;
-    }
-
-    /**
-     * Set the resume time for a delivery task.
-     */
-    void setResumeTime(long resumeTime) {
-        this.resumeTime = resumeTime;
-    }
-
-    /**
-     * Has this delivery task been cleaned.
-     */
-    boolean isCleaned() {
-        return (hdrs == null);
-    }
-
-    /**
-     * Get length of body.
-     */
-    public long getLength() {
-        return (length);
-    }
-
-    /**
-     * Get creation date as encoded in the publish ID.
-     */
-    long getDate() {
-        return (date);
-    }
-
-    /**
-     * Get the most recent delivery attempt URL.
-     */
-    public String getURL() {
-        return (url);
-    }
-
-    /**
-     * Get the content type.
-     */
-    String getCType() {
-        return (ctype);
-    }
-
-    /**
-     * Get the method.
-     */
-    String getMethod() {
-        return (method);
-    }
-
-    /**
-     * Get the file ID.
-     */
-    String getFileId() {
-        return (fileid);
-    }
-
-    /**
-     * Get the number of delivery attempts.
-     */
-    int getAttempts() {
-        return (attempts);
-    }
-
-    /**
-     * Get the (space delimited list of) subscription ID for this delivery task.
-     */
-    String getSubId() {
-        return (subid);
-    }
-
-    /**
-     * Get the feed ID for this delivery task.
-     */
-    String getFeedId() {
-        return (feedid);
-    }
-
-    /**
-     * Get the followRedirects for this delivery task.
-     */
-    boolean getFollowRedirects() {
-        return (followRedirects);
-    }
-}