From c50374709585766e887f349a139de0a6595c1ca1 Mon Sep 17 00:00:00 2001 From: econwar Date: Thu, 14 Feb 2019 09:37:44 +0000 Subject: [PATCH] Add optional API for PM Mapper Added new field to Subscriber class to keep files after published Added new Delete endpoint so that file can then be deleted Change-Id: Id72da67689a7ceda8ddd4997cd6349b981cb1cdb Issue-ID: DMAAP-981 Signed-off-by: econwar --- .../src/main/resources/database/sql_init_01.sql | 32 ++--- .../org/onap/dmaap/datarouter/node/Delivery.java | 14 ++ .../onap/dmaap/datarouter/node/DeliveryQueue.java | 92 ++++++++----- .../dmaap/datarouter/node/DeliveryQueueHelper.java | 35 ++--- .../onap/dmaap/datarouter/node/DeliveryTask.java | 44 +++--- .../dmaap/datarouter/node/DeliveryTaskHelper.java | 12 +- .../org/onap/dmaap/datarouter/node/DestInfo.java | 32 ++++- .../org/onap/dmaap/datarouter/node/LogManager.java | 12 +- .../org/onap/dmaap/datarouter/node/NodeConfig.java | 84 +++++++----- .../dmaap/datarouter/node/NodeConfigManager.java | 41 ++++-- .../org/onap/dmaap/datarouter/node/NodeMain.java | 2 +- .../onap/dmaap/datarouter/node/NodeServlet.java | 148 +++++++++++++++------ .../org/onap/dmaap/datarouter/node/ProvData.java | 3 +- .../onap/dmaap/datarouter/node/DeliveryTest.java | 2 +- .../onap/dmaap/datarouter/node/NodeConfigTest.java | 1 + .../dmaap/datarouter/node/NodeServletTest.java | 116 +++++++++++++++- .../provisioning/beans/Subscription.java | 22 ++- .../src/main/resources/misc/sql_init_01.sql | 32 ++--- .../provisioning/SubscriptionServletTest.java | 10 +- .../provisioning/beans/SubscriptionTest.java | 2 + datarouter-prov/src/test/resources/create.sql | 36 ++--- 21 files changed, 546 insertions(+), 226 deletions(-) diff --git a/datarouter-docker-compose/src/main/resources/database/sql_init_01.sql b/datarouter-docker-compose/src/main/resources/database/sql_init_01.sql index 01258320..60b638a0 100644 --- a/datarouter-docker-compose/src/main/resources/database/sql_init_01.sql +++ b/datarouter-docker-compose/src/main/resources/database/sql_init_01.sql @@ -31,20 +31,21 @@ CREATE TABLE FEED_ENDPOINT_ADDRS ( ); CREATE TABLE SUBSCRIPTIONS ( - SUBID INT UNSIGNED NOT NULL PRIMARY KEY, - FEEDID INT UNSIGNED NOT NULL, - GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0, - DELIVERY_URL VARCHAR(256), - DELIVERY_USER VARCHAR(20), - DELIVERY_PASSWORD VARCHAR(32), - DELIVERY_USE100 BOOLEAN DEFAULT FALSE, - METADATA_ONLY BOOLEAN DEFAULT FALSE, - SUBSCRIBER VARCHAR(8) NOT NULL, - SELF_LINK VARCHAR(256), - LOG_LINK VARCHAR(256), - LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - SUSPENDED BOOLEAN DEFAULT FALSE, - CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP + SUBID INT UNSIGNED NOT NULL PRIMARY KEY, + FEEDID INT UNSIGNED NOT NULL, + GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0, + DELIVERY_URL VARCHAR(256), + DELIVERY_USER VARCHAR(20), + DELIVERY_PASSWORD VARCHAR(32), + DELIVERY_USE100 BOOLEAN DEFAULT FALSE, + METADATA_ONLY BOOLEAN DEFAULT FALSE, + SUBSCRIBER VARCHAR(8) NOT NULL, + SELF_LINK VARCHAR(256), + LOG_LINK VARCHAR(256), + LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + SUSPENDED BOOLEAN DEFAULT FALSE, + PRIVILEGED_SUBSCRIBER BOOLEAN DEFAULT FALSE, + CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); @@ -134,6 +135,7 @@ INSERT INTO PARAMETERS VALUES ('DELIVERY_INIT_RETRY_INTERVAL', '10'), ('DELIVERY_MAX_AGE', '86400'), ('DELIVERY_MAX_RETRY_INTERVAL', '3600'), + ('DELIVERY_FILE_PROCESS_INTERVAL', '600'), ('DELIVERY_RETRY_RATIO', '2'), ('LOGROLL_INTERVAL', '30'), ('PROV_AUTH_ADDRESSES', 'dmaap-dr-prov|dmaap-dr-node'), @@ -142,5 +144,5 @@ INSERT INTO PARAMETERS VALUES ('PROV_MAXSUB_COUNT', '100000'), ('PROV_REQUIRE_CERT', 'false'), ('PROV_REQUIRE_SECURE', 'false'), - ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE') + ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE|DELIVERY_FILE_PROCESS_INTERVAL') ; diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java index ae4f13bf..d2600d23 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java @@ -267,4 +267,18 @@ public class Delivery { } } } + + /** + * Mark the task in spool a success + */ + public synchronized boolean markTaskSuccess(String spool, String pubId) { + boolean succeeded = false; + if (spool != null) { + DeliveryQueue dq = dqs.get(spool); + if (dq != null) { + succeeded = dq.markTaskSuccess(pubId); + } + } + return succeeded; + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java index ad746255..abdfa718 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java @@ -64,16 +64,16 @@ import java.util.*; * failure timer is active or if no files are found in a directory scan. */ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { - private DeliveryQueueHelper dqh; - private DestInfo di; - private Hashtable working = new Hashtable(); - private Hashtable retry = new Hashtable(); + private DeliveryQueueHelper deliveryQueueHelper; + private DestInfo destinationInfo; + private Hashtable working = new Hashtable<>(); + private Hashtable retry = new Hashtable<>(); private int todoindex; private boolean failed; private long failduration; private long resumetime; File dir; - private Vector todo = new Vector(); + private Vector todo = new Vector<>(); /** * Try to cancel a delivery task. @@ -139,11 +139,15 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { if (!failed) { failed = true; if (failduration == 0) { - failduration = dqh.getInitFailureTimer(); + if (destinationInfo.isPrivilegedSubscriber()) { + failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer(); + } else{ + failduration = deliveryQueueHelper.getInitFailureTimer(); + } } resumetime = System.currentTimeMillis() + failduration; - long maxdur = dqh.getMaxFailureTimer(); - failduration = (long) (failduration * dqh.getFailureBackoff()); + long maxdur = deliveryQueueHelper.getMaxFailureTimer(); + failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff()); if (failduration > maxdur) { failduration = maxdur; } @@ -184,7 +188,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { */ public synchronized DeliveryTask peekNext() { long now = System.currentTimeMillis(); - long mindate = now - dqh.getExpirationTimer(); + long mindate = now - deliveryQueueHelper.getExpirationTimer(); if (failed) { if (now > resumetime) { failed = false; @@ -246,32 +250,32 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Create a delivery queue for a given destination info */ - public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) { - this.dqh = dqh; - this.di = di; - dir = new File(di.getSpool()); + public DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) { + this.deliveryQueueHelper = deliveryQueueHelper; + this.destinationInfo = destinationInfo; + dir = new File(destinationInfo.getSpool()); dir.mkdirs(); } /** * Update the destination info for this delivery queue */ - public void config(DestInfo di) { - this.di = di; + public void config(DestInfo destinationInfo) { + this.destinationInfo = destinationInfo; } /** * Get the dest info */ - public DestInfo getDestInfo() { - return (di); + public DestInfo getDestinationInfo() { + return (destinationInfo); } /** * Get the config manager */ public DeliveryQueueHelper getConfig() { - return (dqh); + return (deliveryQueueHelper); } /** @@ -294,22 +298,26 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { */ public void reportStatus(DeliveryTask task, int status, String xpubid, String location) { if (status < 300) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid); - markSuccess(task); - } else if (status < 400 && dqh.isFollowRedirects()) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); - if (dqh.handleRedirection(di, location, task.getFileId())) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid); + if (destinationInfo.isPrivilegedSubscriber()) { + markFailWithRetry(task); + } else { + markSuccess(task); + } + } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); + if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) { markRedirect(task); } else { StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); markFailNoRetry(task); } - } else if (status < 500) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + } else if (status < 500 && status != 429) { // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time. + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); markFailNoRetry(task); } else { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); markFailWithRetry(task); } } @@ -318,8 +326,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { * Delivery failed by reason of an exception */ public void reportException(DeliveryTask task, Exception exception) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString()); - dqh.handleUnreachable(di); + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString()); + deliveryQueueHelper.handleUnreachable(destinationInfo); markFailWithRetry(task); } @@ -330,14 +338,14 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { * @return The feed ID */ public String getFeedId(String subid) { - return (dqh.getFeedId(subid)); + return (deliveryQueueHelper.getFeedId(subid)); } /** * Get the URL to deliver a message to given the file ID */ public String getDestURL(String fileid) { - return (dqh.getDestURL(di, fileid)); + return (deliveryQueueHelper.getDestURL(destinationInfo, fileid)); } /** @@ -346,8 +354,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { */ public void run() { DeliveryTask t; - long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit(); - int filestogo = dqh.getFairFileLimit(); + long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit(); + int filestogo = deliveryQueueHelper.getFairFileLimit(); while ((t = getNext()) != null) { t.run(); if (--filestogo <= 0 || System.currentTimeMillis() > endtime) { @@ -369,4 +377,24 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { public void resetQueue() { resumetime = System.currentTimeMillis(); } + + /** + * Get task if in queue and mark as success + */ + public boolean markTaskSuccess(String pubId) { + DeliveryTask task = working.get(pubId); + if (task != null) { + markSuccess(task); + return true; + } + task = retry.get(pubId); + if (task != null) { + retry.remove(pubId); + task.clean(); + resumetime = 0; + failduration = 0; + return true; + } + return false; + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java index b1734cd4..5cf5fa4c 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java @@ -34,64 +34,69 @@ public interface DeliveryQueueHelper { /** * Get the timeout (milliseconds) before retrying after an initial delivery failure */ - public long getInitFailureTimer(); + long getInitFailureTimer(); + + /** + * Get the timeout before retrying after delivery and wait for file processing + */ + long getWaitForFileProcessFailureTimer(); /** * Get the ratio between timeouts on consecutive delivery attempts */ - public double getFailureBackoff(); + double getFailureBackoff(); /** * Get the maximum timeout (milliseconds) between delivery attempts */ - public long getMaxFailureTimer(); + long getMaxFailureTimer(); /** * Get the expiration timer (milliseconds) for deliveries */ - public long getExpirationTimer(); + long getExpirationTimer(); /** * Get the maximum number of file delivery attempts before checking * if another queue has work to be performed. */ - public int getFairFileLimit(); + int getFairFileLimit(); /** * Get the maximum amount of time spent delivering files before checking if another queue has work to be performed. */ - public long getFairTimeLimit(); + long getFairTimeLimit(); /** * Get the URL for delivering a file * - * @param dest The destination information for the file to be delivered. + * @param destinationInfo The destination information for the file to be delivered. * @param fileid The file id for the file to be delivered. - * @return The URL for delivering the file (typically, dest.getURL() + "/" + fileid). + * @return The URL for delivering the file (typically, destinationInfo.getURL() + "/" + fileid). */ - public String getDestURL(DestInfo dest, String fileid); + String getDestURL(DestInfo destinationInfo, String fileid); /** * Forget redirections associated with a subscriber * - * @param dest Destination information to forget + * @param destinationInfo Destination information to forget */ - public void handleUnreachable(DestInfo dest); + void handleUnreachable(DestInfo destinationInfo); /** * Post redirection for a subscriber * - * @param dest Destination information to update + * @param destinationInfo Destination information to update * @param location Location given by subscriber * @param fileid File ID of request * @return true if this 3xx response is retryable, otherwise, false. */ - public boolean handleRedirection(DestInfo dest, String location, String fileid); + boolean handleRedirection(DestInfo destinationInfo, String location, String fileid); /** * Should I handle 3xx responses differently than 4xx responses? */ - public boolean isFollowRedirects(); + boolean isFollowRedirects(); /** * Get the feed ID for a subscription @@ -99,5 +104,5 @@ public interface DeliveryQueueHelper { * @param subid The subscription ID * @return The feed ID */ - public String getFeedId(String subid); + String getFeedId(String subid); } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java index 80729905..b2c31691 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java @@ -47,9 +47,9 @@ public class DeliveryTask implements Runnable, Comparable { private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask"); private static EELFLogger eelflogger = EELFManager.getInstance() .getLogger(DeliveryTask.class); - private DeliveryTaskHelper dth; + private DeliveryTaskHelper deliveryTaskHelper; private String pubid; - private DestInfo di; + private DestInfo destInfo; private String spool; private File datafile; private File metafile; @@ -69,25 +69,25 @@ public class DeliveryTask implements Runnable, Comparable { /** * Create a delivery task for a given delivery queue and pub ID * - * @param dth The delivery task helper for the queue this task is in. + * @param deliveryTaskHelper The delivery task helper for the queue this task is in. * @param pubid The publish ID for this file. This is used as * the base for the file name in the spool directory and is of * the form . */ - public DeliveryTask(DeliveryTaskHelper dth, String pubid) { - this.dth = dth; + public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { + this.deliveryTaskHelper = deliveryTaskHelper; this.pubid = pubid; - di = dth.getDestInfo(); - subid = di.getSubId(); - feedid = di.getLogData(); - spool = di.getSpool(); + destInfo = deliveryTaskHelper.getDestinationInfo(); + subid = destInfo.getSubId(); + feedid = destInfo.getLogData(); + spool = destInfo.getSpool(); String dfn = spool + "/" + pubid; String mfn = dfn + ".M"; datafile = new File(spool + "/" + pubid); metafile = new File(mfn); - boolean monly = di.isMetaDataOnly(); + boolean monly = destInfo.isMetaDataOnly(); date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); - Vector hdrv = new Vector(); + Vector hdrv = new Vector<>(); try (BufferedReader br = new BufferedReader(new FileReader(metafile))) { String s = br.readLine(); @@ -104,7 +104,7 @@ public class DeliveryTask implements Runnable, Comparable { String v = s.substring(i + 1); if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) { subid = v.replaceAll("[^ ]*/", ""); - feedid = dth.getFeedId(subid.replaceAll(" .*", "")); + feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", "")); } if (length == 0 && h.toLowerCase().startsWith("content-")) { continue; @@ -126,7 +126,7 @@ public class DeliveryTask implements Runnable, Comparable { loggerDeliveryTask.error("Exception "+e.getStackTrace(),e); } hdrs = hdrv.toArray(new String[hdrv.size()][]); - url = dth.getDestURL(fileid); + url = deliveryTaskHelper.getDestURL(fileid); } /** * Is the object a DeliveryTask with the same publication ID? @@ -171,14 +171,14 @@ public class DeliveryTask implements Runnable, Comparable { public void run() { attempts++; try { - di = dth.getDestInfo(); - boolean expect100 = di.isUsing100(); - boolean monly = di.isMetaDataOnly(); + destInfo = deliveryTaskHelper.getDestinationInfo(); + boolean expect100 = destInfo.isUsing100(); + boolean monly = destInfo.isMetaDataOnly(); length = 0; if (!"DELETE".equals(method) && !monly) { length = datafile.length(); } - url = dth.getDestURL(fileid); + url = deliveryTaskHelper.getDestURL(fileid); URL u = new URL(url); HttpURLConnection uc = (HttpURLConnection) u.openConnection(); uc.setConnectTimeout(60000); @@ -186,7 +186,7 @@ public class DeliveryTask implements Runnable, Comparable { uc.setInstanceFollowRedirects(false); uc.setRequestMethod(method); uc.setRequestProperty("Content-Length", Long.toString(length)); - uc.setRequestProperty("Authorization", di.getAuth()); + uc.setRequestProperty("Authorization", destInfo.getAuth()); uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid); for (String[] nv : hdrs) { uc.addRequestProperty(nv[0], nv[1]); @@ -201,7 +201,7 @@ public class DeliveryTask implements Runnable, Comparable { try { os = uc.getOutputStream(); } catch (ProtocolException pe) { - dth.reportDeliveryExtra(this, -1L); + deliveryTaskHelper.reportDeliveryExtra(this, -1L); // Rcvd error instead of 100-continue loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe); } @@ -223,7 +223,7 @@ public class DeliveryTask implements Runnable, Comparable { } os.close(); } catch (IOException ioe) { - dth.reportDeliveryExtra(this, sofar); + deliveryTaskHelper.reportDeliveryExtra(this, sofar); throw ioe; } } @@ -257,10 +257,10 @@ public class DeliveryTask implements Runnable, Comparable { } is.close(); } - dth.reportStatus(this, rc, xpubid, rmsg); + deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg); } catch (Exception e) { loggerDeliveryTask.error("Exception "+e.getStackTrace(),e); - dth.reportException(this, e); + deliveryTaskHelper.reportException(this, e); } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java index 932b792a..d4ac8bd6 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java @@ -38,7 +38,7 @@ public interface DeliveryTaskHelper { * @param task The task that failed * @param exception The exception that occurred */ - public void reportException(DeliveryTask task, Exception exception); + void reportException(DeliveryTask task, Exception exception); /** * Report that a delivery attempt completed (successfully or unsuccessfully) @@ -48,7 +48,7 @@ public interface DeliveryTaskHelper { * @param xpubid The publish ID from the far end (if any) * @param location The redirection location for a 3XX response */ - public void reportStatus(DeliveryTask task, int status, String xpubid, String location); + void reportStatus(DeliveryTask task, int status, String xpubid, String location); /** * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue. @@ -56,14 +56,14 @@ public interface DeliveryTaskHelper { * @param task The task that failed * @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue. */ - public void reportDeliveryExtra(DeliveryTask task, long sent); + void reportDeliveryExtra(DeliveryTask task, long sent); /** * Get the destination information for the delivery queue * * @return The destination information */ - public DestInfo getDestInfo(); + DestInfo getDestinationInfo(); /** * Given a file ID, get the URL to deliver to @@ -71,7 +71,7 @@ public interface DeliveryTaskHelper { * @param fileid The file id * @return The URL to deliver to */ - public String getDestURL(String fileid); + String getDestURL(String fileid); /** * Get the feed ID for a subscription @@ -79,5 +79,5 @@ public interface DeliveryTaskHelper { * @param subid The subscription ID * @return The feed iD */ - public String getFeedId(String subid); + String getFeedId(String subid); } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java index 12253314..c3e0057c 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java @@ -37,6 +37,7 @@ public class DestInfo { private String authentication; private boolean metaonly; private boolean use100; + private boolean privilegedSubscriber; /** * Create a destination information object. @@ -50,8 +51,9 @@ public class DestInfo { * @param authentication The credentials. * @param metaonly Is this a metadata only delivery? * @param use100 Should I use expect 100-continue? + * @param privilegedSubscriber Can we wait to receive a file processed acknowledgement before deleting file */ - public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100) { + public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber) { this.name = name; this.spool = spool; this.subid = subid; @@ -61,6 +63,27 @@ public class DestInfo { this.authentication = authentication; this.metaonly = metaonly; this.use100 = use100; + this.privilegedSubscriber = privilegedSubscriber; + } + + /** + * Create a destination information object. + * + * @param name n:fqdn or s:subid + * @param spool The directory where files are spooled. + * @param subscription The subscription. + */ + public DestInfo(String name, String spool, NodeConfig.ProvSubscription subscription) { + this.name = name; + this.spool = spool; + this.subid = subscription.getSubId(); + this.logdata = subscription.getFeedId(); + this.url = subscription.getURL(); + this.authuser = subscription.getAuthUser(); + this.authentication = subscription.getCredentials(); + this.metaonly = subscription.isMetaDataOnly(); + this.use100 = subscription.isUsing100(); + this.privilegedSubscriber = subscription.isPrivilegedSubscriber(); } public boolean equals(Object o) { @@ -150,4 +173,11 @@ public class DestInfo { public boolean isUsing100() { return (use100); } + + /** + * Should we wait to receive a file processed acknowledgement before deleting file + */ + public boolean isPrivilegedSubscriber() { + return (privilegedSubscriber); + } } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java index 1d5f76f6..ff803afc 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java @@ -57,6 +57,10 @@ public class LogManager extends TimerTask { return (10000L); } + public long getWaitForFileProcessFailureTimer() { + return (600000L); + } + public double getFailureBackoff() { return (2.0); } @@ -77,14 +81,14 @@ public class LogManager extends TimerTask { return (86400000); } - public String getDestURL(DestInfo dest, String fileid) { + public String getDestURL(DestInfo destinationInfo, String fileid) { return (config.getEventLogUrl()); } - public void handleUnreachable(DestInfo dest) { + public void handleUnreachable(DestInfo destinationInfo) { } - public boolean handleRedirection(DestInfo dest, String location, String fileid) { + public boolean handleRedirection(DestInfo destinationInfo, String location, String fileid) { return (false); } @@ -101,7 +105,7 @@ public class LogManager extends TimerTask { public Uploader() { dq = new DeliveryQueue(this, new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false, - false)); + false, false)); setDaemon(true); setName("Log Uploader"); start(); diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java index c40d29c3..d3d3d01b 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java @@ -231,6 +231,7 @@ public class NodeConfig { private String credentials; private boolean metaonly; private boolean use100; + private boolean privilegedSubscriber; /** * Construct a subscription configuration entry @@ -243,9 +244,10 @@ public class NodeConfig { * Authorization header. * @param metaonly Is this a meta data only subscription? * @param use100 Should we send Expect: 100-continue? + * @param privilegedSubscriber Can we wait to receive a delete file call before deleting file */ public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, - boolean metaonly, boolean use100) { + boolean metaonly, boolean use100, boolean privilegedSubscriber) { this.subid = subid; this.feedid = feedid; this.url = url; @@ -253,6 +255,7 @@ public class NodeConfig { this.credentials = credentials; this.metaonly = metaonly; this.use100 = use100; + this.privilegedSubscriber = privilegedSubscriber; } /** @@ -303,6 +306,13 @@ public class NodeConfig { public boolean isUsing100() { return (use100); } + + /** + * Can we wait to receive a delete file call before deleting file + */ + public boolean isPrivilegedSubscriber() { + return (privilegedSubscriber); + } } /** @@ -462,11 +472,12 @@ public class NodeConfig { Target[] targets; } - private Hashtable params = new Hashtable(); - private Hashtable feeds = new Hashtable(); - private Hashtable nodeinfo = new Hashtable(); - private Hashtable subinfo = new Hashtable(); - private Hashtable nodes = new Hashtable(); + private Hashtable params = new Hashtable<>(); + private Hashtable feeds = new Hashtable<>(); + private Hashtable nodeinfo = new Hashtable<>(); + private Hashtable subinfo = new Hashtable<>(); + private Hashtable nodes = new Hashtable<>(); + private Hashtable provSubscriptions = new Hashtable<>(); private String myname; private String myauth; private DestInfo[] alldests; @@ -486,7 +497,7 @@ public class NodeConfig { for (ProvParam p : pd.getParams()) { params.put(p.getName(), p.getValue()); } - Vector div = new Vector(); + Vector destInfos = new Vector<>(); myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey); for (ProvNode pn : pd.getNodes()) { String cn = pn.getCName(); @@ -495,9 +506,9 @@ public class NodeConfig { } String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey); DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn, - "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true); + "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false); (new File(di.getSpool())).mkdirs(); - div.add(di); + destInfos.add(di); nodeinfo.put(cn, di); nodes.put(auth, new IsFrom(cn)); } @@ -533,7 +544,7 @@ public class NodeConfig { } egrtab.put(pfe.getSubId(), pfe.getNode()); } - Hashtable> pfstab = new Hashtable>(); + Hashtable> pfstab = new Hashtable<>(); for (ProvFeedSubnet pfs : pd.getFeedSubnets()) { Vector v = pfstab.get(pfs.getFeedId()); if (v == null) { @@ -542,46 +553,47 @@ public class NodeConfig { } v.add(new SubnetMatcher(pfs.getCidr())); } - Hashtable ttab = new Hashtable(); - HashSet allfeeds = new HashSet(); + Hashtable feedTargets = new Hashtable<>(); + HashSet allfeeds = new HashSet<>(); for (ProvFeed pfx : pd.getFeeds()) { if (pfx.getStatus() == null) { allfeeds.add(pfx.getId()); } } - for (ProvSubscription ps : pd.getSubscriptions()) { - String sid = ps.getSubId(); - String fid = ps.getFeedId(); - if (!allfeeds.contains(fid)) { + for (ProvSubscription provSubscription : pd.getSubscriptions()) { + String subId = provSubscription.getSubId(); + String feedId = provSubscription.getFeedId(); + if (!allfeeds.contains(feedId)) { continue; } - if (subinfo.get(sid) != null) { + if (subinfo.get(subId) != null) { continue; } int sididx = 999; try { - sididx = Integer.parseInt(sid); + sididx = Integer.parseInt(subId); sididx -= sididx % 100; } catch (Exception e) { } - String siddir = sididx + "/" + sid; - DestInfo di = new DestInfo("s:" + sid, spooldir + "/s/" + siddir, sid, fid, ps.getURL(), ps.getAuthUser(), - ps.getCredentials(), ps.isMetaDataOnly(), ps.isUsing100()); - (new File(di.getSpool())).mkdirs(); - div.add(di); - subinfo.put(sid, di); - String egr = egrtab.get(sid); + String subscriptionDirectory = sididx + "/" + subId; + DestInfo destinationInfo = new DestInfo("s:" + subId, + spooldir + "/s/" + subscriptionDirectory, provSubscription); + (new File(destinationInfo.getSpool())).mkdirs(); + destInfos.add(destinationInfo); + provSubscriptions.put(subId, provSubscription); + subinfo.put(subId, destinationInfo); + String egr = egrtab.get(subId); if (egr != null) { - sid = pf.getPath(egr) + sid; + subId = pf.getPath(egr) + subId; } - StringBuffer sb = ttab.get(fid); + StringBuffer sb = feedTargets.get(feedId); if (sb == null) { sb = new StringBuffer(); - ttab.put(fid, sb); + feedTargets.put(feedId, sb); } - sb.append(' ').append(sid); + sb.append(' ').append(subId); } - alldests = div.toArray(new DestInfo[div.size()]); + alldests = destInfos.toArray(new DestInfo[destInfos.size()]); for (ProvFeed pfx : pd.getFeeds()) { String fid = pfx.getId(); Feed f = feeds.get(fid); @@ -609,7 +621,7 @@ public class NodeConfig { } else { f.redirections = v2.toArray(new Redirection[v2.size()]); } - StringBuffer sb = ttab.get(fid); + StringBuffer sb = feedTargets.get(fid); if (sb == null) { f.targets = new Target[0]; } else { @@ -711,6 +723,16 @@ public class NodeConfig { return ("Publisher not permitted for this feed"); } + /** + * Check whether delete file is allowed. + * + * @param subId The ID of the subscription being requested. + */ + public boolean isDeletePermitted(String subId) { + ProvSubscription provSubscription = provSubscriptions.get(subId); + return provSubscription.isPrivilegedSubscriber(); + } + /** * Get authenticated user */ diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java index 474f5dde..d98c47ae 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java @@ -57,6 +57,7 @@ public class NodeConfigManager implements DeliveryQueueHelper { private Timer timer = new Timer("Node Configuration Timer", true); private long maxfailuretimer; private long initfailuretimer; + private long waitForFileProcessFailureTimer; private long expirationtimer; private double failurebackoff; private long fairtimelimit; @@ -187,6 +188,7 @@ public class NodeConfigManager implements DeliveryQueueHelper { followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false")); eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s"); initfailuretimer = 10000; + waitForFileProcessFailureTimer = 600000; maxfailuretimer = 3600000; expirationtimer = 86400000; failurebackoff = 2.0; @@ -199,6 +201,10 @@ public class NodeConfigManager implements DeliveryQueueHelper { initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000); } catch (Exception e) { } + try { + waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) * 1000); + } catch (Exception e) { + } try { maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000); } catch (Exception e) { @@ -328,6 +334,16 @@ public class NodeConfigManager implements DeliveryQueueHelper { return (config.isPublishPermitted(feedid, credentials, ip)); } + /** + * Check whether delete file is allowed. + * + * @param subId The ID of the subscription being requested + * @return True if the delete file is permitted for the subscriber. + */ + public boolean isDeletePermitted(String subId) { + return (config.isDeletePermitted(subId)); + } + /** * Check who the user is given the feed ID and the offered credentials. * @@ -407,13 +423,13 @@ public class NodeConfigManager implements DeliveryQueueHelper { /** * Get the URL to deliver a message to. * - * @param destinfo The destination information + * @param destinationInfo The destination information * @param fileid The file ID * @return The URL to deliver to */ - public String getDestURL(DestInfo destinfo, String fileid) { - String subid = destinfo.getSubId(); - String purl = destinfo.getURL(); + public String getDestURL(DestInfo destinationInfo, String fileid) { + String subid = destinationInfo.getSubId(); + String purl = destinationInfo.getURL(); if (followredirects && subid != null) { purl = rdmgr.lookup(subid, purl); } @@ -430,10 +446,10 @@ public class NodeConfigManager implements DeliveryQueueHelper { /** * Set up redirection on receipt of a 3XX from a target URL */ - public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) { + public boolean handleRedirection(DestInfo destinationInfo, String redirto, String fileid) { fileid = "/" + fileid; - String subid = destinfo.getSubId(); - String purl = destinfo.getURL(); + String subid = destinationInfo.getSubId(); + String purl = destinationInfo.getURL(); if (followredirects && subid != null && redirto.endsWith(fileid)) { redirto = redirto.substring(0, redirto.length() - fileid.length()); if (!redirto.equals(purl)) { @@ -447,8 +463,8 @@ public class NodeConfigManager implements DeliveryQueueHelper { /** * Handle unreachable target URL */ - public void handleUnreachable(DestInfo destinfo) { - String subid = destinfo.getSubId(); + public void handleUnreachable(DestInfo destinationInfo) { + String subid = destinationInfo.getSubId(); if (followredirects && subid != null) { rdmgr.forget(subid); } @@ -461,6 +477,13 @@ public class NodeConfigManager implements DeliveryQueueHelper { return (initfailuretimer); } + /** + * Get the timeout before retrying after delivery and wait for file processing + */ + public long getWaitForFileProcessFailureTimer() { + return (waitForFileProcessFailureTimer); + } + /** * Get the maximum timeout between delivery attempts */ diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java index e07642c4..d25531a7 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java @@ -145,7 +145,7 @@ public class NodeMain { ctxt = new ServletContextHandler(0); ctxt.setContextPath("/"); server.setHandler(ctxt); - ctxt.addServlet(new ServletHolder(new NodeServlet()), "/*"); + ctxt.addServlet(new ServletHolder(new NodeServlet(delivery)), "/*"); nodeMainLogger.info("NODE0005 Data Router Node Activating Service"); server.start(); server.join(); diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java index 7e1d46d3..fae2c1f6 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java @@ -40,11 +40,11 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Enumeration; import java.util.regex.Pattern; -import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; +import org.jetbrains.annotations.Nullable; import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; import org.slf4j.MDC; @@ -67,6 +67,7 @@ public class NodeServlet extends HttpServlet { //Adding EELF Logger Rally:US664892 private static EELFLogger eelflogger = EELFManager.getInstance() .getLogger(NodeServlet.class); + private Delivery delivery; static { final String ws = "\\s*"; @@ -80,6 +81,10 @@ public class NodeServlet extends HttpServlet { MetaDataPattern = Pattern.compile(object, Pattern.DOTALL); } + NodeServlet(Delivery delivery) { + this.delivery = delivery; + } + /** * Get the NodeConfigurationManager */ @@ -155,16 +160,13 @@ public class NodeServlet extends HttpServlet { } catch (IOException ioe) { logger.error("IOException" + ioe.getMessage()); eelflogger.info(EelfMsgs.EXIT); - } catch (ServletException se) { - logger.error("ServletException" + se.getMessage()); - eelflogger.info(EelfMsgs.EXIT); } } /** * Handle all DELETE requests */ - protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + protected void doDelete(HttpServletRequest req, HttpServletResponse resp) { NodeUtils.setIpAndFqdnForEelf("doDelete"); NodeUtils.setRequestIdAndInvocationId(req); eelflogger.info(EelfMsgs.ENTRY); @@ -173,40 +175,26 @@ public class NodeServlet extends HttpServlet { try { common(req, resp, false); } catch (IOException ioe) { - logger.error("IOException" + ioe.getMessage()); - eelflogger.info(EelfMsgs.EXIT); - } catch (ServletException se) { - logger.error("ServletException" + se.getMessage()); + logger.error("IOException " + ioe.getMessage()); eelflogger.info(EelfMsgs.EXIT); } - } - private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) - throws ServletException, IOException { - if (down(resp)) { - eelflogger.info(EelfMsgs.EXIT); - return; - } - if (!req.isSecure()) { - logger.info( - "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req - .getRemoteAddr()); - resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests"); - eelflogger.info(EelfMsgs.EXIT); - return; - } - String fileid = req.getPathInfo(); - if (fileid == null) { - logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req - .getRemoteAddr()); - resp.sendError(HttpServletResponse.SC_NOT_FOUND, - "Invalid request URI. Expecting /."); - eelflogger.info(EelfMsgs.EXIT); - return; - } + private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws IOException { + String fileid = getFileId(req, resp); + if (fileid == null) return; String feedid = null; String user = null; + String ip = req.getRemoteAddr(); + String lip = req.getLocalAddr(); + String pubid = null; + String xpubid = null; + String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip; + Target[] targets = null; + if (fileid.startsWith("/delete/")) { + deleteFile(req, resp, fileid, pubid); + return; + } String credentials = req.getHeader("Authorization"); if (credentials == null) { logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req @@ -215,12 +203,6 @@ public class NodeServlet extends HttpServlet { eelflogger.info(EelfMsgs.EXIT); return; } - String ip = req.getRemoteAddr(); - String lip = req.getLocalAddr(); - String pubid = null; - String xpubid = null; - String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip; - Target[] targets = null; if (fileid.startsWith("/publish/")) { fileid = fileid.substring(9); int i = fileid.indexOf('/'); @@ -315,8 +297,8 @@ public class NodeServlet extends HttpServlet { mx.append(req.getMethod()).append('\t').append(fileid).append('\n'); Enumeration hnames = req.getHeaderNames(); String ctype = null; - Boolean hasRequestIdHeader = false; - Boolean hasInvocationIdHeader = false; + boolean hasRequestIdHeader = false; + boolean hasInvocationIdHeader = false; while (hnames.hasMoreElements()) { String hn = (String) hnames.nextElement(); String hnlc = hn.toLowerCase(); @@ -449,6 +431,90 @@ public class NodeServlet extends HttpServlet { } } + private void deleteFile(HttpServletRequest req, HttpServletResponse resp, String fileid, String pubid) { + try { + fileid = fileid.substring(8); + int i = fileid.indexOf('/'); + if (i == -1 || i == fileid.length() - 1) { + logger.info("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + " from " + req + .getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, + "Invalid request URI. Expecting /."); + eelflogger.info(EelfMsgs.EXIT); + return; + } + String subscriptionId = fileid.substring(0, i); + int subId = Integer.parseInt(subscriptionId); + pubid = fileid.substring(i + 1); + String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: " + + config.getMyName() + "."; + int subIdDir = subId - (subId % 100); + if (!isAuthorizedToDelete(resp, subscriptionId, errorMessage)) { + return; + } + boolean result = delivery.markTaskSuccess(config.getSpoolBase() + "/s/" + subIdDir + "/" + subId, pubid); + if (result) { + logger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + ".M) from DR Node: " + + config.getMyName()); + resp.setStatus(HttpServletResponse.SC_OK); + eelflogger.info(EelfMsgs.EXIT); + } else { + logger.error("NODE0116 " + errorMessage); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, "File not found on server."); + eelflogger.info(EelfMsgs.EXIT); + } + } catch (IOException ioe) { + logger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: " + + config.getMyName() + ". Error: " + ioe.getMessage()); + eelflogger.info(EelfMsgs.EXIT); + } + } + + @Nullable + private String getFileId(HttpServletRequest req, HttpServletResponse resp) throws IOException { + if (down(resp)) { + eelflogger.info(EelfMsgs.EXIT); + return null; + } + if (!req.isSecure()) { + logger.info( + "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req + .getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests"); + eelflogger.info(EelfMsgs.EXIT); + return null; + } + String fileid = req.getPathInfo(); + if (fileid == null) { + logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req + .getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, + "Invalid request URI. Expecting /."); + eelflogger.info(EelfMsgs.EXIT); + return null; + } + return fileid; + } + + private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage) throws IOException { + try { + boolean deletePermitted = config.isDeletePermitted(subscriptionId); + if (!deletePermitted) { + logger.error("NODE0113 " + errorMessage + " Error: Subscription " + + subscriptionId + " is not a privileged subscription"); + resp.sendError(HttpServletResponse.SC_UNAUTHORIZED); + eelflogger.info(EelfMsgs.EXIT); + return false; + } + } catch (NullPointerException npe) { + logger.error("NODE0114 " + errorMessage + " Error: Subscription " + subscriptionId + " does not exist"); + resp.sendError(HttpServletResponse.SC_NOT_FOUND); + eelflogger.info(EelfMsgs.EXIT); + return false; + } + return true; + } + private int getIdFromPath(HttpServletRequest req) { String path = req.getPathInfo(); if (path == null || path.length() < 2) { diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java index f0b81747..765a4075 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java @@ -173,7 +173,8 @@ public class ProvData { String password = gvas(jdel, "password"); boolean monly = jsub.getBoolean("metadataOnly"); boolean use100 = jdel.getBoolean("use100"); - psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100)); + boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber"); + psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber)); } } JSONObject jparams = jcfg.optJSONObject("parameters"); diff --git a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java index 8a7460e8..ae8cd2cd 100644 --- a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java +++ b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java @@ -97,7 +97,7 @@ public class DeliveryTest { private DestInfo[] createDestInfoObjects() { DestInfo[] destInfos = new DestInfo[1]; - DestInfo destInfo = new DestInfo("node.datarouternew.com", "spool/s/0/1", "1", "logs/", "/subs/1", "user1", "Basic dXNlcjE6cGFzc3dvcmQx", false, true); + DestInfo destInfo = new DestInfo("node.datarouternew.com", "spool/s/0/1", "1", "logs/", "/subs/1", "user1", "Basic dXNlcjE6cGFzc3dvcmQx", false, true, false); destInfos[0] = destInfo; return destInfos; } diff --git a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeConfigTest.java b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeConfigTest.java index 18108172..5092141a 100644 --- a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeConfigTest.java +++ b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeConfigTest.java @@ -210,6 +210,7 @@ public class NodeConfigTest { delivery.put("password", "password1"); delivery.put("use100", true); subscription.put("delivery", delivery); + subscription.put("privilegedSubscriber", false); subscriptions.put(subscription); provData.put("subscriptions", subscriptions); } diff --git a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeServletTest.java b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeServletTest.java index 2a659aaf..065565d3 100644 --- a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeServletTest.java +++ b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeServletTest.java @@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import java.io.File; +import java.io.IOException; import java.util.*; import static org.hamcrest.Matchers.notNullValue; @@ -49,6 +51,7 @@ import static org.mockito.Mockito.*; public class NodeServletTest { private NodeServlet nodeServlet; + private Delivery delivery; @Mock private HttpServletRequest request; @@ -59,18 +62,26 @@ public class NodeServletTest { ListAppender listAppender; @Before - public void setUp() throws Exception{ + public void setUp() throws Exception { listAppender = setTestLogger(); - nodeServlet = new NodeServlet(); setBehalfHeader("Stub_Value"); when(request.getPathInfo()).thenReturn("2"); when(request.isSecure()).thenReturn(true); + createFilesAndDirectories(); setUpConfig(); setUpNodeMainDelivery(); + delivery = mock(Delivery.class); + when(delivery.markTaskSuccess("spool/s/0/1", "dmaap-dr-node.1234567")).thenReturn(true); + nodeServlet = new NodeServlet(delivery); when(request.getHeader("Authorization")).thenReturn("User1"); when(request.getHeader("X-DMAAP-DR-PUBLISH-ID")).thenReturn("User1"); } + @AfterClass + public static void tearDown() { + deleteCreatedDirectories(); + } + @Test public void Given_Request_Is_HTTP_GET_And_Config_Is_Down_Then_Service_Unavailable_Response_Is_Generated() throws Exception { setNodeConfigManagerIsConfiguredToReturnFalse(); @@ -80,7 +91,7 @@ public class NodeServletTest { } @Test - public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_Internal_FetchProv_Then_No_Content_Response_Is_Generated() throws Exception { + public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_Internal_FetchProv_Then_No_Content_Response_Is_Generated() { when(request.getPathInfo()).thenReturn("/internal/fetchProv"); nodeServlet.doGet(request, response); verify(response).setStatus(eq(HttpServletResponse.SC_NO_CONTENT)); @@ -88,7 +99,7 @@ public class NodeServletTest { } @Test - public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_ResetSubscription_Then_No_Content_Response_Is_Generated() throws Exception { + public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_ResetSubscription_Then_No_Content_Response_Is_Generated() { when(request.getPathInfo()).thenReturn("/internal/resetSubscription/1"); nodeServlet.doGet(request, response); verify(response).setStatus(eq(HttpServletResponse.SC_NO_CONTENT)); @@ -213,6 +224,48 @@ public class NodeServletTest { verifyEnteringExitCalled(listAppender); } + @Test + public void Given_Request_Is_HTTP_DELETE_File_With_Invalid_Endpoint_Then_Not_Found_Response_Is_Generated() throws Exception { + when(request.getPathInfo()).thenReturn("/delete/1"); + nodeServlet.doDelete(request, response); + verify(response).sendError(eq(HttpServletResponse.SC_NOT_FOUND), argThat(notNullValue(String.class))); + verifyEnteringExitCalled(listAppender); + } + + @Test + public void Given_Request_Is_HTTP_DELETE_File_And_Is_Not_Privileged_Subscription_Then_Not_Found_Response_Is_Generated() throws Exception { + when(request.getPathInfo()).thenReturn("/delete/1/dmaap-dr-node.1234567"); + setUpConfigToReturnUnprivilegedSubscriber(); + nodeServlet.doDelete(request, response); + verify(response).sendError(eq(HttpServletResponse.SC_UNAUTHORIZED)); + verifyEnteringExitCalled(listAppender); + } + + @Test + public void Given_Request_Is_HTTP_DELETE_File_And_Subscription_Does_Not_Exist_Then_Not_Found_Response_Is_Generated() throws Exception { + when(request.getPathInfo()).thenReturn("/delete/1/dmaap-dr-node.1234567"); + setUpConfigToReturnNullOnIsDeletePermitted(); + nodeServlet.doDelete(request, response); + verify(response).sendError(eq(HttpServletResponse.SC_NOT_FOUND)); + verifyEnteringExitCalled(listAppender); + } + + @Test + public void Given_Request_Is_HTTP_DELETE_File_Then_Request_Succeeds() throws Exception { + when(request.getPathInfo()).thenReturn("/delete/1/dmaap-dr-node.1234567"); + createFilesAndDirectories(); + nodeServlet.doDelete(request, response); + verify(response).setStatus(eq(HttpServletResponse.SC_OK)); + verifyEnteringExitCalled(listAppender); + } + + @Test + public void Given_Request_Is_HTTP_DELETE_File_And_File_Does_Not_Exist_Then_Not_Found_Response_Is_Generated() throws IOException { + when(request.getPathInfo()).thenReturn("/delete/1/nonExistingFile"); + nodeServlet.doDelete(request, response); + verify(response).sendError(eq(HttpServletResponse.SC_NOT_FOUND), argThat(notNullValue(String.class))); + verifyEnteringExitCalled(listAppender); + } private void setBehalfHeader(String headerValue) { when(request.getHeader("X-DMAAP-DR-ON-BEHALF-OF")).thenReturn(headerValue); @@ -232,21 +285,45 @@ public class NodeServletTest { assertEquals(3, listAppender.list.size()); } - private void setUpConfig() throws IllegalAccessException{ + private void setUpConfig() throws IllegalAccessException { NodeConfigManager config = mock(NodeConfigManager.class); PowerMockito.mockStatic(NodeConfigManager.class); when(config.isShutdown()).thenReturn(false); when(config.isConfigured()).thenReturn(true); - when(config.getSpoolDir()).thenReturn("spool/dir"); + when(config.getSpoolDir()).thenReturn("spool/f"); + when(config.getSpoolBase()).thenReturn("spool"); when(config.getLogDir()).thenReturn("log/dir"); when(config.getPublishId()).thenReturn("User1"); when(config.isAnotherNode(anyString(), anyString())).thenReturn(true); when(config.getEventLogInterval()).thenReturn("40"); + when(config.isDeletePermitted("1")).thenReturn(true); + when(config.getAllDests()).thenReturn(new DestInfo[0]); FieldUtils.writeDeclaredStaticField(NodeServlet.class, "config", config, true); FieldUtils.writeDeclaredStaticField(NodeMain.class, "nodeConfigManager", config, true); PowerMockito.when(NodeConfigManager.getInstance()).thenReturn(config); } + private void setUpConfigToReturnUnprivilegedSubscriber() throws IllegalAccessException { + NodeConfigManager config = mock(NodeConfigManager.class); + PowerMockito.mockStatic(NodeConfigManager.class); + when(config.isShutdown()).thenReturn(false); + when(config.isConfigured()).thenReturn(true); + when(config.isDeletePermitted("1")).thenReturn(false); + FieldUtils.writeDeclaredStaticField(NodeServlet.class, "config", config, true); + FieldUtils.writeDeclaredStaticField(NodeMain.class, "nodeConfigManager", config, true); + PowerMockito.when(NodeConfigManager.getInstance()).thenReturn(config); + } + + private void setUpConfigToReturnNullOnIsDeletePermitted() throws IllegalAccessException { + NodeConfigManager config = mock(NodeConfigManager.class); + PowerMockito.mockStatic(NodeConfigManager.class); + when(config.isShutdown()).thenReturn(false); + when(config.isConfigured()).thenReturn(true); + when(config.isDeletePermitted("1")).thenThrow(new NullPointerException()); + FieldUtils.writeDeclaredStaticField(NodeServlet.class, "config", config, true); + FieldUtils.writeDeclaredStaticField(NodeMain.class, "nodeConfigManager", config, true); + PowerMockito.when(NodeConfigManager.getInstance()).thenReturn(config); + } private void setUpNodeMainDelivery() throws IllegalAccessException{ Delivery delivery = mock(Delivery.class); @@ -313,4 +390,31 @@ public class NodeServletTest { when(request.getHeaders("X-DMAAP-DR-ON-BEHALF-OF")).thenReturn(behalfHeader); when(request.getHeaders("X-DMAAP-DR-META")).thenReturn(metaDataHeader); } + + private void createFilesAndDirectories() throws IOException { + File nodeDir = new File("spool/n/172.0.0.1"); + File spoolDir = new File("spool/s/0/1"); + File dataFile = new File("spool/s/0/1/dmaap-dr-node.1234567"); + File metaDataFile = new File("spool/s/0/1/dmaap-dr-node.1234567.M"); + nodeDir.mkdirs(); + spoolDir.mkdirs(); + dataFile.createNewFile(); + metaDataFile.createNewFile(); + } + + private static void deleteCreatedDirectories() { + File spoolDir = new File("spool"); + delete(spoolDir); + } + + private static void delete(File file) { + if (file.isDirectory()) { + for (File f: file.listFiles()) { + delete(f); + } + } + if (!file.delete()) { + System.out.println("Failed to delete: " + file); + } + } } diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Subscription.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Subscription.java index e1b24c53..230df756 100644 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Subscription.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Subscription.java @@ -68,6 +68,7 @@ public class Subscription extends Syncable { private boolean suspended; private Date lastMod; private Date createdDate; + private boolean privilegedSubscriber; public static Subscription getSubscriptionMatching(Subscription sub) { SubDelivery deli = sub.getDelivery(); @@ -200,6 +201,7 @@ public class Subscription extends Syncable { this.suspended = false; this.lastMod = new Date(); this.createdDate = new Date(); + this.privilegedSubscriber = false; } public Subscription(ResultSet rs) throws SQLException { @@ -214,6 +216,7 @@ public class Subscription extends Syncable { this.suspended = rs.getBoolean("SUSPENDED"); this.lastMod = rs.getDate("LAST_MOD"); this.createdDate = rs.getDate("CREATED_DATE"); + this.privilegedSubscriber = rs.getBoolean("PRIVILEGED_SUBSCRIBER"); } public Subscription(JSONObject jo) throws InvalidObjectException { @@ -249,7 +252,7 @@ public class Subscription extends Syncable { this.metadataOnly = jo.getBoolean("metadataOnly"); this.suspended = jo.optBoolean("suspend", false); - + this.privilegedSubscriber = jo.optBoolean("privilegedSubscriber", false); this.subscriber = jo.optString("subscriber", ""); JSONObject jol = jo.optJSONObject("links"); this.links = (jol == null) ? (new SubLinks()) : (new SubLinks(jol)); @@ -323,6 +326,14 @@ public class Subscription extends Syncable { this.suspended = suspended; } + public boolean isPrivilegedSubscriber() { + return privilegedSubscriber; + } + + public void setPrivilegedSubscriber(boolean privilegedSubscriber) { + this.privilegedSubscriber = privilegedSubscriber; + } + public String getSubscriber() { return subscriber; } @@ -357,6 +368,7 @@ public class Subscription extends Syncable { jo.put("suspend", suspended); jo.put(LAST_MOD_KEY, lastMod.getTime()); jo.put(CREATED_DATE, createdDate.getTime()); + jo.put("privilegedSubscriber", privilegedSubscriber); return jo; } @@ -394,7 +406,7 @@ public class Subscription extends Syncable { } // Create the SUBSCRIPTIONS row - String sql = "insert into SUBSCRIPTIONS (SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + String sql = "insert into SUBSCRIPTIONS (SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID, PRIVILEGED_SUBSCRIBER) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; ps = c.prepareStatement(sql, new String[]{SUBID_COL}); ps.setInt(1, subid); ps.setInt(2, feedid); @@ -406,6 +418,7 @@ public class Subscription extends Syncable { ps.setString(8, getSubscriber()); ps.setBoolean(9, isSuspended()); ps.setInt(10, groupid); //New field is added - Groups feature Rally:US708115 - 1610 + ps.setBoolean(11, isPrivilegedSubscriber()); ps.execute(); ps.close(); // Update the row to set the URLs @@ -436,7 +449,7 @@ public class Subscription extends Syncable { boolean rv = true; PreparedStatement ps = null; try { - String sql = "update SUBSCRIPTIONS set DELIVERY_URL = ?, DELIVERY_USER = ?, DELIVERY_PASSWORD = ?, DELIVERY_USE100 = ?, METADATA_ONLY = ?, SUSPENDED = ?, GROUPID = ? where SUBID = ?"; + String sql = "update SUBSCRIPTIONS set DELIVERY_URL = ?, DELIVERY_USER = ?, DELIVERY_PASSWORD = ?, DELIVERY_USE100 = ?, METADATA_ONLY = ?, SUSPENDED = ?, GROUPID = ?, PRIVILEGED_SUBSCRIBER = ? where SUBID = ?"; ps = c.prepareStatement(sql); ps.setString(1, delivery.getUrl()); ps.setString(2, delivery.getUser()); @@ -445,7 +458,8 @@ public class Subscription extends Syncable { ps.setInt(5, isMetadataOnly() ? 1 : 0); ps.setInt(6, suspended ? 1 : 0); ps.setInt(7, groupid); //New field is added - Groups feature Rally:US708115 - 1610 - ps.setInt(8, subid); + ps.setInt(8, privilegedSubscriber ? 1 : 0); + ps.setInt(9, subid); ps.executeUpdate(); } catch (SQLException e) { rv = false; diff --git a/datarouter-prov/src/main/resources/misc/sql_init_01.sql b/datarouter-prov/src/main/resources/misc/sql_init_01.sql index 01258320..60b638a0 100755 --- a/datarouter-prov/src/main/resources/misc/sql_init_01.sql +++ b/datarouter-prov/src/main/resources/misc/sql_init_01.sql @@ -31,20 +31,21 @@ CREATE TABLE FEED_ENDPOINT_ADDRS ( ); CREATE TABLE SUBSCRIPTIONS ( - SUBID INT UNSIGNED NOT NULL PRIMARY KEY, - FEEDID INT UNSIGNED NOT NULL, - GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0, - DELIVERY_URL VARCHAR(256), - DELIVERY_USER VARCHAR(20), - DELIVERY_PASSWORD VARCHAR(32), - DELIVERY_USE100 BOOLEAN DEFAULT FALSE, - METADATA_ONLY BOOLEAN DEFAULT FALSE, - SUBSCRIBER VARCHAR(8) NOT NULL, - SELF_LINK VARCHAR(256), - LOG_LINK VARCHAR(256), - LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - SUSPENDED BOOLEAN DEFAULT FALSE, - CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP + SUBID INT UNSIGNED NOT NULL PRIMARY KEY, + FEEDID INT UNSIGNED NOT NULL, + GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0, + DELIVERY_URL VARCHAR(256), + DELIVERY_USER VARCHAR(20), + DELIVERY_PASSWORD VARCHAR(32), + DELIVERY_USE100 BOOLEAN DEFAULT FALSE, + METADATA_ONLY BOOLEAN DEFAULT FALSE, + SUBSCRIBER VARCHAR(8) NOT NULL, + SELF_LINK VARCHAR(256), + LOG_LINK VARCHAR(256), + LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + SUSPENDED BOOLEAN DEFAULT FALSE, + PRIVILEGED_SUBSCRIBER BOOLEAN DEFAULT FALSE, + CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); @@ -134,6 +135,7 @@ INSERT INTO PARAMETERS VALUES ('DELIVERY_INIT_RETRY_INTERVAL', '10'), ('DELIVERY_MAX_AGE', '86400'), ('DELIVERY_MAX_RETRY_INTERVAL', '3600'), + ('DELIVERY_FILE_PROCESS_INTERVAL', '600'), ('DELIVERY_RETRY_RATIO', '2'), ('LOGROLL_INTERVAL', '30'), ('PROV_AUTH_ADDRESSES', 'dmaap-dr-prov|dmaap-dr-node'), @@ -142,5 +144,5 @@ INSERT INTO PARAMETERS VALUES ('PROV_MAXSUB_COUNT', '100000'), ('PROV_REQUIRE_CERT', 'false'), ('PROV_REQUIRE_SECURE', 'false'), - ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE') + ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE|DELIVERY_FILE_PROCESS_INTERVAL') ; diff --git a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SubscriptionServletTest.java b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SubscriptionServletTest.java index fd1e68ef..43973d56 100755 --- a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SubscriptionServletTest.java +++ b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SubscriptionServletTest.java @@ -22,7 +22,6 @@ ******************************************************************************/ package org.onap.dmaap.datarouter.provisioning; -import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; import org.apache.commons.lang3.reflect.FieldUtils; @@ -39,7 +38,6 @@ import org.onap.dmaap.datarouter.provisioning.beans.Subscription; import org.onap.dmaap.datarouter.provisioning.beans.Updateable; import org.onap.dmaap.datarouter.provisioning.utils.DB; import org.powermock.modules.junit4.PowerMockRunner; -import org.slf4j.LoggerFactory; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; @@ -48,15 +46,12 @@ import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import java.io.File; import java.io.FileNotFoundException; -import java.io.PrintWriter; import java.sql.SQLException; import java.util.HashSet; import java.util.Set; import static org.hamcrest.Matchers.notNullValue; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; import static org.onap.dmaap.datarouter.provisioning.BaseServlet.BEHALF_HEADER; @@ -283,6 +278,7 @@ public class SubscriptionServletTest extends DrServletTestBase { jo.put("version", "2.0"); jo.put("metadataOnly", true); jo.put("suspend", true); + jo.put("privilegedSubscriber", true); jo.put("delivery", JSObject); jo.put("subscriber", "differentSubscriber"); jo.put("sync", true); @@ -305,6 +301,7 @@ public class SubscriptionServletTest extends DrServletTestBase { jo.put("version", "2.0"); jo.put("metadataOnly", true); jo.put("suspend", true); + jo.put("privilegedSubscriber", true); jo.put("delivery", JSObject); jo.put("sync", true); return jo; @@ -333,6 +330,7 @@ public class SubscriptionServletTest extends DrServletTestBase { jo.put("version", "2.0"); jo.put("metadataOnly", true); jo.put("suspend", true); + jo.put("privilegedSubscriber", true); jo.put("delivery", JSObject); jo.put("sync", true); jo.put("changeowner", true); @@ -505,6 +503,7 @@ public class SubscriptionServletTest extends DrServletTestBase { subscription.setGroupid(1); subscription.setMetadataOnly(false); subscription.setSuspended(false); + subscription.setPrivilegedSubscriber(false); subscription.doInsert(db.getConnection()); } @@ -518,6 +517,7 @@ public class SubscriptionServletTest extends DrServletTestBase { subscription.setGroupid(1); subscription.setMetadataOnly(false); subscription.setSuspended(false); + subscription.setPrivilegedSubscriber(false); subscription.changeOwnerShip(); subscription.doUpdate(db.getConnection()); } diff --git a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/beans/SubscriptionTest.java b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/beans/SubscriptionTest.java index 1e0c042b..3e6aed07 100644 --- a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/beans/SubscriptionTest.java +++ b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/beans/SubscriptionTest.java @@ -62,6 +62,7 @@ public class SubscriptionTest { subscription.setMetadataOnly(false); subscription.setSubscriber(subscriber); subscription.setSuspended(false); + subscription.setPrivilegedSubscriber(false); subscription.setLinks(subLinks); Assert.assertEquals(2, subscription.getGroupid()); @@ -69,5 +70,6 @@ public class SubscriptionTest { Assert.assertEquals(subLinks, subscription.getLinks()); Assert.assertFalse(subscription.isMetadataOnly()); Assert.assertFalse(subscription.isSuspended()); + Assert.assertFalse(subscription.isPrivilegedSubscriber()); } } \ No newline at end of file diff --git a/datarouter-prov/src/test/resources/create.sql b/datarouter-prov/src/test/resources/create.sql index b704602f..fa7402ae 100755 --- a/datarouter-prov/src/test/resources/create.sql +++ b/datarouter-prov/src/test/resources/create.sql @@ -29,20 +29,21 @@ CREATE TABLE FEED_ENDPOINT_ADDRS ( ); CREATE TABLE SUBSCRIPTIONS ( - SUBID INT UNSIGNED NOT NULL PRIMARY KEY, - FEEDID INT UNSIGNED NOT NULL, - GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0, - DELIVERY_URL VARCHAR(256), - DELIVERY_USER VARCHAR(20), - DELIVERY_PASSWORD VARCHAR(32), - DELIVERY_USE100 BOOLEAN DEFAULT FALSE, - METADATA_ONLY BOOLEAN DEFAULT FALSE, - SUBSCRIBER VARCHAR(8) NOT NULL, - SELF_LINK VARCHAR(256), - LOG_LINK VARCHAR(256), - LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - SUSPENDED BOOLEAN DEFAULT FALSE, - CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP + SUBID INT UNSIGNED NOT NULL PRIMARY KEY, + FEEDID INT UNSIGNED NOT NULL, + GROUPID INT(10) UNSIGNED NOT NULL DEFAULT 0, + DELIVERY_URL VARCHAR(256), + DELIVERY_USER VARCHAR(20), + DELIVERY_PASSWORD VARCHAR(32), + DELIVERY_USE100 BOOLEAN DEFAULT FALSE, + METADATA_ONLY BOOLEAN DEFAULT FALSE, + SUBSCRIBER VARCHAR(8) NOT NULL, + SELF_LINK VARCHAR(256), + LOG_LINK VARCHAR(256), + LAST_MOD TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + SUSPENDED BOOLEAN DEFAULT FALSE, + PRIVILEGED_SUBSCRIBER BOOLEAN DEFAULT FALSE, + CREATED_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); @@ -132,6 +133,7 @@ INSERT INTO PARAMETERS VALUES ('DELIVERY_INIT_RETRY_INTERVAL', '10'), ('DELIVERY_MAX_AGE', '86400'), ('DELIVERY_MAX_RETRY_INTERVAL', '3600'), + ('DELIVERY_FILE_PROCESS_INTERVAL', '600'), ('DELIVERY_RETRY_RATIO', '2'), ('LOGROLL_INTERVAL', '300'), ('PROV_AUTH_ADDRESSES', 'dmaap-dr-prov|dmaap-dr-node'), @@ -140,14 +142,14 @@ INSERT INTO PARAMETERS VALUES ('PROV_MAXSUB_COUNT', '100000'), ('PROV_REQUIRE_CERT', 'false'), ('PROV_REQUIRE_SECURE', 'true'), - ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE') + ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE|DELIVERY_FILE_PROCESS_INTERVAL') ; INSERT INTO GROUPS(GROUPID, AUTHID, NAME, DESCRIPTION, CLASSIFICATION, MEMBERS) VALUES (1, 'Basic dXNlcjE6cGFzc3dvcmQx', 'Group1', 'First Group for testing', 'Class1', 'Member1'); -INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID) -VALUES (1, 1, 'https://172.100.0.5:8080', 'user1', 'password1', true, false, 'user1', false, 1); +INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID, PRIVILEGED_SUBSCRIBER) +VALUES (1, 1, 'https://172.100.0.5:8080', 'user1', 'password1', true, false, 'user1', false, 1, false); INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, SUBSCRIBER, SELF_LINK, LOG_LINK) VALUES (23, 1, 'http://delivery_url', 'user1', 'somepassword', 'sub123', 'selflink', 'loglink'); -- 2.16.6