LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
SUSPENDED BOOLEAN DEFAULT FALSE,
PRIVILEGED_SUBSCRIBER BOOLEAN DEFAULT FALSE,
+ DECOMPRESS BOOLEAN DEFAULT FALSE,
CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
import java.io.*;
import java.net.*;
import java.util.*;
+import java.util.zip.GZIPInputStream;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import org.slf4j.MDC;
import static com.att.eelf.configuration.Configuration.*;
+import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
/**
* A file to be delivered to a destination.
/**
* Create a delivery task for a given delivery queue and pub ID
*
- * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
- * @param pubid The publish ID for this file. This is used as
- * the base for the file name in the spool directory and is of
- * the form <milliseconds since 1970>.<fqdn of initial data router node>
+ * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
+ * @param pubid The publish ID for this file. This is used as
+ * the base for the file name in the spool directory and is of
+ * the form <milliseconds since 1970>.<fqdn of initial data router node>
*/
public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
this.deliveryTaskHelper = deliveryTaskHelper;
hdrs = hdrv.toArray(new String[hdrv.size()][]);
url = deliveryTaskHelper.getDestURL(fileid);
}
+
/**
* Is the object a DeliveryTask with the same publication ID?
*/
public String toString() {
return (pubid);
}
+
/**
* Get the publish ID
*/
if (!"DELETE".equals(method) && !monly) {
length = datafile.length();
}
+ if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
+ fileid = fileid.replace(".gz", "");
+ }
url = deliveryTaskHelper.getDestURL(fileid);
URL u = new URL(url);
HttpURLConnection uc = (HttpURLConnection) u.openConnection();
if (expect100) {
uc.setRequestProperty("Expect", "100-continue");
}
- uc.setFixedLengthStreamingMode(length);
uc.setDoOutput(true);
- OutputStream os = null;
- try {
- os = uc.getOutputStream();
- } catch (ProtocolException pe) {
- deliveryTaskHelper.reportDeliveryExtra(this, -1L);
- // Rcvd error instead of 100-continue
- loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
- }
- if (os != null) {
- long sofar = 0;
- try (InputStream is = new FileInputStream(datafile)) {
- byte[] buf = new byte[1024 * 1024];
- while (sofar < length) {
- int i = buf.length;
- if (sofar + i > length) {
- i = (int) (length - sofar);
- }
- i = is.read(buf, 0, i);
- if (i <= 0) {
- throw new IOException("Unexpected problem reading data file " + datafile);
- }
- sofar += i;
- os.write(buf, 0, i);
- }
- os.close();
- } catch (IOException ioe) {
- deliveryTaskHelper.reportDeliveryExtra(this, sofar);
- throw ioe;
+ if (destInfo.isDecompress()) {
+ if (isFiletypeGzip(datafile)) {
+ sendDecompressedFile(uc);
+ } else {
+ uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
+ sendFile(uc);
}
+ } else {
+ sendFile(uc);
}
}
int rc = uc.getResponseCode();
}
deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
} catch (Exception e) {
- loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
+ loggerDeliveryTask.error("Exception " + e.getStackTrace(), e);
deliveryTaskHelper.reportException(this, e);
}
}
+ /**
+ * To send decompressed gzip to the subscribers
+ *
+ * @param httpURLConnection connection used to make request
+ * @throws IOException
+ */
+ private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
+ byte[] buffer = new byte[8164];
+ httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
+ OutputStream outputStream = getOutputStream(httpURLConnection);
+ if (outputStream != null) {
+ int bytesRead = 0;
+ try (InputStream gzipInputStream = new GZIPInputStream(new FileInputStream(datafile))) {
+ int bufferLength = buffer.length;
+ while ((bytesRead = gzipInputStream.read(buffer, 0, bufferLength)) > 0) {
+ outputStream.write(buffer, 0, bytesRead);
+ }
+ outputStream.close();
+ } catch (IOException e) {
+ httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
+ loggerDeliveryTask.info("Could not decompress file");
+ sendFile(httpURLConnection);
+ }
+
+ }
+ }
+
+ /**
+ * To send any file to the subscriber.
+ *
+ * @param httpURLConnection connection used to make request
+ * @throws IOException
+ */
+ private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
+ OutputStream os = getOutputStream(httpURLConnection);
+ if (os != null) {
+ long sofar = 0;
+ try (InputStream is = new FileInputStream(datafile)) {
+ byte[] buf = new byte[1024 * 1024];
+ while (sofar < length) {
+ int i = buf.length;
+ if (sofar + i > length) {
+ i = (int) (length - sofar);
+ }
+ i = is.read(buf, 0, i);
+ if (i <= 0) {
+ throw new IOException("Unexpected problem reading data file " + datafile);
+ }
+ sofar += i;
+ os.write(buf, 0, i);
+ }
+ os.close();
+ } catch (IOException ioe) {
+ deliveryTaskHelper.reportDeliveryExtra(this, sofar);
+ throw ioe;
+ }
+ }
+ }
+
+ /**
+ * Get the outputstream that will be used to send data
+ *
+ * @param httpURLConnection connection used to make request
+ * @return AN Outpustream that can be used to send your data.
+ * @throws IOException
+ */
+ private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
+ OutputStream outputStream = null;
+
+ try {
+ outputStream = httpURLConnection.getOutputStream();
+ } catch (ProtocolException pe) {
+ deliveryTaskHelper.reportDeliveryExtra(this, -1L);
+ // Rcvd error instead of 100-continue
+ loggerDeliveryTask.error("Exception " + pe.getStackTrace(), pe);
+ }
+ return outputStream;
+ }
+
/**
* Remove meta and data files
*/
private boolean metaonly;
private boolean use100;
private boolean privilegedSubscriber;
+ private boolean decompress;
/**
* Create a destination information object.
* @param metaonly Is this a metadata only delivery?
* @param use100 Should I use expect 100-continue?
* @param privilegedSubscriber Can we wait to receive a file processed acknowledgement before deleting file
+ * @param decompress To see if the they want there information compressed or decompressed
*/
- public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber) {
+ public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean decompress) {
this.name = name;
this.spool = spool;
this.subid = subid;
this.metaonly = metaonly;
this.use100 = use100;
this.privilegedSubscriber = privilegedSubscriber;
+ this.decompress = decompress;
}
/**
this.metaonly = subscription.isMetaDataOnly();
this.use100 = subscription.isUsing100();
this.privilegedSubscriber = subscription.isPrivilegedSubscriber();
+ this.decompress = subscription.isDecompress();
}
public boolean equals(Object o) {
public boolean isPrivilegedSubscriber() {
return (privilegedSubscriber);
}
+
+ /**
+ * Should i decompress the file before sending it on
+ *
+ * @return True if I should.
+ */
+ public boolean isDecompress() {
+ return (decompress);
+ }
+
+
}
public Uploader() {
dq = new DeliveryQueue(this,
new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false,
- false, false));
+ false, false, false));
setDaemon(true);
setName("Log Uploader");
start();
private boolean metaonly;
private boolean use100;
private boolean privilegedSubscriber;
+ private boolean decompress;
/**
* Construct a subscription configuration entry
* @param metaonly Is this a meta data only subscription?
* @param use100 Should we send Expect: 100-continue?
* @param privilegedSubscriber Can we wait to receive a delete file call before deleting file
+ * @param decompress To see if they want their information compressed or decompressed
*/
public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials,
- boolean metaonly, boolean use100, boolean privilegedSubscriber) {
+ boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean decompress) {
this.subid = subid;
this.feedid = feedid;
this.url = url;
this.metaonly = metaonly;
this.use100 = use100;
this.privilegedSubscriber = privilegedSubscriber;
+ this.decompress = decompress;
}
/**
public boolean isPrivilegedSubscriber() {
return (privilegedSubscriber);
}
+
+ /**
+ * Should i decompress the file before sending it on
+ */
+ public boolean isDecompress() {
+ return (decompress);
+ }
}
/**
}
String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey);
DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn,
- "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false);
+ "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false, false);
(new File(di.getSpool())).mkdirs();
destInfos.add(di);
nodeinfo.put(cn, di);
package org.onap.dmaap.datarouter.node;
-import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import java.io.File;
import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
import org.slf4j.MDC;
+import static org.onap.dmaap.datarouter.node.NodeUtils.*;
+
/**
* Servlet for handling all http and https requests to the data router node
* <p>
}
mw.close();
meta.renameTo(new File(dbase + ".M"));
+
}
resp.setStatus(HttpServletResponse.SC_NO_CONTENT);
resp.getOutputStream().close();
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
+
+import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Enumeration;
import java.util.TimeZone;
import java.util.UUID;
+import java.util.zip.GZIPInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.codec.binary.Base64;
/**
* Base64 encode a byte array
*
- * @param raw The bytes to be encoded
+ * @param raw The bytes to be encoded
* @return The encoded string
*/
public static String base64Encode(byte[] raw) {
}
}
+ /**
+ * Method to check to see if file is of type gzip
+ *
+ * @param file The name of the file to be checked
+ * @return True if the file is of type gzip
+ */
+ public static boolean isFiletypeGzip(File file){
+ try(FileInputStream fileInputStream = new FileInputStream(file);
+ GZIPInputStream gzip = new GZIPInputStream(fileInputStream)) {
+
+ return true;
+ }catch (IOException e){
+ nodeUtilsLogger.error("NODE0403 " + file.toString() + " Not in gzip(gz) format: " + e.toString() + e);
+ return false;
+ }
+ }
+
}
boolean monly = jsub.getBoolean("metadataOnly");
boolean use100 = jdel.getBoolean("use100");
boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
- psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber));
+ boolean decompress = jsub.getBoolean("decompress");
+ psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber, decompress));
}
}
JSONObject jparams = jcfg.optJSONObject("parameters");
private DestInfo[] createDestInfoObjects() {
DestInfo[] destInfos = new DestInfo[1];
- DestInfo destInfo = new DestInfo("node.datarouternew.com", "spool/s/0/1", "1", "logs/", "/subs/1", "user1", "Basic dXNlcjE6cGFzc3dvcmQx", false, true, false);
+ DestInfo destInfo = new DestInfo("node.datarouternew.com", "spool/s/0/1", "1", "logs/", "/subs/1", "user1", "Basic dXNlcjE6cGFzc3dvcmQx", false, true, false, false);
destInfos[0] = destInfo;
return destInfos;
}
delivery.put("use100", true);
subscription.put("delivery", delivery);
subscription.put("privilegedSubscriber", false);
+ subscription.put("decompress", false);
subscriptions.put(subscription);
provData.put("subscriptions", subscriptions);
}
private Date lastMod;\r
private Date createdDate;\r
private boolean privilegedSubscriber;\r
+ private boolean decompress;\r
\r
public static Subscription getSubscriptionMatching(Subscription sub) {\r
SubDelivery deli = sub.getDelivery();\r
this.lastMod = new Date();\r
this.createdDate = new Date();\r
this.privilegedSubscriber = false;\r
+ this.decompress = false;\r
}\r
\r
public Subscription(ResultSet rs) throws SQLException {\r
this.lastMod = rs.getDate("LAST_MOD");\r
this.createdDate = rs.getDate("CREATED_DATE");\r
this.privilegedSubscriber = rs.getBoolean("PRIVILEGED_SUBSCRIBER");\r
+ this.decompress = rs.getBoolean("DECOMPRESS");\r
}\r
\r
public Subscription(JSONObject jo) throws InvalidObjectException {\r
this.metadataOnly = jo.getBoolean("metadataOnly");\r
this.suspended = jo.optBoolean("suspend", false);\r
this.privilegedSubscriber = jo.optBoolean("privilegedSubscriber", false);\r
+ this.decompress = jo.optBoolean("decompress", false);\r
this.subscriber = jo.optString("subscriber", "");\r
JSONObject jol = jo.optJSONObject("links");\r
this.links = (jol == null) ? (new SubLinks()) : (new SubLinks(jol));\r
this.links = links;\r
}\r
\r
+ public boolean isDecompress() {\r
+ return decompress;\r
+ }\r
+\r
+ public void setDecompress(boolean decompress) {\r
+ this.decompress = decompress;\r
+ }\r
+\r
@Override\r
public JSONObject asJSONObject() {\r
JSONObject jo = new JSONObject();\r
jo.put(LAST_MOD_KEY, lastMod.getTime());\r
jo.put(CREATED_DATE, createdDate.getTime());\r
jo.put("privilegedSubscriber", privilegedSubscriber);\r
+ jo.put("decompress", decompress);\r
return jo;\r
}\r
\r
}\r
\r
// Create the SUBSCRIPTIONS row\r
- String sql = "insert into SUBSCRIPTIONS (SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID, PRIVILEGED_SUBSCRIBER) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";\r
+ String sql = "insert into SUBSCRIPTIONS (SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID, PRIVILEGED_SUBSCRIBER, DECOMPRESS) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";\r
ps = c.prepareStatement(sql, new String[]{SUBID_COL});\r
ps.setInt(1, subid);\r
ps.setInt(2, feedid);\r
ps.setBoolean(9, isSuspended());\r
ps.setInt(10, groupid); //New field is added - Groups feature Rally:US708115 - 1610\r
ps.setBoolean(11, isPrivilegedSubscriber());\r
+ ps.setBoolean(12, isDecompress());\r
ps.execute();\r
ps.close();\r
// Update the row to set the URLs\r
boolean rv = true;\r
PreparedStatement ps = null;\r
try {\r
- String sql = "update SUBSCRIPTIONS set DELIVERY_URL = ?, DELIVERY_USER = ?, DELIVERY_PASSWORD = ?, DELIVERY_USE100 = ?, METADATA_ONLY = ?, SUSPENDED = ?, GROUPID = ?, PRIVILEGED_SUBSCRIBER = ? where SUBID = ?";\r
+ String sql = "update SUBSCRIPTIONS set DELIVERY_URL = ?, DELIVERY_USER = ?, DELIVERY_PASSWORD = ?, DELIVERY_USE100 = ?, METADATA_ONLY = ?, SUSPENDED = ?, GROUPID = ?, PRIVILEGED_SUBSCRIBER = ?, DECOMPRESS = ? where SUBID = ?";\r
ps = c.prepareStatement(sql);\r
ps.setString(1, delivery.getUrl());\r
ps.setString(2, delivery.getUser());\r
ps.setInt(6, suspended ? 1 : 0);\r
ps.setInt(7, groupid); //New field is added - Groups feature Rally:US708115 - 1610\r
ps.setInt(8, privilegedSubscriber ? 1 : 0);\r
- ps.setInt(9, subid);\r
+ ps.setInt(9, decompress ? 1 : 0);\r
+ ps.setInt(10, subid);\r
ps.executeUpdate();\r
} catch (SQLException e) {\r
rv = false;\r
LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
SUSPENDED BOOLEAN DEFAULT FALSE,
PRIVILEGED_SUBSCRIBER BOOLEAN DEFAULT FALSE,
+ DECOMPRESS BOOLEAN DEFAULT FALSE,
CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
jo.put("metadataOnly", true);
jo.put("suspend", true);
jo.put("privilegedSubscriber", true);
+ jo.put("decompress", true);
jo.put("delivery", JSObject);
jo.put("subscriber", "differentSubscriber");
jo.put("sync", true);
jo.put("suspend", true);
jo.put("privilegedSubscriber", true);
jo.put("delivery", JSObject);
+ jo.put("decompress", true);
jo.put("sync", true);
return jo;
}
jo.put("metadataOnly", true);
jo.put("suspend", true);
jo.put("privilegedSubscriber", true);
+ jo.put("decompress", true);
jo.put("delivery", JSObject);
jo.put("sync", true);
jo.put("changeowner", true);
jo.put("metadataOnly", true);
jo.put("suspend", true);
jo.put("delivery", JSObject);
+ jo.put("privilegedSubscriber", false);
+ jo.put("decompress", false);
jo.put("failed", false);
return jo;
}
subscription.setMetadataOnly(false);
subscription.setSuspended(false);
subscription.setPrivilegedSubscriber(false);
+ subscription.setDecompress(false);
subscription.doInsert(db.getConnection());
}
subscription.setMetadataOnly(false);
subscription.setSuspended(false);
subscription.setPrivilegedSubscriber(false);
+ subscription.setDecompress(false);
subscription.changeOwnerShip();
subscription.doUpdate(db.getConnection());
}
subscription.setSuspended(false);
subscription.setPrivilegedSubscriber(false);
subscription.setLinks(subLinks);
+ subscription.setDecompress(false);
Assert.assertEquals(2, subscription.getGroupid());
Assert.assertEquals(subDelivery, subscription.getDelivery());
Assert.assertFalse(subscription.isMetadataOnly());
Assert.assertFalse(subscription.isSuspended());
Assert.assertFalse(subscription.isPrivilegedSubscriber());
+ Assert.assertFalse(subscription.isDecompress());
}
}
\ No newline at end of file
LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
SUSPENDED BOOLEAN DEFAULT FALSE,
PRIVILEGED_SUBSCRIBER BOOLEAN DEFAULT FALSE,
+ DECOMPRESS BOOLEAN DEFAULT FALSE,
CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO GROUPS(GROUPID, AUTHID, NAME, DESCRIPTION, CLASSIFICATION, MEMBERS)
VALUES (1, 'Basic dXNlcjE6cGFzc3dvcmQx', 'Group1', 'First Group for testing', 'Class1', 'Member1');
-INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID, PRIVILEGED_SUBSCRIBER)
-VALUES (1, 1, 'https://172.100.0.5:8080', 'user1', 'password1', true, false, 'user1', false, 1, false);
+INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID, PRIVILEGED_SUBSCRIBER, DECOMPRESS)
+VALUES (1, 1, 'https://172.100.0.5:8080', 'user1', 'password1', true, false, 'user1', false, 1, false, false);
INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, SUBSCRIBER, SELF_LINK, LOG_LINK)
VALUES (23, 1, 'http://delivery_url', 'user1', 'somepassword', 'sub123', 'selflink', 'loglink');