1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
25 package org.onap.dmaap.datarouter.node;
30 import java.util.zip.GZIPInputStream;
32 import com.att.eelf.configuration.EELFLogger;
33 import com.att.eelf.configuration.EELFManager;
34 import org.apache.log4j.Logger;
35 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
38 import static com.att.eelf.configuration.Configuration.*;
39 import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
42 * A file to be delivered to a destination.
44 * A Delivery task represents a work item for the data router - a file that
45 * needs to be delivered and provides mechanisms to get information about
46 * the file and its delivery data as well as to attempt delivery.
48 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
49 private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
50 private static EELFLogger eelflogger = EELFManager.getInstance()
51 .getLogger(DeliveryTask.class);
52 private DeliveryTaskHelper deliveryTaskHelper;
54 private DestInfo destInfo;
56 private File datafile;
57 private File metafile;
60 private String method;
61 private String fileid;
64 private String feedid;
67 private String[][] hdrs;
68 private String newInvocationId;
69 private long resumeTime;
73 * Create a delivery task for a given delivery queue and pub ID
75 * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
76 * @param pubid The publish ID for this file. This is used as
77 * the base for the file name in the spool directory and is of
78 * the form <milliseconds since 1970>.<fqdn of initial data router node>
80 DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
81 this.deliveryTaskHelper = deliveryTaskHelper;
83 destInfo = deliveryTaskHelper.getDestinationInfo();
84 subid = destInfo.getSubId();
85 feedid = destInfo.getLogData();
86 spool = destInfo.getSpool();
87 String dfn = spool + "/" + pubid;
88 String mfn = dfn + ".M";
89 datafile = new File(spool + "/" + pubid);
90 metafile = new File(mfn);
91 boolean monly = destInfo.isMetaDataOnly();
92 date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
93 resumeTime = System.currentTimeMillis();
94 Vector<String[]> hdrv = new Vector<>();
96 try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
97 String s = br.readLine();
98 int i = s.indexOf('\t');
99 method = s.substring(0, i);
100 NodeUtils.setIpAndFqdnForEelf(method);
101 if (!"DELETE".equals(method) && !monly) {
102 length = datafile.length();
104 fileid = s.substring(i + 1);
105 while ((s = br.readLine()) != null) {
107 String h = s.substring(0, i);
108 String v = s.substring(i + 1);
109 if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
110 subid = v.replaceAll("[^ ]*/", "");
111 feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
113 if (length == 0 && h.toLowerCase().startsWith("content-")) {
116 if (h.equalsIgnoreCase("content-type")) {
119 if (h.equalsIgnoreCase("x-onap-requestid")) {
120 MDC.put(MDC_KEY_REQUEST_ID, v);
122 if (h.equalsIgnoreCase("x-invocationid")) {
123 MDC.put("InvocationId", v);
124 v = UUID.randomUUID().toString();
127 hdrv.add(new String[]{h, v});
129 } catch (Exception e) {
130 loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
132 hdrs = hdrv.toArray(new String[hdrv.size()][]);
133 url = deliveryTaskHelper.getDestURL(fileid);
137 * Is the object a DeliveryTask with the same publication ID?
139 public boolean equals(Object o) {
140 if (!(o instanceof DeliveryTask)) {
143 return (pubid.equals(((DeliveryTask) o).pubid));
147 * Compare the publication IDs.
149 public int compareTo(DeliveryTask o) {
150 return (pubid.compareTo(o.pubid));
154 * Get the hash code of the publication ID.
156 public int hashCode() {
157 return (pubid.hashCode());
161 * Return the publication ID.
163 public String toString() {
170 String getPublishId() {
180 destInfo = deliveryTaskHelper.getDestinationInfo();
181 boolean expect100 = destInfo.isUsing100();
182 boolean monly = destInfo.isMetaDataOnly();
184 if (!"DELETE".equals(method) && !monly) {
185 length = datafile.length();
187 if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
188 fileid = fileid.replace(".gz", "");
190 url = deliveryTaskHelper.getDestURL(fileid);
191 URL u = new URL(url);
192 HttpURLConnection uc = (HttpURLConnection) u.openConnection();
193 uc.setConnectTimeout(60000);
194 uc.setReadTimeout(60000);
195 uc.setInstanceFollowRedirects(false);
196 uc.setRequestMethod(method);
197 uc.setRequestProperty("Content-Length", Long.toString(length));
198 uc.setRequestProperty("Authorization", destInfo.getAuth());
199 uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
200 for (String[] nv : hdrs) {
201 uc.addRequestProperty(nv[0], nv[1]);
205 uc.setRequestProperty("Expect", "100-continue");
207 uc.setDoOutput(true);
208 if (destInfo.isDecompress()) {
209 if (isFiletypeGzip(datafile)) {
210 sendDecompressedFile(uc);
212 uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
219 int rc = uc.getResponseCode();
220 String rmsg = uc.getResponseMessage();
222 String h0 = uc.getHeaderField(0);
224 int i = h0.indexOf(' ');
225 int j = h0.indexOf(' ', i + 1);
226 if (i != -1 && j != -1) {
227 rmsg = h0.substring(j + 1);
231 String xpubid = null;
233 if (rc >= 200 && rc <= 299) {
234 is = uc.getInputStream();
235 xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
237 if (rc >= 300 && rc <= 399) {
238 rmsg = uc.getHeaderField("Location");
240 is = uc.getErrorStream();
242 byte[] buf = new byte[4096];
244 while (is.read(buf) > 0) {
248 deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
249 } catch (Exception e) {
250 loggerDeliveryTask.error("Exception " + e.getStackTrace(), e);
251 deliveryTaskHelper.reportException(this, e);
256 * To send decompressed gzip to the subscribers
258 * @param httpURLConnection connection used to make request
259 * @throws IOException
261 private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
262 byte[] buffer = new byte[8164];
263 httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
264 OutputStream outputStream = getOutputStream(httpURLConnection);
265 if (outputStream != null) {
267 try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
268 int bufferLength = buffer.length;
269 while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
270 outputStream.write(buffer, 0, bytesRead);
272 outputStream.close();
273 } catch (IOException e) {
274 httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
275 loggerDeliveryTask.info("Could not decompress file");
276 sendFile(httpURLConnection);
283 * To send any file to the subscriber.
285 * @param httpURLConnection connection used to make request
286 * @throws IOException
288 private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
289 OutputStream os = getOutputStream(httpURLConnection);
292 try (InputStream is = new FileInputStream(datafile)) {
293 byte[] buf = new byte[1024 * 1024];
294 while (sofar < length) {
296 if (sofar + i > length) {
297 i = (int) (length - sofar);
299 i = is.read(buf, 0, i);
301 throw new IOException("Unexpected problem reading data file " + datafile);
307 } catch (IOException ioe) {
308 deliveryTaskHelper.reportDeliveryExtra(this, sofar);
315 * Get the outputstream that will be used to send data
317 * @param httpURLConnection connection used to make request
318 * @return AN Outpustream that can be used to send your data.
319 * @throws IOException
321 private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
322 OutputStream outputStream = null;
325 outputStream = httpURLConnection.getOutputStream();
326 } catch (ProtocolException pe) {
327 deliveryTaskHelper.reportDeliveryExtra(this, -1L);
328 // Rcvd error instead of 100-continue
329 loggerDeliveryTask.error("Exception " + pe.getStackTrace(), pe);
335 * Remove meta and data files
340 eelflogger.info(EelfMsgs.INVOKE, newInvocationId);
341 eelflogger.info(EelfMsgs.EXIT);
346 * Set the resume time for a delivery task.
348 void setResumeTime(long resumeTime) {
349 this.resumeTime = resumeTime;
353 * Get the resume time for a delivery task.
355 long getResumeTime() {
360 * Has this delivery task been cleaned?
362 boolean isCleaned() {
363 return (hdrs == null);
369 public long getLength() {
374 * Get creation date as encoded in the publish ID.
381 * Get the most recent delivery attempt URL
383 public String getURL() {
388 * Get the content type
409 * Get the number of delivery attempts
416 * Get the (space delimited list of) subscription ID for this delivery task
423 * Get the feed ID for this delivery task