X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fdatarouter.git;a=blobdiff_plain;f=datarouter-node%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fnode%2FDelivery.java;h=df73c1e91f623e69c1c648122815de35f068041a;hp=4c21b34236d7329d598a34805118b08451dbc6e9;hb=0a440fd3ae3b413cd7de57677aec690f14ec7d53;hpb=3ebd2534167e73426d2b19efb05eaf9892f6f9d6 diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java index 4c21b342..df73c1e9 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java @@ -23,85 +23,37 @@ package org.onap.dmaap.datarouter.node; -import java.util.*; -import java.io.*; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Hashtable; +import java.util.Objects; /** * Main control point for delivering files to destinations. - *

- * The Delivery class manages assignment of delivery threads to delivery - * queues and creation and destruction of delivery queues as - * configuration changes. DeliveryQueues are assigned threads based on a - * modified round-robin approach giving priority to queues with more work - * as measured by both bytes to deliver and files to deliver and lower - * priority to queues that already have delivery threads working. - * A delivery thread continues to work for a delivery queue as long as - * that queue has more files to deliver. + * + *

The Delivery class manages assignment of delivery threads to delivery queues and creation and destruction of + * delivery queues as configuration changes. DeliveryQueues are assigned threads based on a modified round-robin + * approach giving priority to queues with more work as measured by both bytes to deliver and files to deliver and lower + * priority to queues that already have delivery threads working. A delivery thread continues to work for a delivery + * queue as long as that queue has more files to deliver. */ public class Delivery { - private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class); - - private static class DelItem implements Comparable { - private String pubid; - private String spool; - - public int compareTo(DelItem x) { - int i = pubid.compareTo(x.pubid); - if (i == 0) { - i = spool.compareTo(x.spool); - } - return (i); - } - - public String getPublishId() { - return (pubid); - } - - public String getSpool() { - return (spool); - } - - public DelItem(String pubid, String spool) { - this.pubid = pubid; - this.spool = spool; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DelItem delItem = (DelItem) o; - return Objects.equals(pubid, delItem.pubid) && - Objects.equals(getSpool(), delItem.getSpool()); - } - - @Override - public int hashCode() { - return Objects.hash(pubid, getSpool()); - } - } + private static final String TOTAL = " total="; + private static final String YELLOW = " yellow="; + private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class); private double fdstart; private double fdstop; private int threads; private int curthreads; private NodeConfigManager config; - private Hashtable dqs = new Hashtable(); + private Hashtable dqs = new Hashtable<>(); private DeliveryQueue[] queues = new DeliveryQueue[0]; private int qpos = 0; private long nextcheck; - private Runnable cmon = new Runnable() { - public void run() { - checkconfig(); - } - }; /** * Constructs a new Delivery system using the specified configuration manager. @@ -110,10 +62,37 @@ public class Delivery { */ public Delivery(NodeConfigManager config) { this.config = config; + Runnable cmon = this::checkconfig; config.registerConfigTask(cmon); checkconfig(); } + /** + * Reset the retry timer for a delivery queue. + */ + public synchronized void resetQueue(String spool) { + if (spool != null) { + DeliveryQueue dq = dqs.get(spool); + if (dq != null) { + dq.resetQueue(); + } + } + } + + /** + * Mark the task in spool a success. + */ + public synchronized boolean markTaskSuccess(String spool, String pubId) { + boolean succeeded = false; + if (spool != null) { + DeliveryQueue dq = dqs.get(spool); + if (dq != null) { + succeeded = dq.markTaskSuccess(pubId); + } + } + return succeeded; + } + private void cleardir(String dir) { if (dqs.get(dir) != null) { return; @@ -131,12 +110,11 @@ public class Delivery { File spoolfile = new File(config.getSpoolBase()); long tspace = spoolfile.getTotalSpace(); long start = (long) (tspace * fdstart); - long stop = (long) (tspace * fdstop); long cur = spoolfile.getUsableSpace(); if (cur >= start) { return; } - Vector cv = new Vector(); + ArrayList cv = new ArrayList<>(); for (String sdir : dqs.keySet()) { for (String meta : (new File(sdir)).list()) { if (!meta.endsWith(".M") || meta.charAt(0) == '.') { @@ -147,27 +125,21 @@ public class Delivery { } DelItem[] items = cv.toArray(new DelItem[cv.size()]); Arrays.sort(items); - logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace); - for (DelItem item : items) { - long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId()); - logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk"); - if (amount > 0) { - cur += amount; - if (cur >= stop) { - cur = spoolfile.getUsableSpace(); - } - if (cur >= stop) { - logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace); - return; - } - } + long stop = (long) (tspace * fdstop); + logger.info( + "NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + TOTAL + tspace); + if (determineFreeDiskSpace(spoolfile, tspace, stop, cur, items)) { + return; } cur = spoolfile.getUsableSpace(); if (cur >= stop) { - logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace); + logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop + + TOTAL + tspace); return; } - logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + " yellow=" + stop + " total=" + tspace); + logger.warn( + "NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + YELLOW + + stop + TOTAL + tspace); } private void cleardirs() { @@ -206,7 +178,7 @@ public class Delivery { DestInfo[] alldis = config.getAllDests(); DeliveryQueue[] nqs = new DeliveryQueue[alldis.length]; qpos = 0; - Hashtable ndqs = new Hashtable(); + Hashtable ndqs = new Hashtable<>(); for (DestInfo di : alldis) { String spl = di.getSpool(); DeliveryQueue dq = dqs.get(spl); @@ -223,11 +195,8 @@ public class Delivery { cleardirs(); while (curthreads < threads) { curthreads++; - (new Thread() { - { - setName("Delivery Thread"); - } - + (new Thread("Delivery Thread") { + @Override public void run() { dodelivery(); } @@ -276,29 +245,69 @@ public class Delivery { } } - /** - * Reset the retry timer for a delivery queue - */ - public synchronized void resetQueue(String spool) { - if (spool != null) { - DeliveryQueue dq = dqs.get(spool); - if (dq != null) { - dq.resetQueue(); + private boolean determineFreeDiskSpace(File spoolfile, long tspace, long stop, long cur, DelItem[] items) { + for (DelItem item : items) { + long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId()); + logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + + " to free up disk"); + if (amount > 0) { + cur += amount; + if (cur >= stop) { + cur = spoolfile.getUsableSpace(); + } + if (cur >= stop) { + logger.info( + "NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop + + TOTAL + tspace); + return true; + } } } + return false; } - /** - * Mark the task in spool a success - */ - public synchronized boolean markTaskSuccess(String spool, String pubId) { - boolean succeeded = false; - if (spool != null) { - DeliveryQueue dq = dqs.get(spool); - if (dq != null) { - succeeded = dq.markTaskSuccess(pubId); + private static class DelItem implements Comparable { + + private String pubid; + private String spool; + + public DelItem(String pubid, String spool) { + this.pubid = pubid; + this.spool = spool; + } + + public int compareTo(DelItem other) { + int diff = pubid.compareTo(other.pubid); + if (diff == 0) { + diff = spool.compareTo(other.spool); } + return (diff); + } + + public String getPublishId() { + return (pubid); + } + + public String getSpool() { + return (spool); + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + DelItem delItem = (DelItem) object; + return Objects.equals(pubid, delItem.pubid) + && Objects.equals(getSpool(), delItem.getSpool()); + } + + @Override + public int hashCode() { + return Objects.hash(pubid, getSpool()); } - return succeeded; } }