X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=datarouter-node%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fnode%2FDeliveryQueue.java;h=0ba9ecfd4985c57584d605eefe66a599627433bf;hb=cdc8eb2849b09dbb7984593b422aa692dfb79af8;hp=ad7462551651e0f0817148eb58ac0d370a5bced0;hpb=8cbe8a88bc6dfe8673a33a017fe6a5a3e7ce86c3;p=dmaap%2Fdatarouter.git diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java index ad746255..0ba9ecfd 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java @@ -24,8 +24,11 @@ package org.onap.dmaap.datarouter.node; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import java.io.*; import java.util.*; +import org.jetbrains.annotations.Nullable; /** * Mechanism for monitoring and controlling delivery of files to a destination. @@ -64,23 +67,24 @@ import java.util.*; * failure timer is active or if no files are found in a directory scan. */ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { - private DeliveryQueueHelper dqh; - private DestInfo di; - private Hashtable working = new Hashtable(); - private Hashtable retry = new Hashtable(); + private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class); + private DeliveryQueueHelper deliveryQueueHelper; + private DestInfo destinationInfo; + private Hashtable working = new Hashtable<>(); + private Hashtable retry = new Hashtable<>(); private int todoindex; private boolean failed; private long failduration; private long resumetime; - File dir; - private Vector todo = new Vector(); + private File dir; + private Vector todo = new Vector<>(); /** * Try to cancel a delivery task. * * @return The length of the task in bytes or 0 if the task cannot be cancelled. */ - public synchronized long cancelTask(String pubid) { + synchronized long cancelTask(String pubid) { if (working.get(pubid) != null) { return (0); } @@ -111,8 +115,9 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Mark that a delivery task has succeeded. */ - public synchronized void markSuccess(DeliveryTask task) { + private synchronized void markSuccess(DeliveryTask task) { working.remove(task.getPublishId()); + logger.debug(task.getPublishId() + " marked as success."); task.clean(); failed = false; failduration = 0; @@ -121,15 +126,17 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Mark that a delivery task has expired. */ - public synchronized void markExpired(DeliveryTask task) { + private synchronized void markExpired(DeliveryTask task) { + logger.debug(task.getPublishId() + " marked as expired."); task.clean(); } /** * Mark that a delivery task has failed permanently. */ - public synchronized void markFailNoRetry(DeliveryTask task) { + private synchronized void markFailNoRetry(DeliveryTask task) { working.remove(task.getPublishId()); + logger.debug(task.getPublishId() + " marked as failed permanently"); task.clean(); failed = false; failduration = 0; @@ -139,11 +146,15 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { if (!failed) { failed = true; if (failduration == 0) { - failduration = dqh.getInitFailureTimer(); + if (destinationInfo.isPrivilegedSubscriber()) { + failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer(); + } else{ + failduration = deliveryQueueHelper.getInitFailureTimer(); + } } resumetime = System.currentTimeMillis() + failduration; - long maxdur = dqh.getMaxFailureTimer(); - failduration = (long) (failduration * dqh.getFailureBackoff()); + long maxdur = deliveryQueueHelper.getMaxFailureTimer(); + failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff()); if (failduration > maxdur) { failduration = maxdur; } @@ -153,16 +164,18 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Mark that a delivery task has been redirected. */ - public synchronized void markRedirect(DeliveryTask task) { + private synchronized void markRedirect(DeliveryTask task) { working.remove(task.getPublishId()); + logger.debug(task.getPublishId() + " marked as redirected."); retry.put(task.getPublishId(), task); } /** * Mark that a delivery task has temporarily failed. */ - public synchronized void markFailWithRetry(DeliveryTask task) { + private synchronized void markFailWithRetry(DeliveryTask task) { working.remove(task.getPublishId()); + logger.debug(task.getPublishId() + " marked as temporarily failed."); retry.put(task.getPublishId(), task); fdupdate(); } @@ -170,7 +183,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Get the next task. */ - public synchronized DeliveryTask getNext() { + synchronized DeliveryTask getNext() { DeliveryTask ret = peekNext(); if (ret != null) { todoindex++; @@ -182,9 +195,9 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Peek at the next task. */ - public synchronized DeliveryTask peekNext() { + synchronized DeliveryTask peekNext() { long now = System.currentTimeMillis(); - long mindate = now - dqh.getExpirationTimer(); + long mindate = now - deliveryQueueHelper.getExpirationTimer(); if (failed) { if (now > resumetime) { failed = false; @@ -195,83 +208,50 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { while (true) { if (todoindex >= todo.size()) { todoindex = 0; - todo = new Vector(); + todo = new Vector<>(); String[] files = dir.list(); Arrays.sort(files); - for (String fname : files) { - if (!fname.endsWith(".M")) { - continue; - } - String fname2 = fname.substring(0, fname.length() - 2); - long pidtime = 0; - int dot = fname2.indexOf('.'); - if (dot < 1) { - continue; - } - try { - pidtime = Long.parseLong(fname2.substring(0, dot)); - } catch (Exception e) { - } - if (pidtime < 1000000000000L) { - continue; - } - if (working.get(fname2) != null) { - continue; - } - DeliveryTask dt = retry.get(fname2); - if (dt == null) { - dt = new DeliveryTask(this, fname2); - } - todo.add(dt); - } - retry = new Hashtable(); + scanForNextTask(files); + retry = new Hashtable<>(); } - if (todoindex < todo.size()) { - DeliveryTask dt = todo.get(todoindex); - if (dt.isCleaned()) { - todoindex++; - continue; - } - if (dt.getDate() >= mindate) { - return (dt); - } - todoindex++; - reportExpiry(dt); - continue; + DeliveryTask dt = getDeliveryTask(mindate); + if (dt != null) { + return dt; } - return (null); + return null; + } } /** * Create a delivery queue for a given destination info */ - public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) { - this.dqh = dqh; - this.di = di; - dir = new File(di.getSpool()); + DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) { + this.deliveryQueueHelper = deliveryQueueHelper; + this.destinationInfo = destinationInfo; + dir = new File(destinationInfo.getSpool()); dir.mkdirs(); } /** * Update the destination info for this delivery queue */ - public void config(DestInfo di) { - this.di = di; + public void config(DestInfo destinationInfo) { + this.destinationInfo = destinationInfo; } /** * Get the dest info */ - public DestInfo getDestInfo() { - return (di); + public DestInfo getDestinationInfo() { + return (destinationInfo); } /** * Get the config manager */ public DeliveryQueueHelper getConfig() { - return (dqh); + return (deliveryQueueHelper); } /** @@ -284,7 +264,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Message too old to deliver */ - public void reportExpiry(DeliveryTask task) { + void reportExpiry(DeliveryTask task) { StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts()); markExpired(task); } @@ -294,22 +274,27 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { */ public void reportStatus(DeliveryTask task, int status, String xpubid, String location) { if (status < 300) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid); - markSuccess(task); - } else if (status < 400 && dqh.isFollowRedirects()) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); - if (dqh.handleRedirection(di, location, task.getFileId())) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid); + if (destinationInfo.isPrivilegedSubscriber()) { + task.setResumeTime(System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer()); + markFailWithRetry(task); + } else { + markSuccess(task); + } + } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); + if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) { markRedirect(task); } else { StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); markFailNoRetry(task); } - } else if (status < 500) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + } else if (status < 500 && status != 429) { // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time. + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); markFailNoRetry(task); } else { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location); markFailWithRetry(task); } } @@ -318,8 +303,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { * Delivery failed by reason of an exception */ public void reportException(DeliveryTask task, Exception exception) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString()); - dqh.handleUnreachable(di); + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString()); + deliveryQueueHelper.handleUnreachable(destinationInfo); markFailWithRetry(task); } @@ -330,14 +315,14 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { * @return The feed ID */ public String getFeedId(String subid) { - return (dqh.getFeedId(subid)); + return (deliveryQueueHelper.getFeedId(subid)); } /** * Get the URL to deliver a message to given the file ID */ public String getDestURL(String fileid) { - return (dqh.getDestURL(di, fileid)); + return (deliveryQueueHelper.getDestURL(destinationInfo, fileid)); } /** @@ -345,11 +330,12 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { * files to deliver */ public void run() { - DeliveryTask t; - long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit(); - int filestogo = dqh.getFairFileLimit(); - while ((t = getNext()) != null) { - t.run(); + DeliveryTask task; + long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit(); + int filestogo = deliveryQueueHelper.getFairFileLimit(); + while ((task = getNext()) != null) { + logger.debug("Processing file: " + task.getPublishId()); + task.run(); if (--filestogo <= 0 || System.currentTimeMillis() > endtime) { break; } @@ -359,14 +345,92 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Is there no work to do for this queue right now? */ - public synchronized boolean isSkipSet() { + synchronized boolean isSkipSet() { return (peekNext() == null); } /** * Reset the retry timer */ - public void resetQueue() { + void resetQueue() { resumetime = System.currentTimeMillis(); } + + /** + * Get task if in queue and mark as success + */ + boolean markTaskSuccess(String pubId) { + DeliveryTask task = working.get(pubId); + if (task != null) { + markSuccess(task); + return true; + } + task = retry.get(pubId); + if (task != null) { + retry.remove(pubId); + task.clean(); + resetQueue(); + failduration = 0; + return true; + } + return false; + } + private void scanForNextTask(String[] files) { + for (String fname : files) { + String pubId = getPubId(fname); + if (pubId == null) { + continue; + } + DeliveryTask dt = retry.get(pubId); + if (dt == null) { + dt = new DeliveryTask(this, pubId); + } + todo.add(dt); + } + } + + @Nullable + private DeliveryTask getDeliveryTask(long mindate) { + if (todoindex < todo.size()) { + DeliveryTask dt = todo.get(todoindex); + if (dt.isCleaned()) { + todoindex++; + } + if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) { + retry.put(dt.getPublishId(), dt); + todoindex++; + } + if (dt.getDate() >= mindate) { + return (dt); + } + todoindex++; + reportExpiry(dt); + } + return null; + } + + @Nullable + private String getPubId(String fname) { + if (!fname.endsWith(".M")) { + return null; + } + String fname2 = fname.substring(0, fname.length() - 2); + long pidtime = 0; + int dot = fname2.indexOf('.'); + if (dot < 1) { + return null; + } + try { + pidtime = Long.parseLong(fname2.substring(0, dot)); + } catch (Exception e) { + logger.error("Exception", e); + } + if (pidtime < 1000000000000L) { + return null; + } + if (working.get(fname2) != null) { + return null; + } + return fname2; + } }