-/*******************************************************************************\r
- * ============LICENSE_START==================================================\r
- * * org.onap.dmaap\r
- * * ===========================================================================\r
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * * ===========================================================================\r
- * * Licensed under the Apache License, Version 2.0 (the "License");\r
- * * you may not use this file except in compliance with the License.\r
- * * You may obtain a copy of the License at\r
- * * \r
- * * http://www.apache.org/licenses/LICENSE-2.0\r
- * * \r
- * * Unless required by applicable law or agreed to in writing, software\r
- * * distributed under the License is distributed on an "AS IS" BASIS,\r
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * * See the License for the specific language governing permissions and\r
- * * limitations under the License.\r
- * * ============LICENSE_END====================================================\r
- * *\r
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
- * *\r
- ******************************************************************************/\r
-\r
-package org.onap.dmaap.datarouter.node;\r
-\r
-import java.util.*;\r
-import java.io.*;\r
-import org.apache.log4j.Logger;\r
-\r
-/**\r
- * Main control point for delivering files to destinations.\r
- * <p>\r
- * The Delivery class manages assignment of delivery threads to delivery\r
- * queues and creation and destruction of delivery queues as\r
- * configuration changes. DeliveryQueues are assigned threads based on a\r
- * modified round-robin approach giving priority to queues with more work\r
- * as measured by both bytes to deliver and files to deliver and lower\r
- * priority to queues that already have delivery threads working.\r
- * A delivery thread continues to work for a delivery queue as long as\r
- * that queue has more files to deliver.\r
- */\r
-public class Delivery {\r
- private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.Delivery");\r
- private static class DelItem implements Comparable<DelItem> {\r
- private String pubid;\r
- private String spool;\r
- public int compareTo(DelItem x) {\r
- int i = pubid.compareTo(x.pubid);\r
- if (i == 0) {\r
- i = spool.compareTo(x.spool);\r
- }\r
- return(i);\r
- }\r
- public String getPublishId() {\r
- return(pubid);\r
- }\r
- public String getSpool() {\r
- return(spool);\r
- }\r
- public DelItem(String pubid, String spool) {\r
- this.pubid = pubid;\r
- this.spool = spool;\r
- }\r
- }\r
- private double fdstart;\r
- private double fdstop;\r
- private int threads;\r
- private int curthreads;\r
- private NodeConfigManager config;\r
- private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>();\r
- private DeliveryQueue[] queues = new DeliveryQueue[0];\r
- private int qpos = 0;\r
- private long nextcheck;\r
- private Runnable cmon = new Runnable() {\r
- public void run() {\r
- checkconfig();\r
- }\r
- };\r
- /**\r
- * Constructs a new Delivery system using the specified configuration manager.\r
- * @param config The configuration manager for this delivery system.\r
- */\r
- public Delivery(NodeConfigManager config) {\r
- this.config = config;\r
- config.registerConfigTask(cmon);\r
- checkconfig();\r
- }\r
- private void cleardir(String dir) {\r
- if (dqs.get(dir) != null) {\r
- return;\r
- }\r
- File fdir = new File(dir);\r
- for (File junk: fdir.listFiles()) {\r
- if (junk.isFile()) {\r
- junk.delete();\r
- }\r
- }\r
- fdir.delete();\r
- }\r
- private void freeDiskCheck() {\r
- File spoolfile = new File(config.getSpoolBase());\r
- long tspace = spoolfile.getTotalSpace();\r
- long start = (long)(tspace * fdstart);\r
- long stop = (long)(tspace * fdstop);\r
- long cur = spoolfile.getUsableSpace();\r
- if (cur >= start) {\r
- return;\r
- }\r
- Vector<DelItem> cv = new Vector<DelItem>();\r
- for (String sdir: dqs.keySet()) {\r
- for (String meta: (new File(sdir)).list()) {\r
- if (!meta.endsWith(".M") || meta.charAt(0) == '.') {\r
- continue;\r
- }\r
- cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));\r
- }\r
- }\r
- DelItem[] items = cv.toArray(new DelItem[cv.size()]);\r
- Arrays.sort(items);\r
- logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace);\r
- for (DelItem item: items) {\r
- long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());\r
- logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");\r
- if (amount > 0) {\r
- cur += amount;\r
- if (cur >= stop) {\r
- cur = spoolfile.getUsableSpace();\r
- }\r
- if (cur >= stop) {\r
- logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);\r
- return;\r
- }\r
- }\r
- }\r
- cur = spoolfile.getUsableSpace();\r
- if (cur >= stop) {\r
- logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);\r
- return;\r
- }\r
- logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + " yellow=" + stop + " total=" + tspace);\r
- }\r
- private void cleardirs() {\r
- String basedir = config.getSpoolBase();\r
- String nbase = basedir + "/n";\r
- for (String nodedir: (new File(nbase)).list()) {\r
- if (!nodedir.startsWith(".")) {\r
- cleardir(nbase + "/" + nodedir);\r
- }\r
- }\r
- String sxbase = basedir + "/s";\r
- for (String sxdir: (new File(sxbase)).list()) {\r
- if (sxdir.startsWith(".")) {\r
- continue;\r
- }\r
- File sxf = new File(sxbase + "/" + sxdir);\r
- for (String sdir: sxf.list()) {\r
- if (!sdir.startsWith(".")) {\r
- cleardir(sxbase + "/" + sxdir + "/" + sdir);\r
- }\r
- }\r
- sxf.delete(); // won't if anything still in it\r
- }\r
- }\r
- private synchronized void checkconfig() {\r
- if (!config.isConfigured()) {\r
- return;\r
- }\r
- fdstart = config.getFreeDiskStart();\r
- fdstop = config.getFreeDiskStop();\r
- threads = config.getDeliveryThreads();\r
- if (threads < 1) {\r
- threads = 1;\r
- }\r
- DestInfo[] alldis = config.getAllDests();\r
- DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];\r
- qpos = 0;\r
- Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();\r
- for (DestInfo di: alldis) {\r
- String spl = di.getSpool();\r
- DeliveryQueue dq = dqs.get(spl);\r
- if (dq == null) {\r
- dq = new DeliveryQueue(config, di);\r
- } else {\r
- dq.config(di);\r
- }\r
- ndqs.put(spl, dq);\r
- nqs[qpos++] = dq;\r
- }\r
- queues = nqs;\r
- dqs = ndqs;\r
- cleardirs();\r
- while (curthreads < threads) {\r
- curthreads++;\r
- (new Thread() {\r
- {\r
- setName("Delivery Thread");\r
- }\r
- public void run() {\r
- dodelivery();\r
- }\r
- }).start();\r
- }\r
- nextcheck = 0;\r
- notify();\r
- }\r
- private void dodelivery() {\r
- DeliveryQueue dq;\r
- while ((dq = getNextQueue()) != null) {\r
- dq.run();\r
- }\r
- }\r
- private synchronized DeliveryQueue getNextQueue() {\r
- while (true) {\r
- if (curthreads > threads) {\r
- curthreads--;\r
- return(null);\r
- }\r
- if (qpos < queues.length) {\r
- DeliveryQueue dq = queues[qpos++];\r
- if (dq.isSkipSet()) {\r
- continue;\r
- }\r
- nextcheck = 0;\r
- notify();\r
- return(dq);\r
- }\r
- long now = System.currentTimeMillis();\r
- if (now < nextcheck) {\r
- try {\r
- wait(nextcheck + 500 - now);\r
- } catch (Exception e) {\r
- }\r
- now = System.currentTimeMillis();\r
- }\r
- if (now >= nextcheck) {\r
- nextcheck = now + 5000;\r
- qpos = 0;\r
- freeDiskCheck();\r
- }\r
- }\r
- }\r
- /**\r
- * Reset the retry timer for a delivery queue\r
- */\r
- public synchronized void resetQueue(String spool) {\r
- if (spool != null) {\r
- DeliveryQueue dq = dqs.get(spool);\r
- if (dq != null) {\r
- dq.resetQueue();\r
- }\r
- }\r
- }\r
-}\r
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+package org.onap.dmaap.datarouter.node;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Objects;
+
+/**
+ * Main control point for delivering files to destinations.
+ *
+ * <p>The Delivery class manages assignment of delivery threads to delivery queues and creation and destruction of
+ * delivery queues as configuration changes. DeliveryQueues are assigned threads based on a modified round-robin
+ * approach giving priority to queues with more work as measured by both bytes to deliver and files to deliver and lower
+ * priority to queues that already have delivery threads working. A delivery thread continues to work for a delivery
+ * queue as long as that queue has more files to deliver.
+ */
+public class Delivery {
+
+ private static final String TOTAL = " total=";
+ private static final String YELLOW = " yellow=";
+ private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
+ private double fdstart;
+ private double fdstop;
+ private int threads;
+ private int curthreads;
+ private NodeConfigManager config;
+ private HashMap<String, DeliveryQueue> dqs = new HashMap<>();
+ private DeliveryQueue[] queues = new DeliveryQueue[0];
+ private int qpos = 0;
+ private long nextcheck;
+
+ /**
+ * Constructs a new Delivery system using the specified configuration manager.
+ *
+ * @param config The configuration manager for this delivery system.
+ */
+ public Delivery(NodeConfigManager config) {
+ this.config = config;
+ Runnable cmon = this::checkconfig;
+ config.registerConfigTask(cmon);
+ checkconfig();
+ }
+
+ /**
+ * Reset the retry timer for a delivery queue.
+ */
+ public synchronized void resetQueue(String spool) {
+ if (spool != null) {
+ DeliveryQueue dq = dqs.get(spool);
+ if (dq != null) {
+ dq.resetQueue();
+ }
+ }
+ }
+
+ /**
+ * Mark the task in spool a success.
+ */
+ public synchronized boolean markTaskSuccess(String spool, String pubId) {
+ boolean succeeded = false;
+ if (spool != null) {
+ DeliveryQueue dq = dqs.get(spool);
+ if (dq != null) {
+ succeeded = dq.markTaskSuccess(pubId);
+ }
+ }
+ return succeeded;
+ }
+
+ private void cleardir(String dir) {
+ if (dqs.get(dir) != null) {
+ return;
+ }
+ File fdir = new File(dir);
+ for (File junk : fdir.listFiles()) {
+ if (junk.isFile()) {
+ junk.delete();
+ }
+ }
+ fdir.delete();
+ }
+
+ private void freeDiskCheck() {
+ File spoolfile = new File(config.getSpoolBase());
+ long tspace = spoolfile.getTotalSpace();
+ long start = (long) (tspace * fdstart);
+ long cur = spoolfile.getUsableSpace();
+ if (cur >= start) {
+ return;
+ }
+ ArrayList<DelItem> cv = new ArrayList<>();
+ for (String sdir : dqs.keySet()) {
+ for (String meta : (new File(sdir)).list()) {
+ if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
+ continue;
+ }
+ cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));
+ }
+ }
+ DelItem[] items = cv.toArray(new DelItem[cv.size()]);
+ Arrays.sort(items);
+ long stop = (long) (tspace * fdstop);
+ logger.info(
+ "NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + TOTAL + tspace);
+ if (determineFreeDiskSpace(spoolfile, tspace, stop, cur, items)) {
+ return;
+ }
+ cur = spoolfile.getUsableSpace();
+ if (cur >= stop) {
+ logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop
+ + TOTAL + tspace);
+ return;
+ }
+ logger.warn(
+ "NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + YELLOW
+ + stop + TOTAL + tspace);
+ }
+
+ private void cleardirs() {
+ String basedir = config.getSpoolBase();
+ String nbase = basedir + "/n";
+ for (String nodedir : (new File(nbase)).list()) {
+ if (!nodedir.startsWith(".")) {
+ cleardir(nbase + "/" + nodedir);
+ }
+ }
+ String sxbase = basedir + "/s";
+ for (String sxdir : (new File(sxbase)).list()) {
+ if (sxdir.startsWith(".")) {
+ continue;
+ }
+ File sxf = new File(sxbase + "/" + sxdir);
+ for (String sdir : sxf.list()) {
+ if (!sdir.startsWith(".")) {
+ cleardir(sxbase + "/" + sxdir + "/" + sdir);
+ }
+ }
+ sxf.delete(); // won't if anything still in it
+ }
+ }
+
+ private synchronized void checkconfig() {
+ if (!config.isConfigured()) {
+ return;
+ }
+ fdstart = config.getFreeDiskStart();
+ fdstop = config.getFreeDiskStop();
+ threads = config.getDeliveryThreads();
+ if (threads < 1) {
+ threads = 1;
+ }
+ DestInfo[] alldis = config.getAllDests();
+ DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
+ qpos = 0;
+ HashMap<String, DeliveryQueue> ndqs = new HashMap<>();
+ for (DestInfo di : alldis) {
+ String spl = di.getSpool();
+ DeliveryQueue dq = dqs.get(spl);
+ if (dq == null) {
+ dq = new DeliveryQueue(config, di);
+ } else {
+ dq.config(di);
+ }
+ ndqs.put(spl, dq);
+ nqs[qpos++] = dq;
+ }
+ queues = nqs;
+ dqs = ndqs;
+ cleardirs();
+ while (curthreads < threads) {
+ curthreads++;
+ (new Thread("del-thread-" + curthreads) {
+ @Override
+ public void run() {
+ dodelivery();
+ }
+ }).start();
+ }
+ nextcheck = 0;
+ notify();
+ }
+
+ private void dodelivery() {
+ DeliveryQueue dq;
+ while ((dq = getNextQueue()) != null) {
+ dq.run();
+ }
+ }
+
+ private synchronized DeliveryQueue getNextQueue() {
+ while (true) {
+ if (curthreads > threads) {
+ curthreads--;
+ return (null);
+ }
+ if (qpos < queues.length) {
+ DeliveryQueue dq = queues[qpos++];
+ if (dq.isSkipSet()) {
+ continue;
+ }
+ nextcheck = 0;
+ notify();
+ return (dq);
+ }
+ long now = System.currentTimeMillis();
+ if (now < nextcheck) {
+ try {
+ wait(nextcheck + 500 - now);
+ } catch (Exception e) {
+ logger.error("InterruptedException", e);
+ }
+ now = System.currentTimeMillis();
+ }
+ if (now >= nextcheck) {
+ nextcheck = now + 5000;
+ qpos = 0;
+ freeDiskCheck();
+ }
+ }
+ }
+
+ private boolean determineFreeDiskSpace(File spoolfile, long tspace, long stop, long cur, DelItem[] items) {
+ for (DelItem item : items) {
+ long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
+ logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId()
+ + " to free up disk");
+ if (amount > 0) {
+ cur += amount;
+ if (cur >= stop) {
+ cur = spoolfile.getUsableSpace();
+ }
+ if (cur >= stop) {
+ logger.info(
+ "NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop
+ + TOTAL + tspace);
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ static class DelItem implements Comparable<DelItem> {
+
+ private String pubid;
+ private String spool;
+
+ public DelItem(String pubid, String spool) {
+ this.pubid = pubid;
+ this.spool = spool;
+ }
+
+ public int compareTo(DelItem other) {
+ int diff = pubid.compareTo(other.pubid);
+ if (diff == 0) {
+ diff = spool.compareTo(other.spool);
+ }
+ return (diff);
+ }
+
+ public String getPublishId() {
+ return (pubid);
+ }
+
+ public String getSpool() {
+ return (spool);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ DelItem delItem = (DelItem) object;
+ return Objects.equals(pubid, delItem.pubid)
+ && Objects.equals(getSpool(), delItem.getSpool());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pubid, getSpool());
+ }
+ }
+}