[DMAAP-48] Initial code import
[dmaap/datarouter.git] / datarouter-node / src / main / java / com / att / research / datarouter / node / Delivery.java
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java
new file mode 100644 (file)
index 0000000..d0e88ec
--- /dev/null
@@ -0,0 +1,253 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import java.util.*;\r
+import java.io.*;\r
+import org.apache.log4j.Logger;\r
+\r
+/**\r
+ *     Main control point for delivering files to destinations.\r
+ *     <p>\r
+ *     The Delivery class manages assignment of delivery threads to delivery\r
+ *     queues and creation and destruction of delivery queues as\r
+ *     configuration changes.  DeliveryQueues are assigned threads based on a\r
+ *     modified round-robin approach giving priority to queues with more work\r
+ *     as measured by both bytes to deliver and files to deliver and lower\r
+ *     priority to queues that already have delivery threads working.\r
+ *     A delivery thread continues to work for a delivery queue as long as\r
+ *     that queue has more files to deliver.\r
+ */\r
+public class Delivery {\r
+       private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.Delivery");\r
+       private static class DelItem implements Comparable<DelItem>     {\r
+               private String pubid;\r
+               private String spool;\r
+               public int compareTo(DelItem x) {\r
+                       int i = pubid.compareTo(x.pubid);\r
+                       if (i == 0) {\r
+                               i = spool.compareTo(x.spool);\r
+                       }\r
+                       return(i);\r
+               }\r
+               public String getPublishId() {\r
+                       return(pubid);\r
+               }\r
+               public String getSpool() {\r
+                       return(spool);\r
+               }\r
+               public DelItem(String pubid, String spool) {\r
+                       this.pubid = pubid;\r
+                       this.spool = spool;\r
+               }\r
+       }\r
+       private double  fdstart;\r
+       private double  fdstop;\r
+       private int     threads;\r
+       private int     curthreads;\r
+       private NodeConfigManager       config;\r
+       private Hashtable<String, DeliveryQueue>        dqs = new Hashtable<String, DeliveryQueue>();\r
+       private DeliveryQueue[] queues = new DeliveryQueue[0];\r
+       private int     qpos = 0;\r
+       private long    nextcheck;\r
+       private Runnable        cmon = new Runnable() {\r
+               public void run() {\r
+                       checkconfig();\r
+               }\r
+       };\r
+       /**\r
+        *      Constructs a new Delivery system using the specified configuration manager.\r
+        *      @param config   The configuration manager for this delivery system.\r
+        */\r
+       public Delivery(NodeConfigManager config) {\r
+               this.config = config;\r
+               config.registerConfigTask(cmon);\r
+               checkconfig();\r
+       }\r
+       private void cleardir(String dir) {\r
+               if (dqs.get(dir) != null) {\r
+                       return;\r
+               }\r
+               File fdir = new File(dir);\r
+               for (File junk: fdir.listFiles()) {\r
+                       if (junk.isFile()) {\r
+                               junk.delete();\r
+                       }\r
+               }\r
+               fdir.delete();\r
+       }\r
+       private void freeDiskCheck() {\r
+               File spoolfile = new File(config.getSpoolBase());\r
+               long tspace = spoolfile.getTotalSpace();\r
+               long start = (long)(tspace * fdstart);\r
+               long stop = (long)(tspace * fdstop);\r
+               long cur = spoolfile.getUsableSpace();\r
+               if (cur >= start) {\r
+                       return;\r
+               }\r
+               Vector<DelItem> cv = new Vector<DelItem>();\r
+               for (String sdir: dqs.keySet()) {\r
+                       for (String meta: (new File(sdir)).list()) {\r
+                               if (!meta.endsWith(".M") || meta.charAt(0) == '.') {\r
+                                       continue;\r
+                               }\r
+                               cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));\r
+                       }\r
+               }\r
+               DelItem[] items = cv.toArray(new DelItem[cv.size()]);\r
+               Arrays.sort(items);\r
+               logger.info("NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + " total=" + tspace);\r
+               for (DelItem item: items) {\r
+                       long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());\r
+                       logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");\r
+                       if (amount > 0) {\r
+                               cur += amount;\r
+                               if (cur >= stop) {\r
+                                       cur = spoolfile.getUsableSpace();\r
+                               }\r
+                               if (cur >= stop) {\r
+                                       logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
+                                       return;\r
+                               }\r
+                       }\r
+               }\r
+               cur = spoolfile.getUsableSpace();\r
+               if (cur >= stop) {\r
+                       logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
+                       return;\r
+               }\r
+               logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
+       }\r
+       private void cleardirs() {\r
+               String basedir = config.getSpoolBase();\r
+               String nbase = basedir + "/n";\r
+               for (String nodedir: (new File(nbase)).list()) {\r
+                       if (!nodedir.startsWith(".")) {\r
+                               cleardir(nbase + "/" + nodedir);\r
+                       }\r
+               }\r
+               String sxbase = basedir + "/s";\r
+               for (String sxdir: (new File(sxbase)).list()) {\r
+                       if (sxdir.startsWith(".")) {\r
+                               continue;\r
+                       }\r
+                       File sxf = new File(sxbase + "/" + sxdir);\r
+                       for (String sdir: sxf.list()) {\r
+                               if (!sdir.startsWith(".")) {\r
+                                       cleardir(sxbase + "/" + sxdir + "/" + sdir);\r
+                               }\r
+                       }\r
+                       sxf.delete();  // won't if anything still in it\r
+               }\r
+       }\r
+       private synchronized void checkconfig() {\r
+               if (!config.isConfigured()) {\r
+                       return;\r
+               }\r
+               fdstart = config.getFreeDiskStart();\r
+               fdstop = config.getFreeDiskStop();\r
+               threads = config.getDeliveryThreads();\r
+               if (threads < 1) {\r
+                       threads = 1;\r
+               }\r
+               DestInfo[] alldis = config.getAllDests();\r
+               DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];\r
+               qpos = 0;\r
+               Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();\r
+               for (DestInfo di: alldis) {\r
+                       String spl = di.getSpool();\r
+                       DeliveryQueue dq = dqs.get(spl);\r
+                       if (dq == null) {\r
+                               dq = new DeliveryQueue(config, di);\r
+                       } else {\r
+                               dq.config(di);\r
+                       }\r
+                       ndqs.put(spl, dq);\r
+                       nqs[qpos++] = dq;\r
+               }\r
+               queues = nqs;\r
+               dqs = ndqs;\r
+               cleardirs();\r
+               while (curthreads < threads) {\r
+                       curthreads++;\r
+                       (new Thread() {\r
+                               {\r
+                                       setName("Delivery Thread");\r
+                               }\r
+                               public void run() {\r
+                                       dodelivery();\r
+                               }\r
+                       }).start();\r
+               }\r
+               nextcheck = 0;\r
+               notify();\r
+       }\r
+       private void dodelivery() {\r
+               DeliveryQueue dq;\r
+               while ((dq = getNextQueue()) != null) {\r
+                       dq.run();\r
+               }\r
+       }\r
+       private synchronized DeliveryQueue getNextQueue() {\r
+               while (true) {\r
+                       if (curthreads > threads) {\r
+                               curthreads--;\r
+                               return(null);\r
+                       }\r
+                       if (qpos < queues.length) {\r
+                               DeliveryQueue dq = queues[qpos++];\r
+                               if (dq.isSkipSet()) {\r
+                                       continue;\r
+                               }\r
+                               nextcheck = 0;\r
+                               notify();\r
+                               return(dq);\r
+                       }\r
+                       long now = System.currentTimeMillis();\r
+                       if (now < nextcheck) {\r
+                               try {\r
+                                       wait(nextcheck + 500 - now);\r
+                               } catch (Exception e) {\r
+                               }\r
+                               now = System.currentTimeMillis();\r
+                       }\r
+                       if (now >= nextcheck) {\r
+                               nextcheck = now + 5000;\r
+                               qpos = 0;\r
+                               freeDiskCheck();\r
+                       }\r
+               }\r
+       }\r
+       /**\r
+        *      Reset the retry timer for a delivery queue\r
+        */\r
+       public synchronized void resetQueue(String spool) {\r
+               if (spool != null) {\r
+                       DeliveryQueue dq = dqs.get(spool);\r
+                       if (dq != null) {\r
+                               dq.resetQueue();\r
+                       }\r
+               }\r
+       }\r
+}\r