1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
25 package org.onap.dmaap.datarouter.node;
30 import java.util.zip.GZIPInputStream;
32 import com.att.eelf.configuration.EELFLogger;
33 import com.att.eelf.configuration.EELFManager;
34 import org.apache.log4j.Logger;
35 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
38 import static com.att.eelf.configuration.Configuration.*;
39 import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
42 * A file to be delivered to a destination.
44 * A delivery task represents a work item for the data router: a file that
45 * needs to be delivered. It provides mechanisms to get information about
46 * the file and its delivery data, as well as to attempt delivery.
48 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
49 private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
50 private static EELFLogger eelflogger = EELFManager.getInstance()
51 .getLogger(DeliveryTask.class);
52 private DeliveryTaskHelper deliveryTaskHelper;
54 private DestInfo destInfo;
56 private File datafile;
57 private File metafile;
60 private String method;
61 private String fileid;
64 private String feedid;
67 private String[][] hdrs;
68 private String newInvocationId;
72 * Create a delivery task for a given delivery queue and publish ID.
74 * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
75 * @param pubid The publish ID for this file. This is used as
76 * the base for the file name in the spool directory and is of
77 * the form {@code <milliseconds since 1970>.<fqdn of initial data router node>}
79 public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
80 this.deliveryTaskHelper = deliveryTaskHelper;
82 destInfo = deliveryTaskHelper.getDestinationInfo();
83 subid = destInfo.getSubId();
84 feedid = destInfo.getLogData();
85 spool = destInfo.getSpool();
86 String dfn = spool + "/" + pubid;
87 String mfn = dfn + ".M";
88 datafile = new File(spool + "/" + pubid);
89 metafile = new File(mfn);
90 boolean monly = destInfo.isMetaDataOnly();
91 date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
92 Vector<String[]> hdrv = new Vector<>();
94 try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
95 String s = br.readLine();
96 int i = s.indexOf('\t');
97 method = s.substring(0, i);
98 NodeUtils.setIpAndFqdnForEelf(method);
99 if (!"DELETE".equals(method) && !monly) {
100 length = datafile.length();
102 fileid = s.substring(i + 1);
103 while ((s = br.readLine()) != null) {
105 String h = s.substring(0, i);
106 String v = s.substring(i + 1);
107 if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
108 subid = v.replaceAll("[^ ]*/", "");
109 feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
111 if (length == 0 && h.toLowerCase().startsWith("content-")) {
114 if (h.equalsIgnoreCase("content-type")) {
117 if (h.equalsIgnoreCase("x-onap-requestid")) {
118 MDC.put(MDC_KEY_REQUEST_ID, v);
120 if (h.equalsIgnoreCase("x-invocationid")) {
121 MDC.put("InvocationId", v);
122 v = UUID.randomUUID().toString();
125 hdrv.add(new String[]{h, v});
127 } catch (Exception e) {
128 loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
130 hdrs = hdrv.toArray(new String[hdrv.size()][]);
131 url = deliveryTaskHelper.getDestURL(fileid);
135 * Is the object a DeliveryTask with the same publication ID?
137 public boolean equals(Object o) {
138 if (!(o instanceof DeliveryTask)) {
141 return (pubid.equals(((DeliveryTask) o).pubid));
145 * Compare the publication IDs.
147 public int compareTo(DeliveryTask o) {
148 return (pubid.compareTo(o.pubid));
152 * Get the hash code of the publication ID.
154 public int hashCode() {
155 return (pubid.hashCode());
159 * Return the publication ID.
161 public String toString() {
168 public String getPublishId() {
178 destInfo = deliveryTaskHelper.getDestinationInfo();
179 boolean expect100 = destInfo.isUsing100();
180 boolean monly = destInfo.isMetaDataOnly();
182 if (!"DELETE".equals(method) && !monly) {
183 length = datafile.length();
185 if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
186 fileid = fileid.replace(".gz", "");
188 url = deliveryTaskHelper.getDestURL(fileid);
189 URL u = new URL(url);
190 HttpURLConnection uc = (HttpURLConnection) u.openConnection();
191 uc.setConnectTimeout(60000);
192 uc.setReadTimeout(60000);
193 uc.setInstanceFollowRedirects(false);
194 uc.setRequestMethod(method);
195 uc.setRequestProperty("Content-Length", Long.toString(length));
196 uc.setRequestProperty("Authorization", destInfo.getAuth());
197 uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
198 for (String[] nv : hdrs) {
199 uc.addRequestProperty(nv[0], nv[1]);
203 uc.setRequestProperty("Expect", "100-continue");
205 uc.setDoOutput(true);
206 if (destInfo.isDecompress()) {
207 if (isFiletypeGzip(datafile)) {
208 sendDecompressedFile(uc);
210 uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
217 int rc = uc.getResponseCode();
218 String rmsg = uc.getResponseMessage();
220 String h0 = uc.getHeaderField(0);
222 int i = h0.indexOf(' ');
223 int j = h0.indexOf(' ', i + 1);
224 if (i != -1 && j != -1) {
225 rmsg = h0.substring(j + 1);
229 String xpubid = null;
231 if (rc >= 200 && rc <= 299) {
232 is = uc.getInputStream();
233 xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
235 if (rc >= 300 && rc <= 399) {
236 rmsg = uc.getHeaderField("Location");
238 is = uc.getErrorStream();
240 byte[] buf = new byte[4096];
242 while (is.read(buf) > 0) {
246 deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
247 } catch (Exception e) {
248 loggerDeliveryTask.error("Exception " + e.getStackTrace(), e);
249 deliveryTaskHelper.reportException(this, e);
254 * Send the gzip data file to the subscriber in decompressed form.
256 * @param httpURLConnection connection used to make request
257 * @throws IOException
259 private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
260 byte[] buffer = new byte[8164];
261 httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
262 OutputStream outputStream = getOutputStream(httpURLConnection);
263 if (outputStream != null) {
265 try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
266 int bufferLength = buffer.length;
267 while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
268 outputStream.write(buffer, 0, bytesRead);
270 outputStream.close();
271 } catch (IOException e) {
272 httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
273 loggerDeliveryTask.info("Could not decompress file");
274 sendFile(httpURLConnection);
281 * Send the data file to the subscriber.
283 * @param httpURLConnection connection used to make request
284 * @throws IOException
286 private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
287 OutputStream os = getOutputStream(httpURLConnection);
290 try (InputStream is = new FileInputStream(datafile)) {
291 byte[] buf = new byte[1024 * 1024];
292 while (sofar < length) {
294 if (sofar + i > length) {
295 i = (int) (length - sofar);
297 i = is.read(buf, 0, i);
299 throw new IOException("Unexpected problem reading data file " + datafile);
305 } catch (IOException ioe) {
306 deliveryTaskHelper.reportDeliveryExtra(this, sofar);
313 * Get the output stream that will be used to send data.
315 * @param httpURLConnection connection used to make request
316 * @return an OutputStream that can be used to send the data.
317 * @throws IOException
319 private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
320 OutputStream outputStream = null;
323 outputStream = httpURLConnection.getOutputStream();
324 } catch (ProtocolException pe) {
325 deliveryTaskHelper.reportDeliveryExtra(this, -1L);
326 // Rcvd error instead of 100-continue
327 loggerDeliveryTask.error("Exception " + pe.getStackTrace(), pe);
333 * Remove meta and data files
335 public void clean() {
338 eelflogger.info(EelfMsgs.INVOKE, newInvocationId);
339 eelflogger.info(EelfMsgs.EXIT);
344 * Has this delivery task been cleaned?
346 public boolean isCleaned() {
347 return (hdrs == null);
353 public long getLength() {
358 * Get creation date as encoded in the publish ID.
360 public long getDate() {
365 * Get the most recent delivery attempt URL
367 public String getURL() {
372 * Get the content type
374 public String getCType() {
381 public String getMethod() {
388 public String getFileId() {
393 * Get the number of delivery attempts
395 public int getAttempts() {
400 * Get the (space delimited list of) subscription ID for this delivery task
402 public String getSubId() {
407 * Get the feed ID for this delivery task
409 public String getFeedId() {