Merge "Adding ProvData unit tests"
authorConor Ward <conor.ward@est.tech>
Fri, 29 Mar 2019 08:32:20 +0000 (08:32 +0000)
committerGerrit Code Review <gerrit@onap.org>
Fri, 29 Mar 2019 08:32:20 +0000 (08:32 +0000)
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
pom.xml

index abdfa71..bef8dab 100644 (file)
@@ -72,7 +72,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     private boolean failed;
     private long failduration;
     private long resumetime;
-    File dir;
+    private File dir;
     private Vector<DeliveryTask> todo = new Vector<>();
 
     /**
@@ -80,7 +80,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
      *
      * @return The length of the task in bytes or 0 if the task cannot be cancelled.
      */
-    public synchronized long cancelTask(String pubid) {
+    synchronized long cancelTask(String pubid) {
         if (working.get(pubid) != null) {
             return (0);
         }
@@ -111,7 +111,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Mark that a delivery task has succeeded.
      */
-    public synchronized void markSuccess(DeliveryTask task) {
+    private synchronized void markSuccess(DeliveryTask task) {
         working.remove(task.getPublishId());
         task.clean();
         failed = false;
@@ -121,14 +121,14 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Mark that a delivery task has expired.
      */
-    public synchronized void markExpired(DeliveryTask task) {
+    private synchronized void markExpired(DeliveryTask task) {
         task.clean();
     }
 
     /**
      * Mark that a delivery task has failed permanently.
      */
-    public synchronized void markFailNoRetry(DeliveryTask task) {
+    private synchronized void markFailNoRetry(DeliveryTask task) {
         working.remove(task.getPublishId());
         task.clean();
         failed = false;
@@ -157,7 +157,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Mark that a delivery task has been redirected.
      */
-    public synchronized void markRedirect(DeliveryTask task) {
+    private synchronized void markRedirect(DeliveryTask task) {
         working.remove(task.getPublishId());
         retry.put(task.getPublishId(), task);
     }
@@ -165,7 +165,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Mark that a delivery task has temporarily failed.
      */
-    public synchronized void markFailWithRetry(DeliveryTask task) {
+    private synchronized void markFailWithRetry(DeliveryTask task) {
         working.remove(task.getPublishId());
         retry.put(task.getPublishId(), task);
         fdupdate();
@@ -174,7 +174,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Get the next task.
      */
-    public synchronized DeliveryTask getNext() {
+    synchronized DeliveryTask getNext() {
         DeliveryTask ret = peekNext();
         if (ret != null) {
             todoindex++;
@@ -186,7 +186,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Peek at the next task.
      */
-    public synchronized DeliveryTask peekNext() {
+    synchronized DeliveryTask peekNext() {
         long now = System.currentTimeMillis();
         long mindate = now - deliveryQueueHelper.getExpirationTimer();
         if (failed) {
@@ -199,7 +199,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
         while (true) {
             if (todoindex >= todo.size()) {
                 todoindex = 0;
-                todo = new Vector<DeliveryTask>();
+                todo = new Vector<>();
                 String[] files = dir.list();
                 Arrays.sort(files);
                 for (String fname : files) {
@@ -228,7 +228,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
                     }
                     todo.add(dt);
                 }
-                retry = new Hashtable<String, DeliveryTask>();
+                retry = new Hashtable<>();
             }
             if (todoindex < todo.size()) {
                 DeliveryTask dt = todo.get(todoindex);
@@ -236,6 +236,11 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
                     todoindex++;
                     continue;
                 }
+                if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
+                    retry.put(dt.getPublishId(), dt);
+                    todoindex++;
+                    continue;
+                }
                 if (dt.getDate() >= mindate) {
                     return (dt);
                 }
@@ -250,7 +255,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Create a delivery queue for a given destination info
      */
-    public DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
+    DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
         this.deliveryQueueHelper = deliveryQueueHelper;
         this.destinationInfo = destinationInfo;
         dir = new File(destinationInfo.getSpool());
@@ -288,7 +293,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Message too old to deliver
      */
-    public void reportExpiry(DeliveryTask task) {
+    void reportExpiry(DeliveryTask task) {
         StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
         markExpired(task);
     }
@@ -298,8 +303,9 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
      */
     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
         if (status < 300) {
-               StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
-               if (destinationInfo.isPrivilegedSubscriber()) {
+            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
+            if (destinationInfo.isPrivilegedSubscriber()) {
+                task.setResumeTime(System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
                 markFailWithRetry(task);
             } else {
                 markSuccess(task);
@@ -367,21 +373,21 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Is there no work to do for this queue right now?
      */
-    public synchronized boolean isSkipSet() {
+    synchronized boolean isSkipSet() {
         return (peekNext() == null);
     }
 
     /**
      * Reset the retry timer
      */
-    public void resetQueue() {
+    void resetQueue() {
         resumetime = System.currentTimeMillis();
     }
 
     /**
      * Get task if in queue and mark as success
      */
-    public boolean markTaskSuccess(String pubId) {
+    boolean markTaskSuccess(String pubId) {
         DeliveryTask task = working.get(pubId);
         if (task != null) {
             markSuccess(task);
@@ -391,7 +397,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
         if (task != null) {
             retry.remove(pubId);
             task.clean();
-            resumetime = 0;
+            resetQueue();
             failduration = 0;
             return true;
         }
index a3af88f..c085ebe 100644 (file)
@@ -66,6 +66,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     private int attempts;
     private String[][] hdrs;
     private String newInvocationId;
+    private long resumeTime;
 
 
     /**
@@ -76,7 +77,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
      *                           the base for the file name in the spool directory and is of
      *                           the form <milliseconds since 1970>.<fqdn of initial data router node>
      */
-    public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
+    DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
         this.deliveryTaskHelper = deliveryTaskHelper;
         this.pubid = pubid;
         destInfo = deliveryTaskHelper.getDestinationInfo();
@@ -89,6 +90,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
         metafile = new File(mfn);
         boolean monly = destInfo.isMetaDataOnly();
         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
+        resumeTime = System.currentTimeMillis();
         Vector<String[]> hdrv = new Vector<>();
 
         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
@@ -165,7 +167,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     /**
      * Get the publish ID
      */
-    public String getPublishId() {
+    String getPublishId() {
         return (pubid);
     }
 
@@ -332,7 +334,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     /**
      * Remove meta and data files
      */
-    public void clean() {
+    void clean() {
         datafile.delete();
         metafile.delete();
         eelflogger.info(EelfMsgs.INVOKE, newInvocationId);
@@ -340,10 +342,24 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
         hdrs = null;
     }
 
+    /**
+     * Set the resume time for a delivery task.
+     */
+    void setResumeTime(long resumeTime) {
+        this.resumeTime = resumeTime;
+    }
+
+    /**
+     * Get the resume time for a delivery task.
+     */
+    long getResumeTime() {
+        return resumeTime;
+    }
+
     /**
      * Has this delivery task been cleaned?
      */
-    public boolean isCleaned() {
+    boolean isCleaned() {
         return (hdrs == null);
     }
 
@@ -357,7 +373,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     /**
      * Get creation date as encoded in the publish ID.
      */
-    public long getDate() {
+    long getDate() {
         return (date);
     }
 
@@ -371,42 +387,42 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     /**
      * Get the content type
      */
-    public String getCType() {
+    String getCType() {
         return (ctype);
     }
 
     /**
      * Get the method
      */
-    public String getMethod() {
+    String getMethod() {
         return (method);
     }
 
     /**
      * Get the file ID
      */
-    public String getFileId() {
+    String getFileId() {
         return (fileid);
     }
 
     /**
      * Get the number of delivery attempts
      */
-    public int getAttempts() {
+    int getAttempts() {
         return (attempts);
     }
 
     /**
      * Get the (space delimited list of) subscription ID for this delivery task
      */
-    public String getSubId() {
+    String getSubId() {
         return (subid);
     }
 
     /**
      * Get the feed ID for this delivery task
      */
-    public String getFeedId() {
+    String getFeedId() {
         return (feedid);
     }
 }
diff --git a/pom.xml b/pom.xml
index 417263c..1609b11 100755 (executable)
--- a/pom.xml
+++ b/pom.xml
@@ -63,7 +63,7 @@
         <skip.docker.tag>true</skip.docker.tag>
         <skip.docker.push>true</skip.docker.push>
         <commons-codec.version>1.10</commons-codec.version>
-        <sonar.exclusions>datarouter-prov/src/main/java/org/onap/dmaap/datarouter/reports/**</sonar.exclusions>
+        <sonar.exclusions>**/src/main/java/org/onap/dmaap/datarouter/reports/**</sonar.exclusions>
     </properties>
     <modules>
         <module>datarouter-prov</module>
                         <exclude>**/generated-sources/**</exclude>
                         <exclude>**/yang-gen/**</exclude>
                         <exclude>**/pax/**</exclude>
-                        <exclude>datarouter-prov/src/main/java/org/onap/dmaap/datarouter/reports/**</exclude>
+                        <exclude>**/src/main/java/org/onap/dmaap/datarouter/reports/**</exclude>
                     </excludes>
                 </configuration>
                 <executions>