Add optional API for PM Mapper 80/77780/10
authoreconwar <conor.ward@est.tech>
Thu, 14 Feb 2019 09:37:44 +0000 (09:37 +0000)
committereconwar <conor.ward@est.tech>
Thu, 14 Feb 2019 09:37:44 +0000 (09:37 +0000)
Added new field to Subscriber class to keep files after published
Added new Delete endpoint so that file can then be deleted

Change-Id: Id72da67689a7ceda8ddd4997cd6349b981cb1cdb
Issue-ID: DMAAP-981
Signed-off-by: econwar <conor.ward@est.tech>
21 files changed:
datarouter-docker-compose/src/main/resources/database/sql_init_01.sql
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueueHelper.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java
datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeConfigTest.java
datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeServletTest.java
datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Subscription.java
datarouter-prov/src/main/resources/misc/sql_init_01.sql
datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SubscriptionServletTest.java
datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/beans/SubscriptionTest.java
datarouter-prov/src/test/resources/create.sql

index 0125832..60b638a 100644 (file)
@@ -31,20 +31,21 @@ CREATE TABLE FEED_ENDPOINT_ADDRS (
 );
 
 CREATE TABLE SUBSCRIPTIONS (
-    SUBID              INT UNSIGNED NOT NULL PRIMARY KEY,
-    FEEDID             INT UNSIGNED NOT NULL,
-    GROUPID            INT(10) UNSIGNED NOT NULL DEFAULT 0,
-    DELIVERY_URL       VARCHAR(256),
-    DELIVERY_USER      VARCHAR(20),
-    DELIVERY_PASSWORD  VARCHAR(32),
-    DELIVERY_USE100    BOOLEAN DEFAULT FALSE,
-    METADATA_ONLY      BOOLEAN DEFAULT FALSE,
-    SUBSCRIBER         VARCHAR(8) NOT NULL,
-    SELF_LINK          VARCHAR(256),
-    LOG_LINK           VARCHAR(256),
-    LAST_MOD           TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-    SUSPENDED          BOOLEAN DEFAULT FALSE,
-    CREATED_DATE       TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+    SUBID                       INT UNSIGNED NOT NULL PRIMARY KEY,
+    FEEDID                      INT UNSIGNED NOT NULL,
+    GROUPID                     INT(10) UNSIGNED NOT NULL DEFAULT 0,
+    DELIVERY_URL                VARCHAR(256),
+    DELIVERY_USER               VARCHAR(20),
+    DELIVERY_PASSWORD           VARCHAR(32),
+    DELIVERY_USE100             BOOLEAN DEFAULT FALSE,
+    METADATA_ONLY               BOOLEAN DEFAULT FALSE,
+    SUBSCRIBER                  VARCHAR(8) NOT NULL,
+    SELF_LINK                   VARCHAR(256),
+    LOG_LINK                    VARCHAR(256),
+    LAST_MOD                    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    SUSPENDED                   BOOLEAN DEFAULT FALSE,
+    PRIVILEGED_SUBSCRIBER       BOOLEAN DEFAULT FALSE,
+    CREATED_DATE                TIMESTAMP DEFAULT CURRENT_TIMESTAMP
 
 );
 
@@ -134,6 +135,7 @@ INSERT INTO PARAMETERS VALUES
     ('DELIVERY_INIT_RETRY_INTERVAL', '10'),
     ('DELIVERY_MAX_AGE', '86400'),
     ('DELIVERY_MAX_RETRY_INTERVAL', '3600'),
+    ('DELIVERY_FILE_PROCESS_INTERVAL', '600'),
     ('DELIVERY_RETRY_RATIO', '2'),
     ('LOGROLL_INTERVAL', '30'),
     ('PROV_AUTH_ADDRESSES', 'dmaap-dr-prov|dmaap-dr-node'),
@@ -142,5 +144,5 @@ INSERT INTO PARAMETERS VALUES
     ('PROV_MAXSUB_COUNT',   '100000'),
     ('PROV_REQUIRE_CERT', 'false'),
     ('PROV_REQUIRE_SECURE', 'false'),
-    ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE')
+    ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE|DELIVERY_FILE_PROCESS_INTERVAL')
     ;
index ae4f13b..d2600d2 100644 (file)
@@ -267,4 +267,18 @@ public class Delivery {
             }
         }
     }
+
+    /**
+     * Mark the task in spool a success
+     */
+    public synchronized boolean markTaskSuccess(String spool, String pubId) {
+        boolean succeeded = false;
+        if (spool != null) {
+            DeliveryQueue dq = dqs.get(spool);
+            if (dq != null) {
+                succeeded = dq.markTaskSuccess(pubId);
+            }
+        }
+        return succeeded;
+    }
 }
index ad74625..abdfa71 100644 (file)
@@ -64,16 +64,16 @@ import java.util.*;
  * failure timer is active or if no files are found in a directory scan.
  */
 public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
-    private DeliveryQueueHelper dqh;
-    private DestInfo di;
-    private Hashtable<String, DeliveryTask> working = new Hashtable<String, DeliveryTask>();
-    private Hashtable<String, DeliveryTask> retry = new Hashtable<String, DeliveryTask>();
+    private DeliveryQueueHelper deliveryQueueHelper;
+    private DestInfo destinationInfo;
+    private Hashtable<String, DeliveryTask> working = new Hashtable<>();
+    private Hashtable<String, DeliveryTask> retry = new Hashtable<>();
     private int todoindex;
     private boolean failed;
     private long failduration;
     private long resumetime;
     File dir;
-    private Vector<DeliveryTask> todo = new Vector<DeliveryTask>();
+    private Vector<DeliveryTask> todo = new Vector<>();
 
     /**
      * Try to cancel a delivery task.
@@ -139,11 +139,15 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
         if (!failed) {
             failed = true;
             if (failduration == 0) {
-                failduration = dqh.getInitFailureTimer();
+                if (destinationInfo.isPrivilegedSubscriber()) {
+                    failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
+                } else{
+                    failduration = deliveryQueueHelper.getInitFailureTimer();
+                }
             }
             resumetime = System.currentTimeMillis() + failduration;
-            long maxdur = dqh.getMaxFailureTimer();
-            failduration = (long) (failduration * dqh.getFailureBackoff());
+            long maxdur = deliveryQueueHelper.getMaxFailureTimer();
+            failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
             if (failduration > maxdur) {
                 failduration = maxdur;
             }
@@ -184,7 +188,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
      */
     public synchronized DeliveryTask peekNext() {
         long now = System.currentTimeMillis();
-        long mindate = now - dqh.getExpirationTimer();
+        long mindate = now - deliveryQueueHelper.getExpirationTimer();
         if (failed) {
             if (now > resumetime) {
                 failed = false;
@@ -246,32 +250,32 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Create a delivery queue for a given destination info
      */
-    public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) {
-        this.dqh = dqh;
-        this.di = di;
-        dir = new File(di.getSpool());
+    public DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
+        this.deliveryQueueHelper = deliveryQueueHelper;
+        this.destinationInfo = destinationInfo;
+        dir = new File(destinationInfo.getSpool());
         dir.mkdirs();
     }
 
     /**
      * Update the destination info for this delivery queue
      */
-    public void config(DestInfo di) {
-        this.di = di;
+    public void config(DestInfo destinationInfo) {
+        this.destinationInfo = destinationInfo;
     }
 
     /**
      * Get the dest info
      */
-    public DestInfo getDestInfo() {
-        return (di);
+    public DestInfo getDestinationInfo() {
+        return (destinationInfo);
     }
 
     /**
      * Get the config manager
      */
     public DeliveryQueueHelper getConfig() {
-        return (dqh);
+        return (deliveryQueueHelper);
     }
 
     /**
@@ -294,22 +298,26 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
      */
     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
         if (status < 300) {
-            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid);
-            markSuccess(task);
-        } else if (status < 400 && dqh.isFollowRedirects()) {
-            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
-            if (dqh.handleRedirection(di, location, task.getFileId())) {
+               StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
+               if (destinationInfo.isPrivilegedSubscriber()) {
+                markFailWithRetry(task);
+            } else {
+                markSuccess(task);
+            }
+        } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
+            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
+            if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
                 markRedirect(task);
             } else {
                 StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
                 markFailNoRetry(task);
             }
-        } else if (status < 500) {
-            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
+        } else if (status < 500 && status != 429) {         // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time.
+            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
             StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
             markFailNoRetry(task);
         } else {
-            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);
+            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
             markFailWithRetry(task);
         }
     }
@@ -318,8 +326,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
      * Delivery failed by reason of an exception
      */
     public void reportException(DeliveryTask task, Exception exception) {
-        StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString());
-        dqh.handleUnreachable(di);
+        StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
+        deliveryQueueHelper.handleUnreachable(destinationInfo);
         markFailWithRetry(task);
     }
 
