Remove datarouter-node critical code smells
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryQueue.java
index abdfa71..3d48587 100644 (file)
@@ -24,6 +24,8 @@
 
 package org.onap.dmaap.datarouter.node;
 
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
 import java.io.*;
 import java.util.*;
 
@@ -64,6 +66,7 @@ import java.util.*;
  * failure timer is active or if no files are found in a directory scan.
  */
 public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
+    private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
     private DeliveryQueueHelper deliveryQueueHelper;
     private DestInfo destinationInfo;
     private Hashtable<String, DeliveryTask> working = new Hashtable<>();
@@ -72,7 +75,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     private boolean failed;
     private long failduration;
     private long resumetime;
-    File dir;
+    private File dir;
     private Vector<DeliveryTask> todo = new Vector<>();
 
     /**
@@ -80,7 +83,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
      *
      * @return The length of the task in bytes or 0 if the task cannot be cancelled.
      */
-    public synchronized long cancelTask(String pubid) {
+    synchronized long cancelTask(String pubid) {
         if (working.get(pubid) != null) {
             return (0);
         }
@@ -111,7 +114,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Mark that a delivery task has succeeded.
      */
-    public synchronized void markSuccess(DeliveryTask task) {
+    private synchronized void markSuccess(DeliveryTask task) {
         working.remove(task.getPublishId());
         task.clean();
         failed = false;
@@ -121,14 +124,14 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Mark that a delivery task has expired.
      */
-    public synchronized void markExpired(DeliveryTask task) {
+    private synchronized void markExpired(DeliveryTask task) {
         task.clean();
     }
 
     /**
      * Mark that a delivery task has failed permanently.
      */
-    public synchronized void markFailNoRetry(DeliveryTask task) {
+    private synchronized void markFailNoRetry(DeliveryTask task) {
         working.remove(task.getPublishId());
         task.clean();
         failed = false;
@@ -157,7 +160,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Mark that a delivery task has been redirected.
      */
-    public synchronized void markRedirect(DeliveryTask task) {
+    private synchronized void markRedirect(DeliveryTask task) {
         working.remove(task.getPublishId());
         retry.put(task.getPublishId(), task);
     }
@@ -165,7 +168,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Mark that a delivery task has temporarily failed.
      */
-    public synchronized void markFailWithRetry(DeliveryTask task) {
+    private synchronized void markFailWithRetry(DeliveryTask task) {
         working.remove(task.getPublishId());
         retry.put(task.getPublishId(), task);
         fdupdate();
@@ -174,7 +177,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Get the next task.
      */
-    public synchronized DeliveryTask getNext() {
+    synchronized DeliveryTask getNext() {
         DeliveryTask ret = peekNext();
         if (ret != null) {
             todoindex++;
@@ -186,7 +189,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Peek at the next task.
      */
-    public synchronized DeliveryTask peekNext() {
+    synchronized DeliveryTask peekNext() {
         long now = System.currentTimeMillis();
         long mindate = now - deliveryQueueHelper.getExpirationTimer();
         if (failed) {
@@ -199,7 +202,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
         while (true) {
             if (todoindex >= todo.size()) {
                 todoindex = 0;
-                todo = new Vector<DeliveryTask>();
+                todo = new Vector<>();
                 String[] files = dir.list();
                 Arrays.sort(files);
                 for (String fname : files) {
@@ -215,6 +218,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
                     try {
                         pidtime = Long.parseLong(fname2.substring(0, dot));
                     } catch (Exception e) {
+                        logger.error("Exception", e);
                     }
                     if (pidtime < 1000000000000L) {
                         continue;
@@ -228,7 +232,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
                     }
                     todo.add(dt);
                 }
-                retry = new Hashtable<String, DeliveryTask>();
+                retry = new Hashtable<>();
             }
             if (todoindex < todo.size()) {
                 DeliveryTask dt = todo.get(todoindex);
@@ -236,6 +240,11 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
                     todoindex++;
                     continue;
                 }
+                if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
+                    retry.put(dt.getPublishId(), dt);
+                    todoindex++;
+                    continue;
+                }
                 if (dt.getDate() >= mindate) {
                     return (dt);
                 }
@@ -250,7 +259,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Create a delivery queue for a given destination info
      */
-    public DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
+    DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
         this.deliveryQueueHelper = deliveryQueueHelper;
         this.destinationInfo = destinationInfo;
         dir = new File(destinationInfo.getSpool());
@@ -288,7 +297,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Message too old to deliver
      */
-    public void reportExpiry(DeliveryTask task) {
+    void reportExpiry(DeliveryTask task) {
         StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
         markExpired(task);
     }
@@ -298,8 +307,9 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
      */
     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
         if (status < 300) {
-               StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
-               if (destinationInfo.isPrivilegedSubscriber()) {
+            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
+            if (destinationInfo.isPrivilegedSubscriber()) {
+                task.setResumeTime(System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
                 markFailWithRetry(task);
             } else {
                 markSuccess(task);
@@ -367,21 +377,21 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     /**
      * Is there no work to do for this queue right now?
      */
-    public synchronized boolean isSkipSet() {
+    synchronized boolean isSkipSet() {
         return (peekNext() == null);
     }
 
     /**
      * Reset the retry timer
      */
-    public void resetQueue() {
+    void resetQueue() {
         resumetime = System.currentTimeMillis();
     }
 
     /**
      * Get task if in queue and mark as success
      */
-    public boolean markTaskSuccess(String pubId) {
+    boolean markTaskSuccess(String pubId) {
         DeliveryTask task = working.get(pubId);
         if (task != null) {
             markSuccess(task);
@@ -391,7 +401,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
         if (task != null) {
             retry.remove(pubId);
             task.clean();
-            resumetime = 0;
+            resetQueue();
             failduration = 0;
             return true;
         }