1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
25 package org.onap.dmaap.datarouter.node.delivery;
27 import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
28 import static org.onap.dmaap.datarouter.node.utils.NodeUtils.isFiletypeGzip;
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import java.io.BufferedReader;
34 import java.io.FileInputStream;
35 import java.io.FileReader;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.io.OutputStream;
39 import java.net.HttpURLConnection;
40 import java.net.ProtocolException;
42 import java.nio.file.Files;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.UUID;
46 import java.util.zip.GZIPInputStream;
47 import org.jetbrains.annotations.Nullable;
48 import org.onap.dmaap.datarouter.node.DestInfo;
49 import org.onap.dmaap.datarouter.node.utils.NodeUtils;
50 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
54 * A file to be delivered to a destination.
56 * <p>A Delivery task represents a work item for the data router - a file that needs to be delivered and provides
57 * mechanisms to get information about the file and its delivery data as well as to attempt delivery.
59 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
61 private static final String DECOMPRESSION_STATUS = "Decompression_Status";
62 private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class);
63 private DeliveryTaskHelper deliveryTaskHelper;
65 private DestInfo destInfo;
67 private File datafile;
68 private File metafile;
71 private String method;
72 private String fileid;
75 private String feedid;
78 private boolean followRedirects;
79 private String[][] hdrs;
80 private String newInvocationId;
81 private long resumeTime;
85 * Create a delivery task for a given delivery queue and pub ID.
87 * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
88 * @param pubid The publish ID for this file. This is used as the base for the file name in the spool directory and
89 * is of the form (milliseconds since 1970).(fqdn of initial data router node)
91 public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
92 this.deliveryTaskHelper = deliveryTaskHelper;
94 destInfo = deliveryTaskHelper.getDestinationInfo();
95 subid = destInfo.getSubId();
96 this.followRedirects = destInfo.isFollowRedirects();
97 feedid = destInfo.getLogData();
98 spool = destInfo.getSpool();
99 String dfn = spool + File.separator + pubid;
100 String mfn = dfn + ".M";
101 datafile = new File(spool + File.separator + pubid);
102 metafile = new File(mfn);
103 boolean monly = destInfo.isMetaDataOnly();
104 date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
105 resumeTime = System.currentTimeMillis();
106 ArrayList<String[]> hdrv = new ArrayList<>();
108 try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
109 String line = br.readLine();
110 int index = line.indexOf('\t');
111 method = line.substring(0, index);
112 NodeUtils.setIpAndFqdnForEelf(method);
113 if (!"DELETE".equals(method) && !monly) {
114 length = datafile.length();
116 fileid = line.substring(index + 1);
117 while ((line = br.readLine()) != null) {
118 index = line.indexOf('\t');
119 String header = line.substring(0, index);
120 String headerValue = line.substring(index + 1);
121 if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) {
122 subid = headerValue.replaceAll("[^ ]*/+", "");
123 feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
125 if (length == 0 && header.toLowerCase().startsWith("content-")) {
128 if ("content-type".equalsIgnoreCase(header)) {
131 if ("x-onap-requestid".equalsIgnoreCase(header)) {
132 MDC.put(MDC_KEY_REQUEST_ID, headerValue);
134 if ("x-invocationid".equalsIgnoreCase(header)) {
135 MDC.put("InvocationId", headerValue);
136 headerValue = UUID.randomUUID().toString();
137 newInvocationId = headerValue;
139 hdrv.add(new String[]{header, headerValue});
141 } catch (Exception e) {
142 eelfLogger.error("Exception", e);
144 hdrs = hdrv.toArray(new String[hdrv.size()][]);
145 url = deliveryTaskHelper.getDestURL(fileid);
149 * Is the object a DeliveryTask with the same publication ID.
151 public boolean equals(Object object) {
152 if (!(object instanceof DeliveryTask)) {
155 return (pubid.equals(((DeliveryTask) object).pubid));
159 * Compare the publication IDs.
161 public int compareTo(DeliveryTask other) {
162 return (pubid.compareTo(other.pubid));
166 * Get the hash code of the publication ID.
168 public int hashCode() {
169 return (pubid.hashCode());
173 * Return the publication ID.
175 public String toString() {
180 * Get the publish ID.
182 public String getPublishId() {
192 destInfo = deliveryTaskHelper.getDestinationInfo();
193 boolean monly = destInfo.isMetaDataOnly();
195 if (!"DELETE".equals(method) && !monly) {
196 length = datafile.length();
198 stripSuffixIfIsDecompress();
199 url = deliveryTaskHelper.getDestURL(fileid);
200 URL urlObj = new URL(url);
201 HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection();
202 urlConnection.setConnectTimeout(60000);
203 urlConnection.setReadTimeout(60000);
204 urlConnection.setInstanceFollowRedirects(false);
205 urlConnection.setRequestMethod(method);
206 urlConnection.setRequestProperty("Content-Length", Long.toString(length));
207 urlConnection.setRequestProperty("Authorization", destInfo.getAuth());
208 urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
209 boolean expect100 = destInfo.isUsing100();
210 int rc = deliverFileToSubscriber(expect100, urlConnection);
211 String rmsg = urlConnection.getResponseMessage();
212 rmsg = getResponseMessage(urlConnection, rmsg);
213 String xpubid = null;
215 if (rc >= 200 && rc <= 299) {
216 is = urlConnection.getInputStream();
217 xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
219 if (rc >= 300 && rc <= 399) {
220 rmsg = urlConnection.getHeaderField("Location");
222 is = urlConnection.getErrorStream();
224 byte[] buf = new byte[4096];
226 while (is.read(buf) > 0) {
231 deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
232 } catch (Exception e) {
233 eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e);
234 deliveryTaskHelper.reportException(this, e);
239 * To send decompressed gzip to the subscribers.
241 * @param httpURLConnection connection used to make request
243 private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
244 byte[] buffer = new byte[8164];
245 httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS");
246 OutputStream outputStream = getOutputStream(httpURLConnection);
247 if (outputStream != null) {
249 try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
250 int bufferLength = buffer.length;
251 while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
252 outputStream.write(buffer, 0, bytesRead);
254 outputStream.close();
255 } catch (IOException e) {
256 httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE");
257 eelfLogger.info("Could not decompress file", e);
258 sendFile(httpURLConnection);
265 * To send any file to the subscriber.
267 * @param httpURLConnection connection used to make request
269 private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
270 OutputStream os = getOutputStream(httpURLConnection);
275 try (InputStream is = new FileInputStream(datafile)) {
276 byte[] buf = new byte[1024 * 1024];
277 while (sofar < length) {
278 int len = buf.length;
279 if (sofar + len > length) {
280 len = (int) (length - sofar);
282 len = is.read(buf, 0, len);
284 throw new IOException("Unexpected problem reading data file " + datafile);
287 os.write(buf, 0, len);
290 } catch (IOException ioe) {
291 deliveryTaskHelper.reportDeliveryExtra(this, sofar);
297 * Get the outputstream that will be used to send data.
299 * @param httpURLConnection connection used to make request
300 * @return AN Outpustream that can be used to send your data.
302 OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
303 OutputStream outputStream = null;
305 outputStream = httpURLConnection.getOutputStream();
306 } catch (ProtocolException pe) {
307 deliveryTaskHelper.reportDeliveryExtra(this, -1L);
308 // Rcvd error instead of 100-continue
309 eelfLogger.error("Exception " + Arrays.toString(pe.getStackTrace()), pe);
314 private void stripSuffixIfIsDecompress() {
315 if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) {
316 fileid = fileid.replace(".gz", "");
320 private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException {
321 for (String[] nv : hdrs) {
322 uc.addRequestProperty(nv[0], nv[1]);
326 uc.setRequestProperty("Expect", "100-continue");
328 uc.setDoOutput(true);
329 if (destInfo.isDecompress()) {
330 if (isFiletypeGzip(datafile)) {
331 sendDecompressedFile(uc);
333 uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT");
340 return uc.getResponseCode();
344 private String getResponseMessage(HttpURLConnection uc, String rmsg) {
346 String h0 = uc.getHeaderField(0);
348 int indexOfSpace1 = h0.indexOf(' ');
349 int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1);
350 if (indexOfSpace1 != -1 && indexOfSpace2 != -1) {
351 rmsg = h0.substring(indexOfSpace2 + 1);
359 * Remove meta and data files.
361 public void clean() {
362 deleteWithRetry(datafile);
363 deleteWithRetry(metafile);
364 eelfLogger.info(EelfMsgs.INVOKE, newInvocationId);
365 eelfLogger.info(EelfMsgs.EXIT);
369 private void deleteWithRetry(File file) {
372 while (tryCount <= maxTries) {
374 Files.deleteIfExists(file.toPath());
376 } catch (IOException e) {
377 eelfLogger.error("IOException : Failed to delete file :"
378 + file.getName() + " on attempt " + tryCount, e);
385 * Get the resume time for a delivery task.
387 long getResumeTime() {
392 * Set the resume time for a delivery task.
394 public void setResumeTime(long resumeTime) {
395 this.resumeTime = resumeTime;
399 * Has this delivery task been cleaned.
401 public boolean isCleaned() {
402 return (hdrs == null);
406 * Get length of body.
408 public long getLength() {
413 * Get creation date as encoded in the publish ID.
415 public long getDate() {
420 * Get the most recent delivery attempt URL.
422 public String getURL() {
427 * Get the content type.
429 public String getCType() {
436 public String getMethod() {
443 public String getFileId() {
448 * Get the number of delivery attempts.
450 public int getAttempts() {
455 * Get the (space delimited list of) subscription ID for this delivery task.
457 public String getSubId() {
462 * Get the feed ID for this delivery task.
464 public String getFeedId() {
469 * Get the followRedirects for this delivery task.
471 public boolean getFollowRedirects() {
472 return (followRedirects);