@@ -330,14 +338,14 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
      * @return The feed ID
      */
     public String getFeedId(String subid) {
-        return (dqh.getFeedId(subid));
+        return (deliveryQueueHelper.getFeedId(subid));
     }
 
     /**
      * Get the URL to deliver a message to given the file ID
      */
     public String getDestURL(String fileid) {
-        return (dqh.getDestURL(di, fileid));
+        return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
     }
 
     /**
@@ -346,8 +354,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
      */
     public void run() {
         DeliveryTask t;
-        long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit();
-        int filestogo = dqh.getFairFileLimit();
+        long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
+        int filestogo = deliveryQueueHelper.getFairFileLimit();
         while ((t = getNext()) != null) {
             t.run();
             if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
@@ -369,4 +377,24 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     public void resetQueue() {
         resumetime = System.currentTimeMillis();
     }
+
+    /**
+     * Get task if in queue and mark as success
+     */
+    public boolean markTaskSuccess(String pubId) {
+        DeliveryTask task = working.get(pubId);
+        if (task != null) {
+            markSuccess(task);
+            return true;
+        }
+        task = retry.get(pubId);
+        if (task != null) {
+            retry.remove(pubId);
+            task.clean();
+            resumetime = 0;
+            failduration = 0;
+            return true;
+        }
+        return false;
+    }
 }
index b1734cd..5cf5fa4 100644 (file)
@@ -34,64 +34,69 @@ public interface DeliveryQueueHelper {
     /**
      * Get the timeout (milliseconds) before retrying after an initial delivery failure
      */
-    public long getInitFailureTimer();
+    long getInitFailureTimer();
+
+    /**
+     * Get the timeout before retrying after delivery and wait for file processing
+     */
+    long getWaitForFileProcessFailureTimer();
 
     /**
      * Get the ratio between timeouts on consecutive delivery attempts
      */
-    public double getFailureBackoff();
+    double getFailureBackoff();
 
     /**
      * Get the maximum timeout (milliseconds) between delivery attempts
      */
-    public long getMaxFailureTimer();
+    long getMaxFailureTimer();
 
     /**
      * Get the expiration timer (milliseconds) for deliveries
      */
-    public long getExpirationTimer();
+    long getExpirationTimer();
 
     /**
      * Get the maximum number of file delivery attempts before checking
      * if another queue has work to be performed.
      */
-    public int getFairFileLimit();
+    int getFairFileLimit();
 
     /**
      * Get the maximum amount of time spent delivering files before checking if another queue has work to be performed.
      */
-    public long getFairTimeLimit();
+    long getFairTimeLimit();
 
     /**
      * Get the URL for delivering a file
      *
-     * @param dest   The destination information for the file to be delivered.
+     * @param destinationInfo   The destination information for the file to be delivered.
      * @param fileid The file id for the file to be delivered.
-     * @return The URL for delivering the file (typically, dest.getURL() + "/" + fileid).
+     * @return The URL for delivering the file (typically, destinationInfo.getURL() + "/" + fileid).
      */
-    public String getDestURL(DestInfo dest, String fileid);
+    String getDestURL(DestInfo destinationInfo, String fileid);
 
     /**
      * Forget redirections associated with a subscriber
      *
-     * @param    dest    Destination information to forget
+     * @param    destinationInfo    Destination information to forget
      */
-    public void handleUnreachable(DestInfo dest);
+    void handleUnreachable(DestInfo destinationInfo);
 
     /**
      * Post redirection for a subscriber
      *
-     * @param    dest    Destination information to update
+     * @param    destinationInfo    Destination information to update
      * @param    location    Location given by subscriber
      * @param    fileid    File ID of request
      * @return true if this 3xx response is retryable, otherwise, false.
      */
-    public boolean handleRedirection(DestInfo dest, String location, String fileid);
+    boolean handleRedirection(DestInfo destinationInfo, String location, String fileid);
 
     /**
      * Should I handle 3xx responses differently than 4xx responses?
      */
-    public boolean isFollowRedirects();
+    boolean isFollowRedirects();
 
     /**
      * Get the feed ID for a subscription
@@ -99,5 +104,5 @@ public interface DeliveryQueueHelper {
      * @param subid The subscription ID
      * @return The feed ID
      */
-    public String getFeedId(String subid);
+    String getFeedId(String subid);
 }
index 8072990..b2c3169 100644 (file)
@@ -47,9 +47,9 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     private static Logger loggerDeliveryTask = Logger.getLogger("org.onap.dmaap.datarouter.node.DeliveryTask");
     private static EELFLogger eelflogger = EELFManager.getInstance()
             .getLogger(DeliveryTask.class);
-    private DeliveryTaskHelper dth;
+    private DeliveryTaskHelper deliveryTaskHelper;
     private String pubid;
-    private DestInfo di;
+    private DestInfo destInfo;
     private String spool;
     private File datafile;
     private File metafile;
@@ -69,25 +69,25 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     /**
      * Create a delivery task for a given delivery queue and pub ID
      *
-     * @param    dth    The delivery task helper for the queue this task is in.
+     * @param    deliveryTaskHelper    The delivery task helper for the queue this task is in.
      * @param    pubid    The publish ID for this file.  This is used as
      * the base for the file name in the spool directory and is of
      * the form <milliseconds since 1970>.<fqdn of initial data router node>
      */
-    public DeliveryTask(DeliveryTaskHelper dth, String pubid) {
-        this.dth = dth;
+    public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
+        this.deliveryTaskHelper = deliveryTaskHelper;
         this.pubid = pubid;
-        di = dth.getDestInfo();
-        subid = di.getSubId();
-        feedid = di.getLogData();
-        spool = di.getSpool();
+        destInfo = deliveryTaskHelper.getDestinationInfo();
+        subid = destInfo.getSubId();
+        feedid = destInfo.getLogData();
+        spool = destInfo.getSpool();
         String dfn = spool + "/" + pubid;
         String mfn = dfn + ".M";
         datafile = new File(spool + "/" + pubid);
         metafile = new File(mfn);
-        boolean monly = di.isMetaDataOnly();
+        boolean monly = destInfo.isMetaDataOnly();
         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
-        Vector<String[]> hdrv = new Vector<String[]>();
+        Vector<String[]> hdrv = new Vector<>();
 
         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
             String s = br.readLine();
@@ -104,7 +104,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 String v = s.substring(i + 1);
                 if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
                     subid = v.replaceAll("[^ ]*/", "");
-                    feedid = dth.getFeedId(subid.replaceAll(" .*", ""));
+                    feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
                 }
                 if (length == 0 && h.toLowerCase().startsWith("content-")) {
                     continue;
@@ -126,7 +126,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
             loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
         }
         hdrs = hdrv.toArray(new String[hdrv.size()][]);
-        url = dth.getDestURL(fileid);
+        url = deliveryTaskHelper.getDestURL(fileid);
     }
     /**
      * Is the object a DeliveryTask with the same publication ID?
@@ -171,14 +171,14 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     public void run() {
         attempts++;
         try {
-            di = dth.getDestInfo();
-            boolean expect100 = di.isUsing100();
-            boolean monly = di.isMetaDataOnly();
+            destInfo = deliveryTaskHelper.getDestinationInfo();
+            boolean expect100 = destInfo.isUsing100();
+            boolean monly = destInfo.isMetaDataOnly();
             length = 0;
             if (!"DELETE".equals(method) && !monly) {
                 length = datafile.length();
             }
-            url = dth.getDestURL(fileid);
+            url = deliveryTaskHelper.getDestURL(fileid);
             URL u = new URL(url);
             HttpURLConnection uc = (HttpURLConnection) u.openConnection();
             uc.setConnectTimeout(60000);
@@ -186,7 +186,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
             uc.setInstanceFollowRedirects(false);
             uc.setRequestMethod(method);
             uc.setRequestProperty("Content-Length", Long.toString(length));
-            uc.setRequestProperty("Authorization", di.getAuth());
+            uc.setRequestProperty("Authorization", destInfo.getAuth());
             uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
             for (String[] nv : hdrs) {
                 uc.addRequestProperty(nv[0], nv[1]);
@@ -201,7 +201,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 try {
                     os = uc.getOutputStream();
                 } catch (ProtocolException pe) {
-                    dth.reportDeliveryExtra(this, -1L);
+                    deliveryTaskHelper.reportDeliveryExtra(this, -1L);
                     // Rcvd error instead of 100-continue
                     loggerDeliveryTask.error("Exception "+pe.getStackTrace(),pe);
                 }
@@ -223,7 +223,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                         }
                         os.close();
                     } catch (IOException ioe) {
-                        dth.reportDeliveryExtra(this, sofar);
+                        deliveryTaskHelper.reportDeliveryExtra(this, sofar);
                         throw ioe;
                     }
                 }
@@ -257,10 +257,10 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 }
                 is.close();
             }
-            dth.reportStatus(this, rc, xpubid, rmsg);
+            deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
         } catch (Exception e) {
             loggerDeliveryTask.error("Exception "+e.getStackTrace(),e);
-            dth.reportException(this, e);
+            deliveryTaskHelper.reportException(this, e);
         }
     }
 
index 932b792..d4ac8bd 100644 (file)
@@ -38,7 +38,7 @@ public interface DeliveryTaskHelper {
      * @param task      The task that failed
      * @param exception The exception that occurred
      */
