package org.onap.dmaap.datarouter.node;
-import java.util.*;
-import java.io.*;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Objects;
/**
* Main control point for delivering files to destinations.
- * <p>
- * The Delivery class manages assignment of delivery threads to delivery
- * queues and creation and destruction of delivery queues as
- * configuration changes. DeliveryQueues are assigned threads based on a
- * modified round-robin approach giving priority to queues with more work
- * as measured by both bytes to deliver and files to deliver and lower
- * priority to queues that already have delivery threads working.
- * A delivery thread continues to work for a delivery queue as long as
- * that queue has more files to deliver.
+ *
+ * <p>The Delivery class manages assignment of delivery threads to delivery queues and creation and destruction of
+ * delivery queues as configuration changes. DeliveryQueues are assigned threads based on a modified round-robin
+ * approach giving priority to queues with more work as measured by both bytes to deliver and files to deliver and lower
+ * priority to queues that already have delivery threads working. A delivery thread continues to work for a delivery
+ * queue as long as that queue has more files to deliver.
*/
public class Delivery {
- private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
-
- private static class DelItem implements Comparable<DelItem> {
- private String pubid;
- private String spool;
-
- public int compareTo(DelItem x) {
- int i = pubid.compareTo(x.pubid);
- if (i == 0) {
- i = spool.compareTo(x.spool);
- }
- return (i);
- }
-
- public String getPublishId() {
- return (pubid);
- }
-
- public String getSpool() {
- return (spool);
- }
-
- public DelItem(String pubid, String spool) {
- this.pubid = pubid;
- this.spool = spool;
- }
- }
+ private static final String TOTAL = " total=";
+ private static final String YELLOW = " yellow=";
+ private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
private double fdstart;
private double fdstop;
private int threads;
private int curthreads;
private NodeConfigManager config;
- private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>();
+ private HashMap<String, DeliveryQueue> dqs = new HashMap<>();
private DeliveryQueue[] queues = new DeliveryQueue[0];
private int qpos = 0;
private long nextcheck;
- private Runnable cmon = new Runnable() {
- public void run() {
- checkconfig();
- }
- };
/**
* Constructs a new Delivery system using the specified configuration manager.
*/
public Delivery(NodeConfigManager config) {
this.config = config;
+ Runnable cmon = this::checkconfig;
config.registerConfigTask(cmon);
checkconfig();
}
+ /**
+ * Reset the retry timer for a delivery queue.
+ */
+ public synchronized void resetQueue(String spool) {
+ if (spool != null) {
+ DeliveryQueue dq = dqs.get(spool);
+ if (dq != null) {
+ dq.resetQueue();
+ }
+ }
+ }
+
+ /**
+ * Mark the task in spool a success.
+ */
+ public synchronized boolean markTaskSuccess(String spool, String pubId) {
+ boolean succeeded = false;
+ if (spool != null) {
+ DeliveryQueue dq = dqs.get(spool);
+ if (dq != null) {
+ succeeded = dq.markTaskSuccess(pubId);
+ }
+ }
+ return succeeded;
+ }
+
private void cleardir(String dir) {
if (dqs.get(dir) != null) {
return;
File spoolfile = new File(config.getSpoolBase());
long tspace = spoolfile.getTotalSpace();
long start = (long) (tspace * fdstart);
- long stop = (long) (tspace * fdstop);
long cur = spoolfile.getUsableSpace();
if (cur >= start) {
return;
}
- Vector<DelItem> cv = new Vector<DelItem>();
+ ArrayList<DelItem> cv = new ArrayList<>();
for (String sdir : dqs.keySet()) {
for (String meta : (new File(sdir)).list()) {
if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
}
DelItem[] items = cv.toArray(new DelItem[cv.size()]);
Arrays.sort(items);
- logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace);
- for (DelItem item : items) {
- long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
- logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");
- if (amount > 0) {
- cur += amount;
- if (cur >= stop) {
- cur = spoolfile.getUsableSpace();
- }
- if (cur >= stop) {
- logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
- return;
- }
- }
+ long stop = (long) (tspace * fdstop);
+ logger.info(
+ "NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + TOTAL + tspace);
+ if (determineFreeDiskSpace(spoolfile, tspace, stop, cur, items)) {
+ return;
}
cur = spoolfile.getUsableSpace();
if (cur >= stop) {
- logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
+ logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop
+ + TOTAL + tspace);
return;
}
- logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + " yellow=" + stop + " total=" + tspace);
+ logger.warn(
+ "NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + YELLOW
+ + stop + TOTAL + tspace);
}
private void cleardirs() {
DestInfo[] alldis = config.getAllDests();
DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
qpos = 0;
- Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();
+ HashMap<String, DeliveryQueue> ndqs = new HashMap<>();
for (DestInfo di : alldis) {
String spl = di.getSpool();
DeliveryQueue dq = dqs.get(spl);
cleardirs();
while (curthreads < threads) {
curthreads++;
- (new Thread() {
- {
- setName("Delivery Thread");
- }
-
+ (new Thread("del-thread-" + curthreads) {
+ @Override
public void run() {
dodelivery();
}
try {
wait(nextcheck + 500 - now);
} catch (Exception e) {
+ logger.error("InterruptedException", e);
}
now = System.currentTimeMillis();
}
}
}
- /**
- * Reset the retry timer for a delivery queue
- */
- public synchronized void resetQueue(String spool) {
- if (spool != null) {
- DeliveryQueue dq = dqs.get(spool);
- if (dq != null) {
- dq.resetQueue();
+ private boolean determineFreeDiskSpace(File spoolfile, long tspace, long stop, long cur, DelItem[] items) {
+ for (DelItem item : items) {
+ long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
+ logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId()
+ + " to free up disk");
+ if (amount > 0) {
+ cur += amount;
+ if (cur >= stop) {
+ cur = spoolfile.getUsableSpace();
+ }
+ if (cur >= stop) {
+ logger.info(
+ "NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop
+ + TOTAL + tspace);
+ return true;
+ }
}
}
+ return false;
}
- /**
- * Mark the task in spool a success
- */
- public synchronized boolean markTaskSuccess(String spool, String pubId) {
- boolean succeeded = false;
- if (spool != null) {
- DeliveryQueue dq = dqs.get(spool);
- if (dq != null) {
- succeeded = dq.markTaskSuccess(pubId);
+ static class DelItem implements Comparable<DelItem> {
+
+ private String pubid;
+ private String spool;
+
+ public DelItem(String pubid, String spool) {
+ this.pubid = pubid;
+ this.spool = spool;
+ }
+
+ public int compareTo(DelItem other) {
+ int diff = pubid.compareTo(other.pubid);
+ if (diff == 0) {
+ diff = spool.compareTo(other.spool);
}
+ return (diff);
+ }
+
+ public String getPublishId() {
+ return (pubid);
+ }
+
+ public String getSpool() {
+ return (spool);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ DelItem delItem = (DelItem) object;
+ return Objects.equals(pubid, delItem.pubid)
+ && Objects.equals(getSpool(), delItem.getSpool());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pubid, getSpool());
}
- return succeeded;
}
}