);
CREATE TABLE SUBSCRIPTIONS (
- SUBID INT UNSIGNED NOT NULL PRIMARY KEY,
- FEEDID INT UNSIGNED NOT NULL,
- GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0,
- DELIVERY_URL VARCHAR(256),
- DELIVERY_USER VARCHAR(20),
- DELIVERY_PASSWORD VARCHAR(32),
- DELIVERY_USE100 BOOLEAN DEFAULT FALSE,
- METADATA_ONLY BOOLEAN DEFAULT FALSE,
- SUBSCRIBER VARCHAR(8) NOT NULL,
- SELF_LINK VARCHAR(256),
- LOG_LINK VARCHAR(256),
- LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- SUSPENDED BOOLEAN DEFAULT FALSE,
- CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ SUBID INT UNSIGNED NOT NULL PRIMARY KEY,
+ FEEDID INT UNSIGNED NOT NULL,
+ GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0,
+ DELIVERY_URL VARCHAR(256),
+ DELIVERY_USER VARCHAR(20),
+ DELIVERY_PASSWORD VARCHAR(32),
+ DELIVERY_USE100 BOOLEAN DEFAULT FALSE,
+ METADATA_ONLY BOOLEAN DEFAULT FALSE,
+ SUBSCRIBER VARCHAR(8) NOT NULL,
+ SELF_LINK VARCHAR(256),
+ LOG_LINK VARCHAR(256),
+ LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ SUSPENDED BOOLEAN DEFAULT FALSE,
+ PRIVILEGED_SUBSCRIBER BOOLEAN DEFAULT FALSE,
+ CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
('DELIVERY_INIT_RETRY_INTERVAL', '10'),
('DELIVERY_MAX_AGE', '86400'),
('DELIVERY_MAX_RETRY_INTERVAL', '3600'),
+ ('DELIVERY_FILE_PROCESS_INTERVAL', '600'),
('DELIVERY_RETRY_RATIO', '2'),
('LOGROLL_INTERVAL', '30'),
('PROV_AUTH_ADDRESSES', 'dmaap-dr-prov|dmaap-dr-node'),
('PROV_MAXSUB_COUNT', '100000'),
('PROV_REQUIRE_CERT', 'false'),
('PROV_REQUIRE_SECURE', 'false'),
- ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE')
+ ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE|DELIVERY_FILE_PROCESS_INTERVAL')
;
}
}
}
+
+ /**
+ * Mark the task in spool a success
+ */
+ public synchronized boolean markTaskSuccess(String spool, String pubId) {
+ boolean succeeded = false;
+ if (spool != null) {
+ DeliveryQueue dq = dqs.get(spool);
+ if (dq != null) {
+ succeeded = dq.markTaskSuccess(pubId);
+ }
+ }
+ return succeeded;
+ }
}
* failure timer is active or if no files are found in a directory scan.
*/
public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
- private DeliveryQueueHelper dqh;
- private DestInfo di;
- private Hashtable<String, DeliveryTask> working = new Hashtable<String, DeliveryTask>();
- private Hashtable<String, DeliveryTask> retry = new Hashtable<String, DeliveryTask>();
+ private DeliveryQueueHelper deliveryQueueHelper;
+ private DestInfo destinationInfo;
+ private Hashtable<String, DeliveryTask> working = new Hashtable<>();
+ private Hashtable<String, DeliveryTask> retry = new Hashtable<>();
private int todoindex;
private boolean failed;
private long failduration;
private long resumetime;
File dir;
- private Vector<DeliveryTask> todo = new Vector<DeliveryTask>();
+ private Vector<DeliveryTask> todo = new Vector<>();
/**
* Try to cancel a delivery task.
if (!failed) {
failed = true;
if (failduration == 0) {
- failduration = dqh.getInitFailureTimer();
+ if (destinationInfo.isPrivilegedSubscriber()) {
+ failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
+ } else{
+ failduration = deliveryQueueHelper.getInitFailureTimer();
+ }
}
resumetime = System.currentTimeMillis() + failduration;
- long maxdur = dqh.getMaxFailureTimer();
- failduration = (long) (failduration * dqh.getFailureBackoff());
+ long maxdur = deliveryQueueHelper.getMaxFailureTimer();
+ failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
if (failduration > maxdur) {
failduration = maxdur;
}
*/
public synchronized DeliveryTask peekNext() {
long now = System.currentTimeMillis();
- long mindate = now - dqh.getExpirationTimer();
+ long mindate = now - deliveryQueueHelper.getExpirationTimer();
if (failed) {
if (now > resumetime) {
failed = false;
/**
* Create a delivery queue for a given destination info
*/
- public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) {
- this.dqh = dqh;
- this.di = di;
- dir = new File(di.getSpool());
+ public DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
+ this.deliveryQueueHelper = deliveryQueueHelper;
+ this.destinationInfo = destinationInfo;
+ dir = new File(destinationInfo.getSpool());
dir.mkdirs();
}
/**
* Update the destination info for this delivery queue
*/
- public void config(DestInfo di) {
- this.di = di;
+ public void config(DestInfo destinationInfo) {
+ this.destinationInfo = destinationInfo;
}
/**
* Get the dest info
*/
- public DestInfo getDestInfo() {
- return (di);
+ public DestInfo getDestinationInfo() {
+ return (destinationInfo);
}
/**
* Get the config manager
*/
public DeliveryQueueHelper getConfig() {
- return (dqh);
+ return (deliveryQueueHelper);
}
/**
*/
public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
if (status < 300) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid);
- markSuccess(task);
- } else if (status < 400 && dqh.isFollowRedirects()) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
- if (dqh.handleRedirection(di, location, task.getFileId())) {
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
+ if (destinationInfo.isPrivilegedSubscriber()) {
+ markFailWithRetry(task);
+ } else {
+ markSuccess(task);
+ }
+ } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
+ if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
markRedirect(task);
} else {
StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
markFailNoRetry(task);
}
- } else if (status < 500) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
+ } else if (status < 500 && status != 429) { // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time.
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
markFailNoRetry(task);
} else {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
markFailWithRetry(task);
}
}
* Delivery failed by reason of an exception
*/
public void reportException(DeliveryTask task, Exception exception) {
- StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString());
- dqh.handleUnreachable(di);
+ StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
+ deliveryQueueHelper.handleUnreachable(destinationInfo);
markFailWithRetry(task);
}
* @return The feed ID
*/
public String getFeedId(String subid) {
- return (dqh.getFeedId(subid));
+ return (deliveryQueueHelper.getFeedId(subid));
}
/**
* Get the URL to deliver a message to given the file ID
*/
public String getDestURL(String fileid) {
- return (dqh.getDestURL(di, fileid));
+ return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
}
/**
*/
public void run() {
DeliveryTask t;
- long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit();
- int filestogo = dqh.getFairFileLimit();
+ long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
+ int filestogo = deliveryQueueHelper.getFairFileLimit();
while ((t = getNext()) != null) {
t.run();
if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
public void resetQueue() {
resumetime = System.currentTimeMillis();
}
+
+ /**
+ * Get task if in queue and mark as success
+ */
+ public boolean markTaskSuccess(String pubId) {
+ DeliveryTask task = working.get(pubId);
+ if (task != null) {
+ markSuccess(task);
+ return true;
+ }
+ task = retry.get(pubId);
+ if (task != null) {
+ retry.remove(pubId);
+ task.clean();
+ resumetime = 0;
+ failduration = 0;
+ return true;
+ }
+ return false;
+ }
}
/**
* Get the timeout (milliseconds) before retrying after an initial delivery failure
*/
- public long getInitFailureTimer();
+ long getInitFailureTimer();
+
+ /**
+ * Get the timeout before retrying after delivery and wait for file processing
+ */
+ long getWaitForFileProcessFailureTimer();
/**
* Get the ratio between timeouts on consecutive delivery attempts
*/
- public double getFailureBackoff();
+ double getFailureBackoff();
/**
* Get the maximum timeout (milliseconds) between delivery attempts
*/
- public long getMaxFailureTimer();
+ long getMaxFailureTimer();
/**
* Get the expiration timer (milliseconds) for deliveries
*/
- public long getExpirationTimer();
+ long getExpirationTimer();
/**
* Get the maximum number of file delivery attempts before checking
* if another queue has work to be performed.
*/
- public int getFairFileLimit();
+ int getFairFileLimit();
/**
* Get the maximum amount of time spent delivering files before checking if another queue has work to be performed.
*/
- public long getFairTimeLimit();
+ long getFairTimeLimit();
/**
* Get the URL for delivering a file
*
- * @param dest The destination information for the file to be delivered.
+ * @param destinationInfo The destination information for the file to be delivered.
* @param fileid The file id for the file to be delivered.
- * @return The URL for delivering the file (typically, dest.getURL() + "/" + fileid).
+ * @return The URL for delivering the file (typically, destinationInfo.getURL() + "/" + fileid).
*/
- public String getDestURL(DestInfo dest, String fileid);
+ String getDestURL(DestInfo destinationInfo, String fileid);
/**
* Forget redirections associated with a subscriber
*
- * @param dest Destination information to forget
+ * @param destinationInfo Destination information to forget
*/
- public void handleUnreachable(DestInfo dest);
+ void handleUnreachable(DestInfo destinationInfo);
/**
* Post redirection for a subscriber
*
- * @param dest Destination information to update
+ * @param destinationInfo Destination information to update
* @param location Location given by subscriber
* @param fileid File ID of request
* @return true if this 3xx response is retryable, otherwise, false.
*/
- public boolean handleRedirection(DestInfo dest, String location, String fileid);
+ boolean handleRedirection(DestInfo destinationInfo, String location, String fileid);
/**
* Should I handle 3xx responses differently than 4xx responses?
*/
- public boolean isFollowRedirects();
+ boolean isFollowRedirects();
/**
* Get the feed ID for a subscription
* @param subid The subscription ID
* @return The feed ID
*/
- public String getFeedId(String subid);
+ String getFeedId(String subid);
}
private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
private static EELFLogger eelflogger = EELFManager.getInstance()
.getLogger(DeliveryTask.class);
- private DeliveryTaskHelper dth;
+ private DeliveryTaskHelper deliveryTaskHelper;
private String pubid;
- private DestInfo di;
+ private DestInfo destInfo;
private String spool;
private File datafile;
private File metafile;
/**
* Create a delivery task for a given delivery queue and pub ID
*
- * @param dth The delivery task helper for the queue this task is in.
+ * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
* @param pubid The publish ID for this file. This is used as
* the base for the file name in the spool directory and is of
* the form <milliseconds since 1970>.<fqdn of initial data router node>
*/
- public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
- this.dth = dth;
+ public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
+ this.deliveryTaskHelper = deliveryTaskHelper;
this.pubid = pubid;
- di = dth.getDestInfo();
- subid = di.getSubId();
- feedid = di.getLogData();
- spool = di.getSpool();
+ destInfo = deliveryTaskHelper.getDestinationInfo();
+ subid = destInfo.getSubId();
+ feedid = destInfo.getLogData();
+ spool = destInfo.getSpool();
String dfn = spool + "/" + pubid;
String mfn = dfn + ".M";
datafile = new File(spool + "/" + pubid);
metafile = new File(mfn);
- boolean monly = di.isMetaDataOnly();
+ boolean monly = destInfo.isMetaDataOnly();
date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
- Vector<String[]> hdrv = new Vector<String[]>();
+ Vector<String[]> hdrv = new Vector<>();
try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
String s = br.readLine();
String v = s.substring(i + 1);
if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
subid = v.replaceAll("[^ ]*/", "");
- feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
+ feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
}
if (length == 0 && h.toLowerCase().startsWith("content-")) {
continue;
loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
}
hdrs = hdrv.toArray(new String[hdrv.size()][]);
- url = dth.getDestURL(fileid);
+ url = deliveryTaskHelper.getDestURL(fileid);
}
/**
* Is the object a DeliveryTask with the same publication ID?
public void run() {
attempts++;
try {
- di = dth.getDestInfo();
- boolean expect100 = di.isUsing100();
- boolean monly = di.isMetaDataOnly();
+ destInfo = deliveryTaskHelper.getDestinationInfo();
+ boolean expect100 = destInfo.isUsing100();
+ boolean monly = destInfo.isMetaDataOnly();
length = 0;
if (!"DELETE".equals(method) && !monly) {
length = datafile.length();
}
- url = dth.getDestURL(fileid);
+ url = deliveryTaskHelper.getDestURL(fileid);
URL u = new URL(url);
HttpURLConnection uc = (HttpURLConnection) u.openConnection();
uc.setConnectTimeout(60000);
uc.setInstanceFollowRedirects(false);
uc.setRequestMethod(method);
uc.setRequestProperty("Content-Length", Long.toString(length));
- uc.setRequestProperty("Authorization", di.getAuth());
+ uc.setRequestProperty("Authorization", destInfo.getAuth());
uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
for (String[] nv : hdrs) {
uc.addRequestProperty(nv[0], nv[1]);
try {
os = uc.getOutputStream();
} catch (ProtocolException pe) {
- dth.reportDeliveryExtra(this, -1L);
+ deliveryTaskHelper.reportDeliveryExtra(this, -1L);
// Rcvd error instead of 100-continue
loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
}
}
os.close();
} catch (IOException ioe) {
- dth.reportDeliveryExtra(this, sofar);
+ deliveryTaskHelper.reportDeliveryExtra(this, sofar);
throw ioe;
}
}
}
is.close();
}
- dth.reportStatus(this, rc, xpubid, rmsg);
+ deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
} catch (Exception e) {
loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
- dth.reportException(this, e);
+ deliveryTaskHelper.reportException(this, e);
}
}
* @param task The task that failed
* @param exception The exception that occurred
*/
- public void reportException(DeliveryTask task, Exception exception);
+ void reportException(DeliveryTask task, Exception exception);
/**
* Report that a delivery attempt completed (successfully or unsuccessfully)
* @param xpubid The publish ID from the far end (if any)
* @param location The redirection location for a 3XX response
*/
- public void reportStatus(DeliveryTask task, int status, String xpubid, String location);
+ void reportStatus(DeliveryTask task, int status, String xpubid, String location);
/**
* Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue.
* @param task The task that failed
* @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue.
*/
- public void reportDeliveryExtra(DeliveryTask task, long sent);
+ void reportDeliveryExtra(DeliveryTask task, long sent);
/**
* Get the destination information for the delivery queue
*
* @return The destination information
*/
- public DestInfo getDestInfo();
+ DestInfo getDestinationInfo();
/**
* Given a file ID, get the URL to deliver to
* @param fileid The file id
* @return The URL to deliver to
*/
- public String getDestURL(String fileid);
+ String getDestURL(String fileid);
/**
* Get the feed ID for a subscription
* @param subid The subscription ID
* @return The feed iD
*/
- public String getFeedId(String subid);
+ String getFeedId(String subid);
}
private String authentication;
private boolean metaonly;
private boolean use100;
+ private boolean privilegedSubscriber;
/**
* Create a destination information object.
* @param authentication The credentials.
* @param metaonly Is this a metadata only delivery?
* @param use100 Should I use expect 100-continue?
+ * @param privilegedSubscriber Can we wait to receive a file processed acknowledgement before deleting file
*/
- public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100) {
+ public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber) {
this.name = name;
this.spool = spool;
this.subid = subid;
this.authentication = authentication;
this.metaonly = metaonly;
this.use100 = use100;
+ this.privilegedSubscriber = privilegedSubscriber;
+ }
+
+ /**
+ * Create a destination information object.
+ *
+ * @param name n:fqdn or s:subid
+ * @param spool The directory where files are spooled.
+ * @param subscription The subscription.
+ */
+ public DestInfo(String name, String spool, NodeConfig.ProvSubscription subscription) {
+ this.name = name;
+ this.spool = spool;
+ this.subid = subscription.getSubId();
+ this.logdata = subscription.getFeedId();
+ this.url = subscription.getURL();
+ this.authuser = subscription.getAuthUser();
+ this.authentication = subscription.getCredentials();
+ this.metaonly = subscription.isMetaDataOnly();
+ this.use100 = subscription.isUsing100();
+ this.privilegedSubscriber = subscription.isPrivilegedSubscriber();
}
public boolean equals(Object o) {
public boolean isUsing100() {
return (use100);
}
+
+ /**
+ * Should we wait to receive a file processed acknowledgement before deleting file
+ */
+ public boolean isPrivilegedSubscriber() {
+ return (privilegedSubscriber);
+ }
}
return (10000L);
}
+ public long getWaitForFileProcessFailureTimer() {
+ return (600000L);
+ }
+
public double getFailureBackoff() {
return (2.0);
}
return (86400000);
}
- public String getDestURL(DestInfo dest, String fileid) {
+ public String getDestURL(DestInfo destinationInfo, String fileid) {
return (config.getEventLogUrl());
}
- public void handleUnreachable(DestInfo dest) {
+ public void handleUnreachable(DestInfo destinationInfo) {
}
- public boolean handleRedirection(DestInfo dest, String location, String fileid) {
+ public boolean handleRedirection(DestInfo destinationInfo, String location, String fileid) {
return (false);
}
public Uploader() {
dq = new DeliveryQueue(this,
new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false,
- false));
+ false, false));
setDaemon(true);
setName("Log Uploader");
start();
private String credentials;
private boolean metaonly;
private boolean use100;
+ private boolean privilegedSubscriber;
/**
* Construct a subscription configuration entry
* Authorization header.
* @param metaonly Is this a meta data only subscription?
* @param use100 Should we send Expect: 100-continue?
+ * @param privilegedSubscriber Can we wait to receive a delete file call before deleting file
*/
public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials,
- boolean metaonly, boolean use100) {
+ boolean metaonly, boolean use100, boolean privilegedSubscriber) {
this.subid = subid;
this.feedid = feedid;
this.url = url;
this.credentials = credentials;
this.metaonly = metaonly;
this.use100 = use100;
+ this.privilegedSubscriber = privilegedSubscriber;
}
/**
public boolean isUsing100() {
return (use100);
}
+
+ /**
+ * Can we wait to receive a delete file call before deleting file
+ */
+ public boolean isPrivilegedSubscriber() {
+ return (privilegedSubscriber);
+ }
}
/**
Target[] targets;
}
- private Hashtable<String, String> params = new Hashtable<String, String>();
- private Hashtable<String, Feed> feeds = new Hashtable<String, Feed>();
- private Hashtable<String, DestInfo> nodeinfo = new Hashtable<String, DestInfo>();
- private Hashtable<String, DestInfo> subinfo = new Hashtable<String, DestInfo>();
- private Hashtable<String, IsFrom> nodes = new Hashtable<String, IsFrom>();
+ private Hashtable<String, String> params = new Hashtable<>();
+ private Hashtable<String, Feed> feeds = new Hashtable<>();
+ private Hashtable<String, DestInfo> nodeinfo = new Hashtable<>();
+ private Hashtable<String, DestInfo> subinfo = new Hashtable<>();
+ private Hashtable<String, IsFrom> nodes = new Hashtable<>();
+ private Hashtable<String, ProvSubscription> provSubscriptions = new Hashtable<>();
private String myname;
private String myauth;
private DestInfo[] alldests;
for (ProvParam p : pd.getParams()) {
params.put(p.getName(), p.getValue());
}
- Vector<DestInfo> div = new Vector<DestInfo>();
+ Vector<DestInfo> destInfos = new Vector<>();
myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
for (ProvNode pn : pd.getNodes()) {
String cn = pn.getCName();
}
String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey);
DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn,
- "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true);
+ "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false);
(new File(di.getSpool())).mkdirs();
- div.add(di);
+ destInfos.add(di);
nodeinfo.put(cn, di);
nodes.put(auth, new IsFrom(cn));
}
}
egrtab.put(pfe.getSubId(), pfe.getNode());
}
- Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<String, Vector<SubnetMatcher>>();
+ Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<>();
for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId());
if (v == null) {
}
v.add(new SubnetMatcher(pfs.getCidr()));
}
- Hashtable<String, StringBuffer> ttab = new Hashtable<String, StringBuffer>();
- HashSet<String> allfeeds = new HashSet<String>();
+ Hashtable<String, StringBuffer> feedTargets = new Hashtable<>();
+ HashSet<String> allfeeds = new HashSet<>();
for (ProvFeed pfx : pd.getFeeds()) {
if (pfx.getStatus() == null) {
allfeeds.add(pfx.getId());
}
}
- for (ProvSubscription ps : pd.getSubscriptions()) {
- String sid = ps.getSubId();
- String fid = ps.getFeedId();
- if (!allfeeds.contains(fid)) {
+ for (ProvSubscription provSubscription : pd.getSubscriptions()) {
+ String subId = provSubscription.getSubId();
+ String feedId = provSubscription.getFeedId();
+ if (!allfeeds.contains(feedId)) {
continue;
}
- if (subinfo.get(sid) != null) {
+ if (subinfo.get(subId) != null) {
continue;
}
int sididx = 999;
try {
- sididx = Integer.parseInt(sid);
+ sididx = Integer.parseInt(subId);
sididx -= sididx % 100;
} catch (Exception e) {
}
- String siddir = sididx + "/" + sid;
- DestInfo di = new DestInfo("s:" + sid, spooldir + "/s/" + siddir, sid, fid, ps.getURL(), ps.getAuthUser(),
- ps.getCredentials(), ps.isMetaDataOnly(), ps.isUsing100());
- (new File(di.getSpool())).mkdirs();
- div.add(di);
- subinfo.put(sid, di);
- String egr = egrtab.get(sid);
+ String subscriptionDirectory = sididx + "/" + subId;
+ DestInfo destinationInfo = new DestInfo("s:" + subId,
+ spooldir + "/s/" + subscriptionDirectory, provSubscription);
+ (new File(destinationInfo.getSpool())).mkdirs();
+ destInfos.add(destinationInfo);
+ provSubscriptions.put(subId, provSubscription);
+ subinfo.put(subId, destinationInfo);
+ String egr = egrtab.get(subId);
if (egr != null) {
- sid = pf.getPath(egr) + sid;
+ subId = pf.getPath(egr) + subId;
}
- StringBuffer sb = ttab.get(fid);
+ StringBuffer sb = feedTargets.get(feedId);
if (sb == null) {
sb = new StringBuffer();
- ttab.put(fid, sb);
+ feedTargets.put(feedId, sb);
}
- sb.append(' ').append(sid);
+ sb.append(' ').append(subId);
}
- alldests = div.toArray(new DestInfo[div.size()]);
+ alldests = destInfos.toArray(new DestInfo[destInfos.size()]);
for (ProvFeed pfx : pd.getFeeds()) {
String fid = pfx.getId();
Feed f = feeds.get(fid);
} else {
f.redirections = v2.toArray(new Redirection[v2.size()]);
}
- StringBuffer sb = ttab.get(fid);
+ StringBuffer sb = feedTargets.get(fid);
if (sb == null) {
f.targets = new Target[0];
} else {
return ("Publisher not permitted for this feed");
}
+ /**
+ * Check whether delete file is allowed.
+ *
+ * @param subId The ID of the subscription being requested.
+ */
+ public boolean isDeletePermitted(String subId) {
+ ProvSubscription provSubscription = provSubscriptions.get(subId);
+ return provSubscription.isPrivilegedSubscriber();
+ }
+
/**
* Get authenticated user
*/
private Timer timer = new Timer("Node Configuration Timer", true);
private long maxfailuretimer;
private long initfailuretimer;
+ private long waitForFileProcessFailureTimer;
private long expirationtimer;
private double failurebackoff;
private long fairtimelimit;
followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s");
initfailuretimer = 10000;
+ waitForFileProcessFailureTimer = 600000;
maxfailuretimer = 3600000;
expirationtimer = 86400000;
failurebackoff = 2.0;
initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000);
} catch (Exception e) {
}
+ try {
+ waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) * 1000);
+ } catch (Exception e) {
+ }
try {
maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000);
} catch (Exception e) {
return (config.isPublishPermitted(feedid, credentials, ip));
}
+ /**
+ * Check whether delete file is allowed.
+ *
+ * @param subId The ID of the subscription being requested
+ * @return True if the delete file is permitted for the subscriber.
+ */
+ public boolean isDeletePermitted(String subId) {
+ return (config.isDeletePermitted(subId));
+ }
+
/**
* Check who the user is given the feed ID and the offered credentials.
*
/**
* Get the URL to deliver a message to.
*
- * @param destinfo The destination information
+ * @param destinationInfo The destination information
* @param fileid The file ID
* @return The URL to deliver to
*/
- public String getDestURL(DestInfo destinfo, String fileid) {
- String subid = destinfo.getSubId();
- String purl = destinfo.getURL();
+ public String getDestURL(DestInfo destinationInfo, String fileid) {
+ String subid = destinationInfo.getSubId();
+ String purl = destinationInfo.getURL();
if (followredirects && subid != null) {
purl = rdmgr.lookup(subid, purl);
}
/**
* Set up redirection on receipt of a 3XX from a target URL
*/
- public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) {
+ public boolean handleRedirection(DestInfo destinationInfo, String redirto, String fileid) {
fileid = "/" + fileid;
- String subid = destinfo.getSubId();
- String purl = destinfo.getURL();
+ String subid = destinationInfo.getSubId();
+ String purl = destinationInfo.getURL();
if (followredirects && subid != null && redirto.endsWith(fileid)) {
redirto = redirto.substring(0, redirto.length() - fileid.length());
if (!redirto.equals(purl)) {
/**
* Handle unreachable target URL
*/
- public void handleUnreachable(DestInfo destinfo) {
- String subid = destinfo.getSubId();
+ public void handleUnreachable(DestInfo destinationInfo) {
+ String subid = destinationInfo.getSubId();
if (followredirects && subid != null) {
rdmgr.forget(subid);
}
return (initfailuretimer);
}
+ /**
+ * Get the timeout before retrying after delivery and wait for file processing
+ */
+ public long getWaitForFileProcessFailureTimer() {
+ return (waitForFileProcessFailureTimer);
+ }
+
/**
* Get the maximum timeout between delivery attempts
*/
ctxt = new ServletContextHandler(0);
ctxt.setContextPath("/");
server.setHandler(ctxt);
- ctxt.addServlet(new ServletHolder(new NodeServlet()), "/*");
+ ctxt.addServlet(new ServletHolder(new NodeServlet(delivery)), "/*");
nodeMainLogger.info("NODE0005 Data Router Node Activating Service");
server.start();
server.join();
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.regex.Pattern;
-import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
+import org.jetbrains.annotations.Nullable;
import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
import org.slf4j.MDC;
//Adding EELF Logger Rally:US664892
private static EELFLogger eelflogger = EELFManager.getInstance()
.getLogger(NodeServlet.class);
+ private Delivery delivery;
static {
final String ws = "\\s*";
MetaDataPattern = Pattern.compile(object, Pattern.DOTALL);
}
+ NodeServlet(Delivery delivery) {
+ this.delivery = delivery;
+ }
+
/**
* Get the NodeConfigurationManager
*/
} catch (IOException ioe) {
logger.error("IOException" + ioe.getMessage());
eelflogger.info(EelfMsgs.EXIT);
- } catch (ServletException se) {
- logger.error("ServletException" + se.getMessage());
- eelflogger.info(EelfMsgs.EXIT);
}
}
/**
* Handle all DELETE requests
*/
- protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ protected void doDelete(HttpServletRequest req, HttpServletResponse resp) {
NodeUtils.setIpAndFqdnForEelf("doDelete");
NodeUtils.setRequestIdAndInvocationId(req);
eelflogger.info(EelfMsgs.ENTRY);
try {
common(req, resp, false);
} catch (IOException ioe) {
- logger.error("IOException" + ioe.getMessage());
- eelflogger.info(EelfMsgs.EXIT);
- } catch (ServletException se) {
- logger.error("ServletException" + se.getMessage());
+ logger.error("IOException " + ioe.getMessage());
eelflogger.info(EelfMsgs.EXIT);
}
-
}
- private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput)
- throws ServletException, IOException {
- if (down(resp)) {
- eelflogger.info(EelfMsgs.EXIT);
- return;
- }
- if (!req.isSecure()) {
- logger.info(
- "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req
- .getRemoteAddr());
- resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");
- eelflogger.info(EelfMsgs.EXIT);
- return;
- }
- String fileid = req.getPathInfo();
- if (fileid == null) {
- logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
- .getRemoteAddr());
- resp.sendError(HttpServletResponse.SC_NOT_FOUND,
- "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
- eelflogger.info(EelfMsgs.EXIT);
- return;
- }
+ private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws IOException {
+ String fileid = getFileId(req, resp);
+ if (fileid == null) return;
String feedid = null;
String user = null;
+ String ip = req.getRemoteAddr();
+ String lip = req.getLocalAddr();
+ String pubid = null;
+ String xpubid = null;
+ String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;
+ Target[] targets = null;
+ if (fileid.startsWith("/delete/")) {
+ deleteFile(req, resp, fileid, pubid);
+ return;
+ }
String credentials = req.getHeader("Authorization");
if (credentials == null) {
logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req
eelflogger.info(EelfMsgs.EXIT);
return;
}
- String ip = req.getRemoteAddr();
- String lip = req.getLocalAddr();
- String pubid = null;
- String xpubid = null;
- String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;
- Target[] targets = null;
if (fileid.startsWith("/publish/")) {
fileid = fileid.substring(9);
int i = fileid.indexOf('/');
mx.append(req.getMethod()).append('\t').append(fileid).append('\n');
Enumeration hnames = req.getHeaderNames();
String ctype = null;
- Boolean hasRequestIdHeader = false;
- Boolean hasInvocationIdHeader = false;
+ boolean hasRequestIdHeader = false;
+ boolean hasInvocationIdHeader = false;
while (hnames.hasMoreElements()) {
String hn = (String) hnames.nextElement();
String hnlc = hn.toLowerCase();
}
}
+ private void deleteFile(HttpServletRequest req, HttpServletResponse resp, String fileid, String pubid) {
+ try {
+ fileid = fileid.substring(8);
+ int i = fileid.indexOf('/');
+ if (i == -1 || i == fileid.length() - 1) {
+ logger.info("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + " from " + req
+ .getRemoteAddr());
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+ "Invalid request URI. Expecting <subId>/<pubId>.");
+ eelflogger.info(EelfMsgs.EXIT);
+ return;
+ }
+ String subscriptionId = fileid.substring(0, i);
+ int subId = Integer.parseInt(subscriptionId);
+ pubid = fileid.substring(i + 1);
+ String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ + config.getMyName() + ".";
+ int subIdDir = subId - (subId % 100);
+ if (!isAuthorizedToDelete(resp, subscriptionId, errorMessage)) {
+ return;
+ }
+ boolean result = delivery.markTaskSuccess(config.getSpoolBase() + "/s/" + subIdDir + "/" + subId, pubid);
+ if (result) {
+ logger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ + config.getMyName());
+ resp.setStatus(HttpServletResponse.SC_OK);
+ eelflogger.info(EelfMsgs.EXIT);
+ } else {
+ logger.error("NODE0116 " + errorMessage);
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND, "File not found on server.");
+ eelflogger.info(EelfMsgs.EXIT);
+ }
+ } catch (IOException ioe) {
+ logger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+ + config.getMyName() + ". Error: " + ioe.getMessage());
+ eelflogger.info(EelfMsgs.EXIT);
+ }
+ }
+
+ @Nullable
+ private String getFileId(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ if (down(resp)) {
+ eelflogger.info(EelfMsgs.EXIT);
+ return null;
+ }
+ if (!req.isSecure()) {
+ logger.info(
+ "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req
+ .getRemoteAddr());
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");
+ eelflogger.info(EelfMsgs.EXIT);
+ return null;
+ }
+ String fileid = req.getPathInfo();
+ if (fileid == null) {
+ logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+ .getRemoteAddr());
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+ "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");
+ eelflogger.info(EelfMsgs.EXIT);
+ return null;
+ }
+ return fileid;
+ }
+
+ private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage) throws IOException {
+ try {
+ boolean deletePermitted = config.isDeletePermitted(subscriptionId);
+ if (!deletePermitted) {
+ logger.error("NODE0113 " + errorMessage + " Error: Subscription "
+ + subscriptionId + " is not a privileged subscription");
+ resp.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+ eelflogger.info(EelfMsgs.EXIT);
+ return false;
+ }
+ } catch (NullPointerException npe) {
+ logger.error("NODE0114 " + errorMessage + " Error: Subscription " + subscriptionId + " does not exist");
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND);
+ eelflogger.info(EelfMsgs.EXIT);
+ return false;
+ }
+ return true;
+ }
+
private int getIdFromPath(HttpServletRequest req) {
String path = req.getPathInfo();
if (path == null || path.length() < 2) {
String password = gvas(jdel, "password");
boolean monly = jsub.getBoolean("metadataOnly");
boolean use100 = jdel.getBoolean("use100");
- psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100));
+ boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
+ psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber));
}
}
JSONObject jparams = jcfg.optJSONObject("parameters");
private DestInfo[] createDestInfoObjects() {
DestInfo[] destInfos = new DestInfo[1];
- DestInfo destInfo = new DestInfo("node.datarouternew.com", "spool/s/0/1", "1", "logs/", "/subs/1", "user1", "Basic dXNlcjE6cGFzc3dvcmQx", false, true);
+ DestInfo destInfo = new DestInfo("node.datarouternew.com", "spool/s/0/1", "1", "logs/", "/subs/1", "user1", "Basic dXNlcjE6cGFzc3dvcmQx", false, true, false);
destInfos[0] = destInfo;
return destInfos;
}
delivery.put("password", "password1");
delivery.put("use100", true);
subscription.put("delivery", delivery);
+ subscription.put("privilegedSubscriber", false);
subscriptions.put(subscription);
provData.put("subscriptions", subscriptions);
}
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import java.io.File;
+import java.io.IOException;
import java.util.*;
import static org.hamcrest.Matchers.notNullValue;
public class NodeServletTest {
private NodeServlet nodeServlet;
+ private Delivery delivery;
@Mock
private HttpServletRequest request;
ListAppender<ILoggingEvent> listAppender;
@Before
- public void setUp() throws Exception{
+ public void setUp() throws Exception {
listAppender = setTestLogger();
- nodeServlet = new NodeServlet();
setBehalfHeader("Stub_Value");
when(request.getPathInfo()).thenReturn("2");
when(request.isSecure()).thenReturn(true);
+ createFilesAndDirectories();
setUpConfig();
setUpNodeMainDelivery();
+ delivery = mock(Delivery.class);
+ when(delivery.markTaskSuccess("spool/s/0/1", "dmaap-dr-node.1234567")).thenReturn(true);
+ nodeServlet = new NodeServlet(delivery);
when(request.getHeader("Authorization")).thenReturn("User1");
when(request.getHeader("X-DMAAP-DR-PUBLISH-ID")).thenReturn("User1");
}
+ @AfterClass
+ public static void tearDown() {
+ deleteCreatedDirectories();
+ }
+
@Test
public void Given_Request_Is_HTTP_GET_And_Config_Is_Down_Then_Service_Unavailable_Response_Is_Generated() throws Exception {
setNodeConfigManagerIsConfiguredToReturnFalse();
}
@Test
- public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_Internal_FetchProv_Then_No_Content_Response_Is_Generated() throws Exception {
+ public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_Internal_FetchProv_Then_No_Content_Response_Is_Generated() {
when(request.getPathInfo()).thenReturn("/internal/fetchProv");
nodeServlet.doGet(request, response);
verify(response).setStatus(eq(HttpServletResponse.SC_NO_CONTENT));
}
@Test
- public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_ResetSubscription_Then_No_Content_Response_Is_Generated() throws Exception {
+ public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_ResetSubscription_Then_No_Content_Response_Is_Generated() {
when(request.getPathInfo()).thenReturn("/internal/resetSubscription/1");
nodeServlet.doGet(request, response);
verify(response).setStatus(eq(HttpServletResponse.SC_NO_CONTENT));
verifyEnteringExitCalled(listAppender);
}
+ @Test
+ public void Given_Request_Is_HTTP_DELETE_File_With_Invalid_Endpoint_Then_Not_Found_Response_Is_Generated() throws Exception {
+ when(request.getPathInfo()).thenReturn("/delete/1");
+ nodeServlet.doDelete(request, response);
+ verify(response).sendError(eq(HttpServletResponse.SC_NOT_FOUND), argThat(notNullValue(String.class)));
+ verifyEnteringExitCalled(listAppender);
+ }
+
+ @Test
+ public void Given_Request_Is_HTTP_DELETE_File_And_Is_Not_Privileged_Subscription_Then_Not_Found_Response_Is_Generated() throws Exception {
+ when(request.getPathInfo()).thenReturn("/delete/1/dmaap-dr-node.1234567");
+ setUpConfigToReturnUnprivilegedSubscriber();
+ nodeServlet.doDelete(request, response);
+ verify(response).sendError(eq(HttpServletResponse.SC_UNAUTHORIZED));
+ verifyEnteringExitCalled(listAppender);
+ }
+
+ @Test
+ public void Given_Request_Is_HTTP_DELETE_File_And_Subscription_Does_Not_Exist_Then_Not_Found_Response_Is_Generated() throws Exception {
+ when(request.getPathInfo()).thenReturn("/delete/1/dmaap-dr-node.1234567");
+ setUpConfigToReturnNullOnIsDeletePermitted();
+ nodeServlet.doDelete(request, response);
+ verify(response).sendError(eq(HttpServletResponse.SC_NOT_FOUND));
+ verifyEnteringExitCalled(listAppender);
+ }
+
+ @Test
+ public void Given_Request_Is_HTTP_DELETE_File_Then_Request_Succeeds() throws Exception {
+ when(request.getPathInfo()).thenReturn("/delete/1/dmaap-dr-node.1234567");
+ createFilesAndDirectories();
+ nodeServlet.doDelete(request, response);
+ verify(response).setStatus(eq(HttpServletResponse.SC_OK));
+ verifyEnteringExitCalled(listAppender);
+ }
+
+ @Test
+ public void Given_Request_Is_HTTP_DELETE_File_And_File_Does_Not_Exist_Then_Not_Found_Response_Is_Generated() throws IOException {
+ when(request.getPathInfo()).thenReturn("/delete/1/nonExistingFile");
+ nodeServlet.doDelete(request, response);
+ verify(response).sendError(eq(HttpServletResponse.SC_NOT_FOUND), argThat(notNullValue(String.class)));
+ verifyEnteringExitCalled(listAppender);
+ }
private void setBehalfHeader(String headerValue) {
when(request.getHeader("X-DMAAP-DR-ON-BEHALF-OF")).thenReturn(headerValue);
assertEquals(3, listAppender.list.size());
}
- private void setUpConfig() throws IllegalAccessException{
+ private void setUpConfig() throws IllegalAccessException {
NodeConfigManager config = mock(NodeConfigManager.class);
PowerMockito.mockStatic(NodeConfigManager.class);
when(config.isShutdown()).thenReturn(false);
when(config.isConfigured()).thenReturn(true);
- when(config.getSpoolDir()).thenReturn("spool/dir");
+ when(config.getSpoolDir()).thenReturn("spool/f");
+ when(config.getSpoolBase()).thenReturn("spool");
when(config.getLogDir()).thenReturn("log/dir");
when(config.getPublishId()).thenReturn("User1");
when(config.isAnotherNode(anyString(), anyString())).thenReturn(true);
when(config.getEventLogInterval()).thenReturn("40");
+ when(config.isDeletePermitted("1")).thenReturn(true);
+ when(config.getAllDests()).thenReturn(new DestInfo[0]);
FieldUtils.writeDeclaredStaticField(NodeServlet.class, "config", config, true);
FieldUtils.writeDeclaredStaticField(NodeMain.class, "nodeConfigManager", config, true);
PowerMockito.when(NodeConfigManager.getInstance()).thenReturn(config);
}
+ private void setUpConfigToReturnUnprivilegedSubscriber() throws IllegalAccessException {
+ NodeConfigManager config = mock(NodeConfigManager.class);
+ PowerMockito.mockStatic(NodeConfigManager.class);
+ when(config.isShutdown()).thenReturn(false);
+ when(config.isConfigured()).thenReturn(true);
+ when(config.isDeletePermitted("1")).thenReturn(false);
+ FieldUtils.writeDeclaredStaticField(NodeServlet.class, "config", config, true);
+ FieldUtils.writeDeclaredStaticField(NodeMain.class, "nodeConfigManager", config, true);
+ PowerMockito.when(NodeConfigManager.getInstance()).thenReturn(config);
+ }
+
+ private void setUpConfigToReturnNullOnIsDeletePermitted() throws IllegalAccessException {
+ NodeConfigManager config = mock(NodeConfigManager.class);
+ PowerMockito.mockStatic(NodeConfigManager.class);
+ when(config.isShutdown()).thenReturn(false);
+ when(config.isConfigured()).thenReturn(true);
+ when(config.isDeletePermitted("1")).thenThrow(new NullPointerException());
+ FieldUtils.writeDeclaredStaticField(NodeServlet.class, "config", config, true);
+ FieldUtils.writeDeclaredStaticField(NodeMain.class, "nodeConfigManager", config, true);
+ PowerMockito.when(NodeConfigManager.getInstance()).thenReturn(config);
+ }
private void setUpNodeMainDelivery() throws IllegalAccessException{
Delivery delivery = mock(Delivery.class);
when(request.getHeaders("X-DMAAP-DR-ON-BEHALF-OF")).thenReturn(behalfHeader);
when(request.getHeaders("X-DMAAP-DR-META")).thenReturn(metaDataHeader);
}
+
+ private void createFilesAndDirectories() throws IOException {
+ File nodeDir = new File("spool/n/172.0.0.1");
+ File spoolDir = new File("spool/s/0/1");
+ File dataFile = new File("spool/s/0/1/dmaap-dr-node.1234567");
+ File metaDataFile = new File("spool/s/0/1/dmaap-dr-node.1234567.M");
+ nodeDir.mkdirs();
+ spoolDir.mkdirs();
+ dataFile.createNewFile();
+ metaDataFile.createNewFile();
+ }
+
+ private static void deleteCreatedDirectories() {
+ File spoolDir = new File("spool");
+ delete(spoolDir);
+ }
+
+ private static void delete(File file) {
+ if (file.isDirectory()) {
+ for (File f: file.listFiles()) {
+ delete(f);
+ }
+ }
+ if (!file.delete()) {
+ System.out.println("Failed to delete: " + file);
+ }
+ }
}
private boolean suspended;\r
private Date lastMod;\r
private Date createdDate;\r
+ private boolean privilegedSubscriber;\r
\r
public static Subscription getSubscriptionMatching(Subscription sub) {\r
SubDelivery deli = sub.getDelivery();\r
this.suspended = false;\r
this.lastMod = new Date();\r
this.createdDate = new Date();\r
+ this.privilegedSubscriber = false;\r
}\r
\r
public Subscription(ResultSet rs) throws SQLException {\r
this.suspended = rs.getBoolean("SUSPENDED");\r
this.lastMod = rs.getDate("LAST_MOD");\r
this.createdDate = rs.getDate("CREATED_DATE");\r
+ this.privilegedSubscriber = rs.getBoolean("PRIVILEGED_SUBSCRIBER");\r
}\r
\r
public Subscription(JSONObject jo) throws InvalidObjectException {\r
\r
this.metadataOnly = jo.getBoolean("metadataOnly");\r
this.suspended = jo.optBoolean("suspend", false);\r
-\r
+ this.privilegedSubscriber = jo.optBoolean("privilegedSubscriber", false);\r
this.subscriber = jo.optString("subscriber", "");\r
JSONObject jol = jo.optJSONObject("links");\r
this.links = (jol == null) ? (new SubLinks()) : (new SubLinks(jol));\r
this.suspended = suspended;\r
}\r
\r
+ public boolean isPrivilegedSubscriber() {\r
+ return privilegedSubscriber;\r
+ }\r
+\r
+ public void setPrivilegedSubscriber(boolean privilegedSubscriber) {\r
+ this.privilegedSubscriber = privilegedSubscriber;\r
+ }\r
+\r
public String getSubscriber() {\r
return subscriber;\r
}\r
jo.put("suspend", suspended);\r
jo.put(LAST_MOD_KEY, lastMod.getTime());\r
jo.put(CREATED_DATE, createdDate.getTime());\r
+ jo.put("privilegedSubscriber", privilegedSubscriber);\r
return jo;\r
}\r
\r
}\r
\r
// Create the SUBSCRIPTIONS row\r
- String sql = "insert into SUBSCRIPTIONS (SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";\r
+ String sql = "insert into SUBSCRIPTIONS (SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID, PRIVILEGED_SUBSCRIBER) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";\r
ps = c.prepareStatement(sql, new String[]{SUBID_COL});\r
ps.setInt(1, subid);\r
ps.setInt(2, feedid);\r
ps.setString(8, getSubscriber());\r
ps.setBoolean(9, isSuspended());\r
ps.setInt(10, groupid); //New field is added - Groups feature Rally:US708115 - 1610\r
+ ps.setBoolean(11, isPrivilegedSubscriber());\r
ps.execute();\r
ps.close();\r
// Update the row to set the URLs\r
boolean rv = true;\r
PreparedStatement ps = null;\r
try {\r
- String sql = "update SUBSCRIPTIONS set DELIVERY_URL = ?, DELIVERY_USER = ?, DELIVERY_PASSWORD = ?, DELIVERY_USE100 = ?, METADATA_ONLY = ?, SUSPENDED = ?, GROUPID = ? where SUBID = ?";\r
+ String sql = "update SUBSCRIPTIONS set DELIVERY_URL = ?, DELIVERY_USER = ?, DELIVERY_PASSWORD = ?, DELIVERY_USE100 = ?, METADATA_ONLY = ?, SUSPENDED = ?, GROUPID = ?, PRIVILEGED_SUBSCRIBER = ? where SUBID = ?";\r
ps = c.prepareStatement(sql);\r
ps.setString(1, delivery.getUrl());\r
ps.setString(2, delivery.getUser());\r
ps.setInt(5, isMetadataOnly() ? 1 : 0);\r
ps.setInt(6, suspended ? 1 : 0);\r
ps.setInt(7, groupid); //New field is added - Groups feature Rally:US708115 - 1610\r
- ps.setInt(8, subid);\r
+ ps.setInt(8, privilegedSubscriber ? 1 : 0);\r
+ ps.setInt(9, subid);\r
ps.executeUpdate();\r
} catch (SQLException e) {\r
rv = false;\r
);
CREATE TABLE SUBSCRIPTIONS (
- SUBID INT UNSIGNED NOT NULL PRIMARY KEY,
- FEEDID INT UNSIGNED NOT NULL,
- GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0,
- DELIVERY_URL VARCHAR(256),
- DELIVERY_USER VARCHAR(20),
- DELIVERY_PASSWORD VARCHAR(32),
- DELIVERY_USE100 BOOLEAN DEFAULT FALSE,
- METADATA_ONLY BOOLEAN DEFAULT FALSE,
- SUBSCRIBER VARCHAR(8) NOT NULL,
- SELF_LINK VARCHAR(256),
- LOG_LINK VARCHAR(256),
- LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- SUSPENDED BOOLEAN DEFAULT FALSE,
- CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ SUBID INT UNSIGNED NOT NULL PRIMARY KEY,
+ FEEDID INT UNSIGNED NOT NULL,
+ GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0,
+ DELIVERY_URL VARCHAR(256),
+ DELIVERY_USER VARCHAR(20),
+ DELIVERY_PASSWORD VARCHAR(32),
+ DELIVERY_USE100 BOOLEAN DEFAULT FALSE,
+ METADATA_ONLY BOOLEAN DEFAULT FALSE,
+ SUBSCRIBER VARCHAR(8) NOT NULL,
+ SELF_LINK VARCHAR(256),
+ LOG_LINK VARCHAR(256),
+ LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ SUSPENDED BOOLEAN DEFAULT FALSE,
+ PRIVILEGED_SUBSCRIBER BOOLEAN DEFAULT FALSE,
+ CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
('DELIVERY_INIT_RETRY_INTERVAL', '10'),
('DELIVERY_MAX_AGE', '86400'),
('DELIVERY_MAX_RETRY_INTERVAL', '3600'),
+ ('DELIVERY_FILE_PROCESS_INTERVAL', '600'),
('DELIVERY_RETRY_RATIO', '2'),
('LOGROLL_INTERVAL', '30'),
('PROV_AUTH_ADDRESSES', 'dmaap-dr-prov|dmaap-dr-node'),
('PROV_MAXSUB_COUNT', '100000'),
('PROV_REQUIRE_CERT', 'false'),
('PROV_REQUIRE_SECURE', 'false'),
- ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE')
+ ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE|DELIVERY_FILE_PROCESS_INTERVAL')
;
******************************************************************************/
package org.onap.dmaap.datarouter.provisioning;
-import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.onap.dmaap.datarouter.provisioning.beans.Updateable;
import org.onap.dmaap.datarouter.provisioning.utils.DB;
import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.LoggerFactory;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import java.io.File;
import java.io.FileNotFoundException;
-import java.io.PrintWriter;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
import static org.onap.dmaap.datarouter.provisioning.BaseServlet.BEHALF_HEADER;
jo.put("version", "2.0");
jo.put("metadataOnly", true);
jo.put("suspend", true);
+ jo.put("privilegedSubscriber", true);
jo.put("delivery", JSObject);
jo.put("subscriber", "differentSubscriber");
jo.put("sync", true);
jo.put("version", "2.0");
jo.put("metadataOnly", true);
jo.put("suspend", true);
+ jo.put("privilegedSubscriber", true);
jo.put("delivery", JSObject);
jo.put("sync", true);
return jo;
jo.put("version", "2.0");
jo.put("metadataOnly", true);
jo.put("suspend", true);
+ jo.put("privilegedSubscriber", true);
jo.put("delivery", JSObject);
jo.put("sync", true);
jo.put("changeowner", true);
subscription.setGroupid(1);
subscription.setMetadataOnly(false);
subscription.setSuspended(false);
+ subscription.setPrivilegedSubscriber(false);
subscription.doInsert(db.getConnection());
}
subscription.setGroupid(1);
subscription.setMetadataOnly(false);
subscription.setSuspended(false);
+ subscription.setPrivilegedSubscriber(false);
subscription.changeOwnerShip();
subscription.doUpdate(db.getConnection());
}
subscription.setMetadataOnly(false);
subscription.setSubscriber(subscriber);
subscription.setSuspended(false);
+ subscription.setPrivilegedSubscriber(false);
subscription.setLinks(subLinks);
Assert.assertEquals(2, subscription.getGroupid());
Assert.assertEquals(subLinks, subscription.getLinks());
Assert.assertFalse(subscription.isMetadataOnly());
Assert.assertFalse(subscription.isSuspended());
+ Assert.assertFalse(subscription.isPrivilegedSubscriber());
}
}
\ No newline at end of file
);
CREATE TABLE SUBSCRIPTIONS (
- SUBID INT UNSIGNED NOT NULL PRIMARY KEY,
- FEEDID INT UNSIGNED NOT NULL,
- GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0,
- DELIVERY_URL VARCHAR(256),
- DELIVERY_USER VARCHAR(20),
- DELIVERY_PASSWORD VARCHAR(32),
- DELIVERY_USE100 BOOLEAN DEFAULT FALSE,
- METADATA_ONLY BOOLEAN DEFAULT FALSE,
- SUBSCRIBER VARCHAR(8) NOT NULL,
- SELF_LINK VARCHAR(256),
- LOG_LINK VARCHAR(256),
- LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- SUSPENDED BOOLEAN DEFAULT FALSE,
- CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ SUBID INT UNSIGNED NOT NULL PRIMARY KEY,
+ FEEDID INT UNSIGNED NOT NULL,
+ GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0,
+ DELIVERY_URL VARCHAR(256),
+ DELIVERY_USER VARCHAR(20),
+ DELIVERY_PASSWORD VARCHAR(32),
+ DELIVERY_USE100 BOOLEAN DEFAULT FALSE,
+ METADATA_ONLY BOOLEAN DEFAULT FALSE,
+ SUBSCRIBER VARCHAR(8) NOT NULL,
+ SELF_LINK VARCHAR(256),
+ LOG_LINK VARCHAR(256),
+ LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ SUSPENDED BOOLEAN DEFAULT FALSE,
+ PRIVILEGED_SUBSCRIBER BOOLEAN DEFAULT FALSE,
+ CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
('DELIVERY_INIT_RETRY_INTERVAL', '10'),
('DELIVERY_MAX_AGE', '86400'),
('DELIVERY_MAX_RETRY_INTERVAL', '3600'),
+ ('DELIVERY_FILE_PROCESS_INTERVAL', '600'),
('DELIVERY_RETRY_RATIO', '2'),
('LOGROLL_INTERVAL', '300'),
('PROV_AUTH_ADDRESSES', 'dmaap-dr-prov|dmaap-dr-node'),
('PROV_MAXSUB_COUNT', '100000'),
('PROV_REQUIRE_CERT', 'false'),
('PROV_REQUIRE_SECURE', 'true'),
- ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE')
+ ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE|DELIVERY_FILE_PROCESS_INTERVAL')
;
INSERT INTO GROUPS(GROUPID, AUTHID, NAME, DESCRIPTION, CLASSIFICATION, MEMBERS)
VALUES (1, 'Basic dXNlcjE6cGFzc3dvcmQx', 'Group1', 'First Group for testing', 'Class1', 'Member1');
-INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID)
-VALUES (1, 1, 'https://172.100.0.5:8080', 'user1', 'password1', true, false, 'user1', false, 1);
+INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID, PRIVILEGED_SUBSCRIBER)
+VALUES (1, 1, 'https://172.100.0.5:8080', 'user1', 'password1', true, false, 'user1', false, 1, false);
INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, SUBSCRIBER, SELF_LINK, LOG_LINK)
VALUES (23, 1, 'http://delivery_url', 'user1', 'somepassword', 'sub123', 'selflink', 'loglink');