-    public void reportException(DeliveryTask task, Exception exception);
+    void reportException(DeliveryTask task, Exception exception);
 
     /**
      * Report that a delivery attempt completed (successfully or unsuccessfully)
@@ -48,7 +48,7 @@ public interface DeliveryTaskHelper {
      * @param xpubid   The publish ID from the far end (if any)
      * @param location The redirection location for a 3XX response
      */
-    public void reportStatus(DeliveryTask task, int status, String xpubid, String location);
+    void reportStatus(DeliveryTask task, int status, String xpubid, String location);
 
     /**
      * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue.
@@ -56,14 +56,14 @@ public interface DeliveryTaskHelper {
      * @param task The task that failed
      * @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue.
      */
-    public void reportDeliveryExtra(DeliveryTask task, long sent);
+    void reportDeliveryExtra(DeliveryTask task, long sent);
 
     /**
      * Get the destination information for the delivery queue
      *
      * @return The destination information
      */
-    public DestInfo getDestInfo();
+    DestInfo getDestinationInfo();
 
     /**
      * Given a file ID, get the URL to deliver to
@@ -71,7 +71,7 @@ public interface DeliveryTaskHelper {
      * @param fileid The file id
      * @return The URL to deliver to
      */
-    public String getDestURL(String fileid);
+    String getDestURL(String fileid);
 
     /**
      * Get the feed ID for a subscription
@@ -79,5 +79,5 @@ public interface DeliveryTaskHelper {
      * @param subid The subscription ID
      * @return The feed iD
      */
-    public String getFeedId(String subid);
+    String getFeedId(String subid);
 }
index 1225331..c3e0057 100644 (file)
@@ -37,6 +37,7 @@ public class DestInfo {
     private String authentication;
     private boolean metaonly;
     private boolean use100;
+    private boolean privilegedSubscriber;
 
     /**
      * Create a destination information object.
@@ -50,8 +51,9 @@ public class DestInfo {
      * @param    authentication    The credentials.
      * @param    metaonly    Is this a metadata only delivery?
      * @param    use100    Should I use expect 100-continue?
+     * @param    privilegedSubscriber   Can we wait to receive a file processed acknowledgement before deleting file
      */
-    public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100) {
+    public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100, boolean privilegedSubscriber) {
         this.name = name;
         this.spool = spool;
         this.subid = subid;
@@ -61,6 +63,27 @@ public class DestInfo {
         this.authentication = authentication;
         this.metaonly = metaonly;
         this.use100 = use100;
+        this.privilegedSubscriber = privilegedSubscriber;
+    }
+
+    /**
+     * Create a destination information object.
+     *
+     * @param    name    n:fqdn or s:subid
+     * @param    spool    The directory where files are spooled.
+     * @param    subscription    The subscription.
+     */
+    public DestInfo(String name, String spool, NodeConfig.ProvSubscription subscription) {
+        this.name = name;
+        this.spool = spool;
+        this.subid = subscription.getSubId();
+        this.logdata = subscription.getFeedId();
+        this.url = subscription.getURL();
+        this.authuser = subscription.getAuthUser();
+        this.authentication = subscription.getCredentials();
+        this.metaonly = subscription.isMetaDataOnly();
+        this.use100 = subscription.isUsing100();
+        this.privilegedSubscriber = subscription.isPrivilegedSubscriber();
     }
 
     public boolean equals(Object o) {
@@ -150,4 +173,11 @@ public class DestInfo {
     public boolean isUsing100() {
         return (use100);
     }
+
+    /**
+     * Should we wait to receive a file processed acknowledgement before deleting file
+     */
+    public boolean isPrivilegedSubscriber() {
+        return (privilegedSubscriber);
+    }
 }
index 1d5f76f..ff803af 100644 (file)
@@ -57,6 +57,10 @@ public class LogManager extends TimerTask {
             return (10000L);
         }
 
+        public long getWaitForFileProcessFailureTimer() {
+            return (600000L);
+        }
+
         public double getFailureBackoff() {
             return (2.0);
         }
@@ -77,14 +81,14 @@ public class LogManager extends TimerTask {
             return (86400000);
         }
 
-        public String getDestURL(DestInfo dest, String fileid) {
+        public String getDestURL(DestInfo destinationInfo, String fileid) {
             return (config.getEventLogUrl());
         }
 
-        public void handleUnreachable(DestInfo dest) {
+        public void handleUnreachable(DestInfo destinationInfo) {
         }
 
-        public boolean handleRedirection(DestInfo dest, String location, String fileid) {
+        public boolean handleRedirection(DestInfo destinationInfo, String location, String fileid) {
             return (false);
         }
 
@@ -101,7 +105,7 @@ public class LogManager extends TimerTask {
         public Uploader() {
             dq = new DeliveryQueue(this,
                 new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false,
-                    false));
+                    false, false));
             setDaemon(true);
             setName("Log Uploader");
             start();
index c40d29c..d3d3d01 100644 (file)
@@ -231,6 +231,7 @@ public class NodeConfig {
         private String credentials;
         private boolean metaonly;
         private boolean use100;
+        private boolean privilegedSubscriber;
 
         /**
          * Construct a subscription configuration entry
@@ -243,9 +244,10 @@ public class NodeConfig {
          * Authorization header.
          * @param metaonly Is this a meta data only subscription?
          * @param use100 Should we send Expect: 100-continue?
+         * @param privilegedSubscriber Can we wait to receive a delete file call before deleting file
          */
         public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials,
-                boolean metaonly, boolean use100) {
+                boolean metaonly, boolean use100, boolean privilegedSubscriber) {
             this.subid = subid;
             this.feedid = feedid;
             this.url = url;
@@ -253,6 +255,7 @@ public class NodeConfig {
             this.credentials = credentials;
             this.metaonly = metaonly;
             this.use100 = use100;
+            this.privilegedSubscriber = privilegedSubscriber;
         }
 
         /**
@@ -303,6 +306,13 @@ public class NodeConfig {
         public boolean isUsing100() {
             return (use100);
         }
+
+        /**
+         * Can we wait to receive a delete file call before deleting file
+         */
+        public boolean isPrivilegedSubscriber() {
+            return (privilegedSubscriber);
+        }
     }
 
     /**
@@ -462,11 +472,12 @@ public class NodeConfig {
         Target[] targets;
     }
 
-    private Hashtable<String, String> params = new Hashtable<String, String>();
-    private Hashtable<String, Feed> feeds = new Hashtable<String, Feed>();
-    private Hashtable<String, DestInfo> nodeinfo = new Hashtable<String, DestInfo>();
-    private Hashtable<String, DestInfo> subinfo = new Hashtable<String, DestInfo>();
-    private Hashtable<String, IsFrom> nodes = new Hashtable<String, IsFrom>();
+    private Hashtable<String, String> params = new Hashtable<>();
+    private Hashtable<String, Feed> feeds = new Hashtable<>();
+    private Hashtable<String, DestInfo> nodeinfo = new Hashtable<>();
+    private Hashtable<String, DestInfo> subinfo = new Hashtable<>();
+    private Hashtable<String, IsFrom> nodes = new Hashtable<>();
+    private Hashtable<String, ProvSubscription> provSubscriptions = new Hashtable<>();
     private String myname;
     private String myauth;
     private DestInfo[] alldests;
@@ -486,7 +497,7 @@ public class NodeConfig {
         for (ProvParam p : pd.getParams()) {
             params.put(p.getName(), p.getValue());
         }
-        Vector<DestInfo> div = new Vector<DestInfo>();
+        Vector<DestInfo> destInfos = new Vector<>();
         myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
         for (ProvNode pn : pd.getNodes()) {
             String cn = pn.getCName();
@@ -495,9 +506,9 @@ public class NodeConfig {
             }
             String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey);
             DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn,
-                    "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true);
+                    "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false);
             (new File(di.getSpool())).mkdirs();
-            div.add(di);
+            destInfos.add(di);
             nodeinfo.put(cn, di);
             nodes.put(auth, new IsFrom(cn));
         }
@@ -533,7 +544,7 @@ public class NodeConfig {
             }
             egrtab.put(pfe.getSubId(), pfe.getNode());
         }
-        Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<String, Vector<SubnetMatcher>>();
+        Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<>();
         for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
             Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId());
             if (v == null) {
@@ -542,46 +553,47 @@ public class NodeConfig {
             }
             v.add(new SubnetMatcher(pfs.getCidr()));
         }
-        Hashtable<String, StringBuffer> ttab = new Hashtable<String, StringBuffer>();
-        HashSet<String> allfeeds = new HashSet<String>();
+        Hashtable<String, StringBuffer> feedTargets = new Hashtable<>();
+        HashSet<String> allfeeds = new HashSet<>();
         for (ProvFeed pfx : pd.getFeeds()) {
             if (pfx.getStatus() == null) {
                 allfeeds.add(pfx.getId());
             }
         }
-        for (ProvSubscription ps : pd.getSubscriptions()) {
-            String sid = ps.getSubId();
-            String fid = ps.getFeedId();
-            if (!allfeeds.contains(fid)) {
+        for (ProvSubscription provSubscription : pd.getSubscriptions()) {
+            String subId = provSubscription.getSubId();
+            String feedId = provSubscription.getFeedId();
+            if (!allfeeds.contains(feedId)) {
                 continue;
             }
-            if (subinfo.get(sid) != null) {
+            if (subinfo.get(subId) != null) {
                 continue;
             }
             int sididx = 999;
             try {
-                sididx = Integer.parseInt(sid);
+                sididx = Integer.parseInt(subId);
                 sididx -= sididx % 100;
             } catch (Exception e) {
             }
-            String siddir = sididx + "/" + sid;
-            DestInfo di = new DestInfo("s:" + sid, spooldir + "/s/" + siddir, sid, fid, ps.getURL(), ps.getAuthUser(),
-                    ps.getCredentials(), ps.isMetaDataOnly(), ps.isUsing100());
-            (new File(di.getSpool())).mkdirs();
-            div.add(di);
-            subinfo.put(sid, di);
-            String egr = egrtab.get(sid);
+            String subscriptionDirectory = sididx + "/" + subId;
+            DestInfo destinationInfo = new DestInfo("s:" + subId,
+                    spooldir + "/s/" + subscriptionDirectory, provSubscription);
+            (new File(destinationInfo.getSpool())).mkdirs();
+            destInfos.add(destinationInfo);
+            provSubscriptions.put(subId, provSubscription);
+            subinfo.put(subId, destinationInfo);
+            String egr = egrtab.get(subId);
             if (egr != null) {
-                sid = pf.getPath(egr) + sid;
+                subId = pf.getPath(egr) + subId;
             }
-            StringBuffer sb = ttab.get(fid);
+            StringBuffer sb = feedTargets.get(feedId);
             if (sb == null) {
                 sb = new StringBuffer();
-                ttab.put(fid, sb);
+                feedTargets.put(feedId, sb);
             }
-            sb.append(' ').append(sid);
+            sb.append(' ').append(subId);
         }
-        alldests = div.toArray(new DestInfo[div.size()]);
+        alldests = destInfos.toArray(new DestInfo[destInfos.size()]);
         for (ProvFeed pfx : pd.getFeeds()) {
             String fid = pfx.getId();
             Feed f = feeds.get(fid);
@@ -609,7 +621,7 @@ public class NodeConfig {
             } else {
                 f.redirections = v2.toArray(new Redirection[v2.size()]);
             }
-            StringBuffer sb = ttab.get(fid);
+            StringBuffer sb = feedTargets.get(fid);
             if (sb == null) {
                 f.targets = new Target[0];
             } else {
@@ -711,6 +723,16 @@ public class NodeConfig {
         return ("Publisher not permitted for this feed");
     }
 
+    /**
+     * Check whether delete file is allowed.
+     *
+     * @param subId The ID of the subscription being requested.
+     */
+    public boolean isDeletePermitted(String subId) {
+        ProvSubscription provSubscription = provSubscriptions.get(subId);
+        return provSubscription.isPrivilegedSubscriber();
+    }
+
     /**
      * Get authenticated user
      */
index 474f5dd..d98c47a 100644 (file)
@@ -57,6 +57,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
     private Timer timer = new Timer("Node Configuration Timer", true);
     private long maxfailuretimer;
     private long initfailuretimer;
+    private long waitForFileProcessFailureTimer;
     private long expirationtimer;
     private double failurebackoff;
     private long fairtimelimit;
@@ -187,6 +188,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
         followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
         eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s");
         initfailuretimer = 10000;
+        waitForFileProcessFailureTimer = 600000;
         maxfailuretimer = 3600000;
         expirationtimer = 86400000;
         failurebackoff = 2.0;
@@ -199,6 +201,10 @@ public class NodeConfigManager implements DeliveryQueueHelper {
             initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000);
         } catch (Exception e) {
         }
+        try {
+            waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) * 1000);
+        } catch (Exception e) {
+        }
         try {
             maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000);
         } catch (Exception e) {
@@ -328,6 +334,16 @@ public class NodeConfigManager implements DeliveryQueueHelper {
         return (config.isPublishPermitted(feedid, credentials, ip));
     }
 
+    /**
+     * Check whether delete file is allowed.
+     *
+     * @param subId The ID of the subscription being requested
+     * @return True if the delete file is permitted for the subscriber.
+     */
+    public boolean isDeletePermitted(String subId) {
+        return (config.isDeletePermitted(subId));
+    }
+
     /**
      * Check who the user is given the feed ID and the offered credentials.
      *
@@ -407,13 +423,13 @@ public class NodeConfigManager implements DeliveryQueueHelper {
     /**
      * Get the URL to deliver a message to.
      *
-     * @param destinfo The destination information
+     * @param destinationInfo The destination information
      * @param fileid The file ID
      * @return The URL to deliver to
      */
-    public String getDestURL(DestInfo destinfo, String fileid) {
-        String subid = destinfo.getSubId();
-        String purl = destinfo.getURL();
+    public String getDestURL(DestInfo destinationInfo, String fileid) {
+        String subid = destinationInfo.getSubId();
+        String purl = destinationInfo.getURL();
         if (followredirects && subid != null) {
             purl = rdmgr.lookup(subid, purl);
         }
@@ -430,10 +446,10 @@ public class NodeConfigManager implements DeliveryQueueHelper {
     /**
      * Set up redirection on receipt of a 3XX from a target URL
      */
-    public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) {
+    public boolean handleRedirection(DestInfo destinationInfo, String redirto, String fileid) {
         fileid = "/" + fileid;
-        String subid = destinfo.getSubId();
-        String purl = destinfo.getURL();
+        String subid = destinationInfo.getSubId();
+        String purl = destinationInfo.getURL();
         if (followredirects && subid != null && redirto.endsWith(fileid)) {
             redirto = redirto.substring(0, redirto.length() - fileid.length());
             if (!redirto.equals(purl)) {
@@ -447,8 +463,8 @@ public class NodeConfigManager implements DeliveryQueueHelper {
     /**
      * Handle unreachable target URL
      */
-    public void handleUnreachable(DestInfo destinfo) {
-        String subid = destinfo.getSubId();
+    public void handleUnreachable(DestInfo destinationInfo) {
+        String subid = destinationInfo.getSubId();
         if (followredirects && subid != null) {
             rdmgr.forget(subid);
         }
@@ -461,6 +477,13 @@ public class NodeConfigManager implements DeliveryQueueHelper {
         return (initfailuretimer);
     }
 
+    /**
+     * Get the timeout before retrying after delivery and wait for file processing
+     */
+    public long getWaitForFileProcessFailureTimer() {
+        return (waitForFileProcessFailureTimer);
+    }
+
     /**
      * Get the maximum timeout between delivery attempts
      */
index e07642c..d25531a 100644 (file)
@@ -145,7 +145,7 @@ public class NodeMain {
         ctxt = new ServletContextHandler(0);
         ctxt.setContextPath("/");
         server.setHandler(ctxt);
-        ctxt.addServlet(new ServletHolder(new NodeServlet()), "/*");
+        ctxt.addServlet(new ServletHolder(new NodeServlet(delivery)), "/*");
         nodeMainLogger.info("NODE0005 Data Router Node Activating Service");
         server.start();
         server.join();
index 7e1d46d..fae2c1f 100644 (file)
@@ -40,11 +40,11 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Enumeration;
 import java.util.regex.Pattern;
-import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.log4j.Logger;
+import org.jetbrains.annotations.Nullable;
 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
 import org.slf4j.MDC;
 
@@ -67,6 +67,7 @@ public class NodeServlet extends HttpServlet {
     //Adding EELF Logger Rally:US664892
     private static EELFLogger eelflogger = EELFManager.getInstance()
             .getLogger(NodeServlet.class);
+    private Delivery delivery;
 
     static {
         final String ws = "\\s*";
@@ -80,6 +81,10 @@ public class NodeServlet extends HttpServlet {
         MetaDataPattern = Pattern.compile(object, Pattern.DOTALL);
     }
 
+    NodeServlet(Delivery delivery) {
+        this.delivery = delivery;
+    }
+
     /**
      * Get the NodeConfigurationManager
      */
@@ -155,16 +160,13 @@ public class NodeServlet extends HttpServlet {
         } catch (IOException ioe) {
             logger.error("IOException" + ioe.getMessage());
             eelflogger.info(EelfMsgs.EXIT);
-        } catch (ServletException se) {
-            logger.error("ServletException" + se.getMessage());
-            eelflogger.info(EelfMsgs.EXIT);
         }
     }
 
     /**
      * Handle all DELETE requests
      */
-    protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+    protected void doDelete(HttpServletRequest req, HttpServletResponse resp) {
         NodeUtils.setIpAndFqdnForEelf("doDelete");
         NodeUtils.setRequestIdAndInvocationId(req);
         eelflogger.info(EelfMsgs.ENTRY);
@@ -173,40 +175,26 @@ public class NodeServlet extends HttpServlet {
         try {
             common(req, resp, false);
         } catch (IOException ioe) {
-            logger.error("IOException" + ioe.getMessage());
-            eelflogger.info(EelfMsgs.EXIT);
-        } catch (ServletException se) {
-            logger.error("ServletException" + se.getMessage());
+            logger.error("IOException " + ioe.getMessage());
             eelflogger.info(EelfMsgs.EXIT);
         }
-
     }
 
-    private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput)
-            throws ServletException, IOException {
-        if (down(resp)) {
-            eelflogger.info(EelfMsgs.EXIT);
-            return;
-        }
-        if (!req.isSecure()) {
-            logger.info(
-                    "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req
-                            .getRemoteAddr());
-            resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");
-            eelflogger.info(EelfMsgs.EXIT);
-            return;
-        }
-        String fileid = req.getPathInfo();
-        if (fileid == null) {
-            logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
-                    .getRemoteAddr());
-            resp.sendError(HttpServletResponse.SC_NOT_FOUND,
-                    "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");
-            eelflogger.info(EelfMsgs.EXIT);
-            return;
-        }
+    private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws IOException {
+        String fileid = getFileId(req, resp);
+        if (fileid == null) return;
         String feedid = null;
         String user = null;
+        String ip = req.getRemoteAddr();
+        String lip = req.getLocalAddr();
+        String pubid = null;
+        String xpubid = null;
+        String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;
+        Target[] targets = null;
+        if (fileid.startsWith("/delete/")) {
+            deleteFile(req, resp, fileid, pubid);
+            return;
+        }
         String credentials = req.getHeader("Authorization");
         if (credentials == null) {
             logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req
@@ -215,12 +203,6 @@ public class NodeServlet extends HttpServlet {
             eelflogger.info(EelfMsgs.EXIT);
             return;
         }
-        String ip = req.getRemoteAddr();
-        String lip = req.getLocalAddr();
-        String pubid = null;
-        String xpubid = null;
-        String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;
-        Target[] targets = null;
         if (fileid.startsWith("/publish/")) {
             fileid = fileid.substring(9);
             int i = fileid.indexOf('/');
@@ -315,8 +297,8 @@ public class NodeServlet extends HttpServlet {
             mx.append(req.getMethod()).append('\t').append(fileid).append('\n');
             Enumeration hnames = req.getHeaderNames();
             String ctype = null;
-            Boolean hasRequestIdHeader = false;
-            Boolean hasInvocationIdHeader = false;
+            boolean hasRequestIdHeader = false;
+            boolean hasInvocationIdHeader = false;
             while (hnames.hasMoreElements()) {
                 String hn = (String) hnames.nextElement();
                 String hnlc = hn.toLowerCase();
@@ -449,6 +431,90 @@ public class NodeServlet extends HttpServlet {
         }
     }
 
+    private void deleteFile(HttpServletRequest req, HttpServletResponse resp, String fileid, String pubid) {
+        try {
+            fileid = fileid.substring(8);
+            int i = fileid.indexOf('/');
+            if (i == -1 || i == fileid.length() - 1) {
+                logger.info("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + " from " + req
+                        .getRemoteAddr());
+                resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+                        "Invalid request URI. Expecting <subId>/<pubId>.");
+                eelflogger.info(EelfMsgs.EXIT);
+                return;
+            }
+            String subscriptionId = fileid.substring(0, i);
+            int subId = Integer.parseInt(subscriptionId);
+            pubid = fileid.substring(i + 1);
+            String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+                            + config.getMyName() + ".";
+            int subIdDir = subId - (subId % 100);
+            if (!isAuthorizedToDelete(resp, subscriptionId, errorMessage)) {
+                return;
+            }
+            boolean result = delivery.markTaskSuccess(config.getSpoolBase() + "/s/" + subIdDir + "/" + subId, pubid);
+            if (result) {
+                logger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + ".M) from DR Node: "
+                        + config.getMyName());
+                resp.setStatus(HttpServletResponse.SC_OK);
+                eelflogger.info(EelfMsgs.EXIT);
+            } else {
+                logger.error("NODE0116 " + errorMessage);
+                resp.sendError(HttpServletResponse.SC_NOT_FOUND, "File not found on server.");
+                eelflogger.info(EelfMsgs.EXIT);
+            }
+        } catch (IOException ioe) {
+            logger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+                    + config.getMyName() + ". Error: " + ioe.getMessage());
+            eelflogger.info(EelfMsgs.EXIT);
+        }
+    }
+
+    @Nullable
+    private String getFileId(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+        if (down(resp)) {
+            eelflogger.info(EelfMsgs.EXIT);
+            return null;
+        }
+        if (!req.isSecure()) {
+            logger.info(
+                    "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req
+                            .getRemoteAddr());
+            resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");
+            eelflogger.info(EelfMsgs.EXIT);
+            return null;
+        }
+        String fileid = req.getPathInfo();
+        if (fileid == null) {
+            logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+                    .getRemoteAddr());
+            resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+                    "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");
+            eelflogger.info(EelfMsgs.EXIT);
+            return null;
+        }
+        return fileid;
+    }
+
+    private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage) throws IOException {
+        try {
+            boolean deletePermitted = config.isDeletePermitted(subscriptionId);
+            if (!deletePermitted) {
+                logger.error("NODE0113 " + errorMessage + " Error: Subscription "
+                        + subscriptionId + " is not a privileged subscription");
+                resp.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+                eelflogger.info(EelfMsgs.EXIT);
+                return false;
+            }
+        } catch (NullPointerException npe) {
+            logger.error("NODE0114 " + errorMessage + " Error: Subscription " + subscriptionId + " does not exist");
+            resp.sendError(HttpServletResponse.SC_NOT_FOUND);
+            eelflogger.info(EelfMsgs.EXIT);
+            return false;
+        }
+        return true;
+    }
+
     private int getIdFromPath(HttpServletRequest req) {
         String path = req.getPathInfo();
         if (path == null || path.length() < 2) {
index f0b8174..765a407 100644 (file)
@@ -173,7 +173,8 @@ public class ProvData {
                     String password = gvas(jdel, "password");
                     boolean monly = jsub.getBoolean("metadataOnly");
                     boolean use100 = jdel.getBoolean("use100");
-                    psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100));
+                    boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
+                    psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber));
                 }
             }
             JSONObject jparams = jcfg.optJSONObject("parameters");
index 8a7460e..ae8cd2c 100644 (file)
@@ -97,7 +97,7 @@ public class DeliveryTest {
 
   private DestInfo[] createDestInfoObjects() {
     DestInfo[] destInfos = new DestInfo[1];
-    DestInfo destInfo = new DestInfo("node.datarouternew.com", "spool/s/0/1", "1", "logs/", "/subs/1", "user1", "Basic dXNlcjE6cGFzc3dvcmQx", false, true);
+    DestInfo destInfo = new DestInfo("node.datarouternew.com", "spool/s/0/1", "1", "logs/", "/subs/1", "user1", "Basic dXNlcjE6cGFzc3dvcmQx", false, true, false);
     destInfos[0] = destInfo;
     return destInfos;
   }
index 1810817..5092141 100644 (file)
@@ -210,6 +210,7 @@ public class NodeConfigTest {
         delivery.put("password", "password1");
         delivery.put("use100", true);
         subscription.put("delivery", delivery);
+        subscription.put("privilegedSubscriber", false);
         subscriptions.put(subscription);
         provData.put("subscriptions", subscriptions);
     }
index 2a659aa..065565d 100644 (file)
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.*;
 
 import static org.hamcrest.Matchers.notNullValue;
@@ -49,6 +51,7 @@ import static org.mockito.Mockito.*;
 public class NodeServletTest {
 
     private NodeServlet nodeServlet;
+    private Delivery delivery;
 
     @Mock
     private HttpServletRequest request;
@@ -59,18 +62,26 @@ public class NodeServletTest {
     ListAppender<ILoggingEvent> listAppender;
 
     @Before
-    public void setUp() throws Exception{
+    public void setUp() throws Exception {
         listAppender = setTestLogger();
-        nodeServlet = new NodeServlet();
         setBehalfHeader("Stub_Value");
         when(request.getPathInfo()).thenReturn("2");
         when(request.isSecure()).thenReturn(true);
+        createFilesAndDirectories();
         setUpConfig();
         setUpNodeMainDelivery();
+        delivery = mock(Delivery.class);
+        when(delivery.markTaskSuccess("spool/s/0/1", "dmaap-dr-node.1234567")).thenReturn(true);
+        nodeServlet = new NodeServlet(delivery);
         when(request.getHeader("Authorization")).thenReturn("User1");
         when(request.getHeader("X-DMAAP-DR-PUBLISH-ID")).thenReturn("User1");
     }
 
+    @AfterClass
+    public static void tearDown() {
+        deleteCreatedDirectories();
+    }
+
     @Test
     public void Given_Request_Is_HTTP_GET_And_Config_Is_Down_Then_Service_Unavailable_Response_Is_Generated() throws Exception {
         setNodeConfigManagerIsConfiguredToReturnFalse();
@@ -80,7 +91,7 @@ public class NodeServletTest {
     }
 
     @Test
-    public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_Internal_FetchProv_Then_No_Content_Response_Is_Generated() throws Exception {
+    public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_Internal_FetchProv_Then_No_Content_Response_Is_Generated() {
         when(request.getPathInfo()).thenReturn("/internal/fetchProv");
         nodeServlet.doGet(request, response);
         verify(response).setStatus(eq(HttpServletResponse.SC_NO_CONTENT));
@@ -88,7 +99,7 @@ public class NodeServletTest {
     }
 
     @Test
-    public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_ResetSubscription_Then_No_Content_Response_Is_Generated() throws Exception {
+    public void Given_Request_Is_HTTP_GET_And_Endpoint_Is_ResetSubscription_Then_No_Content_Response_Is_Generated() {
         when(request.getPathInfo()).thenReturn("/internal/resetSubscription/1");
         nodeServlet.doGet(request, response);
         verify(response).setStatus(eq(HttpServletResponse.SC_NO_CONTENT));
@@ -213,6 +224,48 @@ public class NodeServletTest {
         verifyEnteringExitCalled(listAppender);
     }
 
+    @Test
+    public void Given_Request_Is_HTTP_DELETE_File_With_Invalid_Endpoint_Then_Not_Found_Response_Is_Generated() throws Exception {
+        when(request.getPathInfo()).thenReturn("/delete/1");
+        nodeServlet.doDelete(request, response);
+        verify(response).sendError(eq(HttpServletResponse.SC_NOT_FOUND), argThat(notNullValue(String.class)));
+        verifyEnteringExitCalled(listAppender);
+    }
+
+    @Test
+    public void Given_Request_Is_HTTP_DELETE_File_And_Is_Not_Privileged_Subscription_Then_Not_Found_Response_Is_Generated() throws Exception {
+        when(request.getPathInfo()).thenReturn("/delete/1/dmaap-dr-node.1234567");
+        setUpConfigToReturnUnprivilegedSubscriber();
+        nodeServlet.doDelete(request, response);
+        verify(response).sendError(eq(HttpServletResponse.SC_UNAUTHORIZED));
+        verifyEnteringExitCalled(listAppender);
+    }
+
+    @Test
+    public void Given_Request_Is_HTTP_DELETE_File_And_Subscription_Does_Not_Exist_Then_Not_Found_Response_Is_Generated() throws Exception {
+        when(request.getPathInfo()).thenReturn("/delete/1/dmaap-dr-node.1234567");
+        setUpConfigToReturnNullOnIsDeletePermitted();
+        nodeServlet.doDelete(request, response);
+        verify(response).sendError(eq(HttpServletResponse.SC_NOT_FOUND));
+        verifyEnteringExitCalled(listAppender);
+    }
+
+    @Test
+    public void Given_Request_Is_HTTP_DELETE_File_Then_Request_Succeeds() throws Exception {
+        when(request.getPathInfo()).thenReturn("/delete/1/dmaap-dr-node.1234567");
+        createFilesAndDirectories();
+        nodeServlet.doDelete(request, response);
+        verify(response).setStatus(eq(HttpServletResponse.SC_OK));
+        verifyEnteringExitCalled(listAppender);
+    }
+
+    @Test
+    public void Given_Request_Is_HTTP_DELETE_File_And_File_Does_Not_Exist_Then_Not_Found_Response_Is_Generated() throws IOException {
+        when(request.getPathInfo()).thenReturn("/delete/1/nonExistingFile");
+        nodeServlet.doDelete(request, response);
+        verify(response).sendError(eq(HttpServletResponse.SC_NOT_FOUND), argThat(notNullValue(String.class)));
+        verifyEnteringExitCalled(listAppender);
+    }
 
     private void setBehalfHeader(String headerValue) {
         when(request.getHeader("X-DMAAP-DR-ON-BEHALF-OF")).thenReturn(headerValue);
@@ -232,21 +285,45 @@ public class NodeServletTest {
         assertEquals(3, listAppender.list.size());
     }
 
-    private void setUpConfig() throws IllegalAccessException{
+    private void setUpConfig() throws IllegalAccessException {
         NodeConfigManager config = mock(NodeConfigManager.class);
         PowerMockito.mockStatic(NodeConfigManager.class);
         when(config.isShutdown()).thenReturn(false);
         when(config.isConfigured()).thenReturn(true);
-        when(config.getSpoolDir()).thenReturn("spool/dir");
+        when(config.getSpoolDir()).thenReturn("spool/f");
+        when(config.getSpoolBase()).thenReturn("spool");
         when(config.getLogDir()).thenReturn("log/dir");
         when(config.getPublishId()).thenReturn("User1");
         when(config.isAnotherNode(anyString(), anyString())).thenReturn(true);
         when(config.getEventLogInterval()).thenReturn("40");
+        when(config.isDeletePermitted("1")).thenReturn(true);
+        when(config.getAllDests()).thenReturn(new DestInfo[0]);
         FieldUtils.writeDeclaredStaticField(NodeServlet.class, "config", config, true);
         FieldUtils.writeDeclaredStaticField(NodeMain.class, "nodeConfigManager", config, true);
         PowerMockito.when(NodeConfigManager.getInstance()).thenReturn(config);
     }
 
+    private void setUpConfigToReturnUnprivilegedSubscriber() throws IllegalAccessException {
+        NodeConfigManager config = mock(NodeConfigManager.class);
+        PowerMockito.mockStatic(NodeConfigManager.class);
+        when(config.isShutdown()).thenReturn(false);
+        when(config.isConfigured()).thenReturn(true);
+        when(config.isDeletePermitted("1")).thenReturn(false);
+        FieldUtils.writeDeclaredStaticField(NodeServlet.class, "config", config, true);
+        FieldUtils.writeDeclaredStaticField(NodeMain.class, "nodeConfigManager", config, true);
+        PowerMockito.when(NodeConfigManager.getInstance()).thenReturn(config);
+    }
+
+    private void setUpConfigToReturnNullOnIsDeletePermitted() throws IllegalAccessException {
+        NodeConfigManager config = mock(NodeConfigManager.class);
+        PowerMockito.mockStatic(NodeConfigManager.class);
+        when(config.isShutdown()).thenReturn(false);
+        when(config.isConfigured()).thenReturn(true);
+        when(config.isDeletePermitted("1")).thenThrow(new NullPointerException());
+        FieldUtils.writeDeclaredStaticField(NodeServlet.class, "config", config, true);
+        FieldUtils.writeDeclaredStaticField(NodeMain.class, "nodeConfigManager", config, true);
+        PowerMockito.when(NodeConfigManager.getInstance()).thenReturn(config);
+    }
 
     private void setUpNodeMainDelivery() throws IllegalAccessException{
         Delivery delivery = mock(Delivery.class);
@@ -313,4 +390,31 @@ public class NodeServletTest {
         when(request.getHeaders("X-DMAAP-DR-ON-BEHALF-OF")).thenReturn(behalfHeader);
         when(request.getHeaders("X-DMAAP-DR-META")).thenReturn(metaDataHeader);
     }
+
+    private void createFilesAndDirectories() throws IOException {
+        File nodeDir = new File("spool/n/172.0.0.1");
+        File spoolDir = new File("spool/s/0/1");
+        File dataFile = new File("spool/s/0/1/dmaap-dr-node.1234567");
+        File metaDataFile = new File("spool/s/0/1/dmaap-dr-node.1234567.M");
+        nodeDir.mkdirs();
+        spoolDir.mkdirs();
+        dataFile.createNewFile();
+        metaDataFile.createNewFile();
+    }
+
+    private static void deleteCreatedDirectories() {
+        File spoolDir = new File("spool");
+        delete(spoolDir);
+    }
+
+    private static void delete(File file) {
+        if (file.isDirectory()) {
+            for (File f: file.listFiles()) {
+                delete(f);
+            }
+        }
+        if (!file.delete()) {
+            System.out.println("Failed to delete: " + file);
+        }
+    }
 }
index e1b24c5..230df75 100644 (file)
@@ -68,6 +68,7 @@ public class Subscription extends Syncable {
     private boolean suspended;\r
     private Date lastMod;\r
     private Date createdDate;\r
+    private boolean privilegedSubscriber;\r
 \r
     public static Subscription getSubscriptionMatching(Subscription sub) {\r
         SubDelivery deli = sub.getDelivery();\r
@@ -200,6 +201,7 @@ public class Subscription extends Syncable {
         this.suspended = false;\r
         this.lastMod = new Date();\r
         this.createdDate = new Date();\r
+        this.privilegedSubscriber = false;\r
     }\r
 \r
     public Subscription(ResultSet rs) throws SQLException {\r
@@ -214,6 +216,7 @@ public class Subscription extends Syncable {
         this.suspended = rs.getBoolean("SUSPENDED");\r
         this.lastMod = rs.getDate("LAST_MOD");\r
         this.createdDate = rs.getDate("CREATED_DATE");\r
+        this.privilegedSubscriber = rs.getBoolean("PRIVILEGED_SUBSCRIBER");\r
     }\r
 \r
     public Subscription(JSONObject jo) throws InvalidObjectException {\r
@@ -249,7 +252,7 @@ public class Subscription extends Syncable {
 \r
             this.metadataOnly = jo.getBoolean("metadataOnly");\r
             this.suspended = jo.optBoolean("suspend", false);\r
-\r
+            this.privilegedSubscriber = jo.optBoolean("privilegedSubscriber", false);\r
             this.subscriber = jo.optString("subscriber", "");\r
             JSONObject jol = jo.optJSONObject("links");\r
             this.links = (jol == null) ? (new SubLinks()) : (new SubLinks(jol));\r
@@ -323,6 +326,14 @@ public class Subscription extends Syncable {
         this.suspended = suspended;\r
     }\r
 \r
+    public boolean isPrivilegedSubscriber() {\r
+        return privilegedSubscriber;\r
+    }\r
+\r
+    public void setPrivilegedSubscriber(boolean privilegedSubscriber) {\r
+        this.privilegedSubscriber = privilegedSubscriber;\r
+    }\r
+\r
     public String getSubscriber() {\r
         return subscriber;\r
     }\r
@@ -357,6 +368,7 @@ public class Subscription extends Syncable {
         jo.put("suspend", suspended);\r
         jo.put(LAST_MOD_KEY, lastMod.getTime());\r
         jo.put(CREATED_DATE, createdDate.getTime());\r
+        jo.put("privilegedSubscriber", privilegedSubscriber);\r
         return jo;\r
     }\r
 \r
@@ -394,7 +406,7 @@ public class Subscription extends Syncable {
             }\r
 \r
             // Create the SUBSCRIPTIONS row\r
-            String sql = "insert into SUBSCRIPTIONS (SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";\r
+            String sql = "insert into SUBSCRIPTIONS (SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID, PRIVILEGED_SUBSCRIBER) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";\r
             ps = c.prepareStatement(sql, new String[]{SUBID_COL});\r
             ps.setInt(1, subid);\r
             ps.setInt(2, feedid);\r
@@ -406,6 +418,7 @@ public class Subscription extends Syncable {
             ps.setString(8, getSubscriber());\r
             ps.setBoolean(9, isSuspended());\r
             ps.setInt(10, groupid); //New field is added - Groups feature Rally:US708115 - 1610\r
+            ps.setBoolean(11, isPrivilegedSubscriber());\r
             ps.execute();\r
             ps.close();\r
             // Update the row to set the URLs\r
@@ -436,7 +449,7 @@ public class Subscription extends Syncable {
         boolean rv = true;\r
         PreparedStatement ps = null;\r
         try {\r
-            String sql = "update SUBSCRIPTIONS set DELIVERY_URL = ?, DELIVERY_USER = ?, DELIVERY_PASSWORD = ?, DELIVERY_USE100 = ?, METADATA_ONLY = ?, SUSPENDED = ?, GROUPID = ? where SUBID = ?";\r
+            String sql = "update SUBSCRIPTIONS set DELIVERY_URL = ?, DELIVERY_USER = ?, DELIVERY_PASSWORD = ?, DELIVERY_USE100 = ?, METADATA_ONLY = ?, SUSPENDED = ?, GROUPID = ?, PRIVILEGED_SUBSCRIBER = ? where SUBID = ?";\r
             ps = c.prepareStatement(sql);\r
             ps.setString(1, delivery.getUrl());\r
             ps.setString(2, delivery.getUser());\r
@@ -445,7 +458,8 @@ public class Subscription extends Syncable {
             ps.setInt(5, isMetadataOnly() ? 1 : 0);\r
             ps.setInt(6, suspended ? 1 : 0);\r
             ps.setInt(7, groupid); //New field is added - Groups feature Rally:US708115 - 1610\r
-            ps.setInt(8, subid);\r
+            ps.setInt(8, privilegedSubscriber ? 1 : 0);\r
+            ps.setInt(9, subid);\r
             ps.executeUpdate();\r
         } catch (SQLException e) {\r
             rv = false;\r
index 0125832..60b638a 100755 (executable)
@@ -31,20 +31,21 @@ CREATE TABLE FEED_ENDPOINT_ADDRS (
 );
 
 CREATE TABLE SUBSCRIPTIONS (
-    SUBID              INT UNSIGNED NOT NULL PRIMARY KEY,
-    FEEDID             INT UNSIGNED NOT NULL,
-    GROUPID            INT(10) UNSIGNED NOT NULL DEFAULT 0,
-    DELIVERY_URL       VARCHAR(256),
-    DELIVERY_USER      VARCHAR(20),
-    DELIVERY_PASSWORD  VARCHAR(32),
-    DELIVERY_USE100    BOOLEAN DEFAULT FALSE,
-    METADATA_ONLY      BOOLEAN DEFAULT FALSE,
-    SUBSCRIBER         VARCHAR(8) NOT NULL,
-    SELF_LINK          VARCHAR(256),
-    LOG_LINK           VARCHAR(256),
-    LAST_MOD           TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-    SUSPENDED          BOOLEAN DEFAULT FALSE,
-    CREATED_DATE       TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+    SUBID                       INT UNSIGNED NOT NULL PRIMARY KEY,
+    FEEDID                      INT UNSIGNED NOT NULL,
+    GROUPID                     INT(10) UNSIGNED NOT NULL DEFAULT 0,
+    DELIVERY_URL                VARCHAR(256),
+    DELIVERY_USER               VARCHAR(20),
+    DELIVERY_PASSWORD           VARCHAR(32),
+    DELIVERY_USE100             BOOLEAN DEFAULT FALSE,
+    METADATA_ONLY               BOOLEAN DEFAULT FALSE,
+    SUBSCRIBER                  VARCHAR(8) NOT NULL,
+    SELF_LINK                   VARCHAR(256),
+    LOG_LINK                    VARCHAR(256),
+    LAST_MOD                    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    SUSPENDED                   BOOLEAN DEFAULT FALSE,
+    PRIVILEGED_SUBSCRIBER       BOOLEAN DEFAULT FALSE,
+    CREATED_DATE                TIMESTAMP DEFAULT CURRENT_TIMESTAMP
 
 );
 
@@ -134,6 +135,7 @@ INSERT INTO PARAMETERS VALUES
     ('DELIVERY_INIT_RETRY_INTERVAL', '10'),
     ('DELIVERY_MAX_AGE', '86400'),
     ('DELIVERY_MAX_RETRY_INTERVAL', '3600'),
+    ('DELIVERY_FILE_PROCESS_INTERVAL', '600'),
     ('DELIVERY_RETRY_RATIO', '2'),
     ('LOGROLL_INTERVAL', '30'),
     ('PROV_AUTH_ADDRESSES', 'dmaap-dr-prov|dmaap-dr-node'),
@@ -142,5 +144,5 @@ INSERT INTO PARAMETERS VALUES
     ('PROV_MAXSUB_COUNT',   '100000'),
     ('PROV_REQUIRE_CERT', 'false'),
     ('PROV_REQUIRE_SECURE', 'false'),
-    ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE')
+    ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE|DELIVERY_FILE_PROCESS_INTERVAL')
     ;
index fd1e68e..43973d5 100755 (executable)
@@ -22,7 +22,6 @@
  ******************************************************************************/
 package org.onap.dmaap.datarouter.provisioning;
 
-import ch.qos.logback.classic.Logger;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
 import org.apache.commons.lang3.reflect.FieldUtils;
@@ -39,7 +38,6 @@ import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
 import org.onap.dmaap.datarouter.provisioning.beans.Updateable;
 import org.onap.dmaap.datarouter.provisioning.utils.DB;
 import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.LoggerFactory;
 
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
@@ -48,15 +46,12 @@ import javax.servlet.ServletInputStream;
 import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.PrintWriter;
 import java.sql.SQLException;
 import java.util.HashSet;
 import java.util.Set;
 
 import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
 import static org.onap.dmaap.datarouter.provisioning.BaseServlet.BEHALF_HEADER;
 
@@ -283,6 +278,7 @@ public class SubscriptionServletTest extends DrServletTestBase {
                 jo.put("version", "2.0");
                 jo.put("metadataOnly", true);
                 jo.put("suspend", true);
+                jo.put("privilegedSubscriber", true);
                 jo.put("delivery", JSObject);
                 jo.put("subscriber", "differentSubscriber");
                 jo.put("sync", true);
@@ -305,6 +301,7 @@ public class SubscriptionServletTest extends DrServletTestBase {
                 jo.put("version", "2.0");
                 jo.put("metadataOnly", true);
                 jo.put("suspend", true);
+                jo.put("privilegedSubscriber", true);
                 jo.put("delivery", JSObject);
                 jo.put("sync", true);
                 return jo;
@@ -333,6 +330,7 @@ public class SubscriptionServletTest extends DrServletTestBase {
                 jo.put("version", "2.0");
                 jo.put("metadataOnly", true);
                 jo.put("suspend", true);
+                jo.put("privilegedSubscriber", true);
                 jo.put("delivery", JSObject);
                 jo.put("sync", true);
                 jo.put("changeowner", true);
@@ -505,6 +503,7 @@ public class SubscriptionServletTest extends DrServletTestBase {
         subscription.setGroupid(1);
         subscription.setMetadataOnly(false);
         subscription.setSuspended(false);
+        subscription.setPrivilegedSubscriber(false);
         subscription.doInsert(db.getConnection());
     }
 
@@ -518,6 +517,7 @@ public class SubscriptionServletTest extends DrServletTestBase {
         subscription.setGroupid(1);
         subscription.setMetadataOnly(false);
         subscription.setSuspended(false);
+        subscription.setPrivilegedSubscriber(false);
         subscription.changeOwnerShip();
         subscription.doUpdate(db.getConnection());
     }
index 1e0c042..3e6aed0 100644 (file)
@@ -62,6 +62,7 @@ public class SubscriptionTest {
         subscription.setMetadataOnly(false);
         subscription.setSubscriber(subscriber);
         subscription.setSuspended(false);
+        subscription.setPrivilegedSubscriber(false);
         subscription.setLinks(subLinks);
 
         Assert.assertEquals(2, subscription.getGroupid());
@@ -69,5 +70,6 @@ public class SubscriptionTest {
         Assert.assertEquals(subLinks, subscription.getLinks());
         Assert.assertFalse(subscription.isMetadataOnly());
         Assert.assertFalse(subscription.isSuspended());
+        Assert.assertFalse(subscription.isPrivilegedSubscriber());
     }
 }
\ No newline at end of file
index b704602..fa7402a 100755 (executable)
@@ -29,20 +29,21 @@ CREATE TABLE FEED_ENDPOINT_ADDRS (
 );
 
 CREATE TABLE SUBSCRIPTIONS (
-    SUBID              INT UNSIGNED NOT NULL PRIMARY KEY,
-    FEEDID             INT UNSIGNED NOT NULL,
-    GROUPID            INT(10) UNSIGNED NOT NULL DEFAULT 0,
-    DELIVERY_URL       VARCHAR(256),
-    DELIVERY_USER      VARCHAR(20),
-    DELIVERY_PASSWORD  VARCHAR(32),
-    DELIVERY_USE100    BOOLEAN DEFAULT FALSE,
-    METADATA_ONLY      BOOLEAN DEFAULT FALSE,
-    SUBSCRIBER         VARCHAR(8) NOT NULL,
-    SELF_LINK          VARCHAR(256),
-    LOG_LINK           VARCHAR(256),
-    LAST_MOD           TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-    SUSPENDED          BOOLEAN DEFAULT FALSE,
-    CREATED_DATE       TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+    SUBID                       INT UNSIGNED NOT NULL PRIMARY KEY,
+    FEEDID                      INT UNSIGNED NOT NULL,
+    GROUPID                     INT(10) UNSIGNED NOT NULL DEFAULT 0,
+    DELIVERY_URL                VARCHAR(256),
+    DELIVERY_USER               VARCHAR(20),
+    DELIVERY_PASSWORD           VARCHAR(32),
+    DELIVERY_USE100             BOOLEAN DEFAULT FALSE,
+    METADATA_ONLY               BOOLEAN DEFAULT FALSE,
+    SUBSCRIBER                  VARCHAR(8) NOT NULL,
+    SELF_LINK                   VARCHAR(256),
+    LOG_LINK                    VARCHAR(256),
+    LAST_MOD                    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    SUSPENDED                   BOOLEAN DEFAULT FALSE,
+    PRIVILEGED_SUBSCRIBER       BOOLEAN DEFAULT FALSE,
+    CREATED_DATE                TIMESTAMP DEFAULT CURRENT_TIMESTAMP
 
 );
 
@@ -132,6 +133,7 @@ INSERT INTO PARAMETERS VALUES
     ('DELIVERY_INIT_RETRY_INTERVAL', '10'),
     ('DELIVERY_MAX_AGE', '86400'),
     ('DELIVERY_MAX_RETRY_INTERVAL', '3600'),
+    ('DELIVERY_FILE_PROCESS_INTERVAL', '600'),
     ('DELIVERY_RETRY_RATIO', '2'),
     ('LOGROLL_INTERVAL', '300'),
     ('PROV_AUTH_ADDRESSES', 'dmaap-dr-prov|dmaap-dr-node'),
@@ -140,14 +142,14 @@ INSERT INTO PARAMETERS VALUES
     ('PROV_MAXSUB_COUNT',   '100000'),
     ('PROV_REQUIRE_CERT', 'false'),
     ('PROV_REQUIRE_SECURE', 'true'),
-    ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE')
+    ('_INT_VALUES', 'LOGROLL_INTERVAL|PROV_MAXFEED_COUNT|PROV_MAXSUB_COUNT|DELIVERY_INIT_RETRY_INTERVAL|DELIVERY_MAX_RETRY_INTERVAL|DELIVERY_RETRY_RATIO|DELIVERY_MAX_AGE|DELIVERY_FILE_PROCESS_INTERVAL')
     ;
 
 INSERT INTO GROUPS(GROUPID, AUTHID, NAME, DESCRIPTION, CLASSIFICATION, MEMBERS)
 VALUES (1, 'Basic dXNlcjE6cGFzc3dvcmQx', 'Group1', 'First Group for testing', 'Class1', 'Member1');
 
-INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID)
-VALUES (1, 1, 'https://172.100.0.5:8080', 'user1', 'password1', true, false, 'user1', false, 1);
+INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, DELIVERY_USE100, METADATA_ONLY, SUBSCRIBER, SUSPENDED, GROUPID, PRIVILEGED_SUBSCRIBER)
+VALUES (1, 1, 'https://172.100.0.5:8080', 'user1', 'password1', true, false, 'user1', false, 1, false);
 
 INSERT INTO SUBSCRIPTIONS(SUBID, FEEDID, DELIVERY_URL, DELIVERY_USER, DELIVERY_PASSWORD, SUBSCRIBER, SELF_LINK, LOG_LINK)
 VALUES (23, 1, 'http://delivery_url', 'user1', 'somepassword', 'sub123', 'selflink', 'loglink');