Remove major and minor code smells in dr-node
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / Delivery.java
index 4494024..df73c1e 100644 (file)
-/*******************************************************************************\r
- * ============LICENSE_START==================================================\r
- * * org.onap.dmaap\r
- * * ===========================================================================\r
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * * ===========================================================================\r
- * * Licensed under the Apache License, Version 2.0 (the "License");\r
- * * you may not use this file except in compliance with the License.\r
- * * You may obtain a copy of the License at\r
- * * \r
- *  *      http://www.apache.org/licenses/LICENSE-2.0\r
- * * \r
- *  * Unless required by applicable law or agreed to in writing, software\r
- * * distributed under the License is distributed on an "AS IS" BASIS,\r
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * * See the License for the specific language governing permissions and\r
- * * limitations under the License.\r
- * * ============LICENSE_END====================================================\r
- * *\r
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
- * *\r
- ******************************************************************************/\r
-\r
-package org.onap.dmaap.datarouter.node;\r
-\r
-import java.util.*;\r
-import java.io.*;\r
-import org.apache.log4j.Logger;\r
-\r
-/**\r
- *     Main control point for delivering files to destinations.\r
- *     <p>\r
- *     The Delivery class manages assignment of delivery threads to delivery\r
- *     queues and creation and destruction of delivery queues as\r
- *     configuration changes.  DeliveryQueues are assigned threads based on a\r
- *     modified round-robin approach giving priority to queues with more work\r
- *     as measured by both bytes to deliver and files to deliver and lower\r
- *     priority to queues that already have delivery threads working.\r
- *     A delivery thread continues to work for a delivery queue as long as\r
- *     that queue has more files to deliver.\r
- */\r
-public class Delivery {\r
-       private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.Delivery");\r
-       private static class DelItem implements Comparable<DelItem>     {\r
-               private String pubid;\r
-               private String spool;\r
-               public int compareTo(DelItem x) {\r
-                       int i = pubid.compareTo(x.pubid);\r
-                       if (i == 0) {\r
-                               i = spool.compareTo(x.spool);\r
-                       }\r
-                       return(i);\r
-               }\r
-               public String getPublishId() {\r
-                       return(pubid);\r
-               }\r
-               public String getSpool() {\r
-                       return(spool);\r
-               }\r
-               public DelItem(String pubid, String spool) {\r
-                       this.pubid = pubid;\r
-                       this.spool = spool;\r
-               }\r
-       }\r
-       private double  fdstart;\r
-       private double  fdstop;\r
-       private int     threads;\r
-       private int     curthreads;\r
-       private NodeConfigManager       config;\r
-       private Hashtable<String, DeliveryQueue>        dqs = new Hashtable<String, DeliveryQueue>();\r
-       private DeliveryQueue[] queues = new DeliveryQueue[0];\r
-       private int     qpos = 0;\r
-       private long    nextcheck;\r
-       private Runnable        cmon = new Runnable() {\r
-               public void run() {\r
-                       checkconfig();\r
-               }\r
-       };\r
-       /**\r
-        *      Constructs a new Delivery system using the specified configuration manager.\r
-        *      @param config   The configuration manager for this delivery system.\r
-        */\r
-       public Delivery(NodeConfigManager config) {\r
-               this.config = config;\r
-               config.registerConfigTask(cmon);\r
-               checkconfig();\r
-       }\r
-       private void cleardir(String dir) {\r
-               if (dqs.get(dir) != null) {\r
-                       return;\r
-               }\r
-               File fdir = new File(dir);\r
-               for (File junk: fdir.listFiles()) {\r
-                       if (junk.isFile()) {\r
-                               junk.delete();\r
-                       }\r
-               }\r
-               fdir.delete();\r
-       }\r
-       private void freeDiskCheck() {\r
-               File spoolfile = new File(config.getSpoolBase());\r
-               long tspace = spoolfile.getTotalSpace();\r
-               long start = (long)(tspace * fdstart);\r
-               long stop = (long)(tspace * fdstop);\r
-               long cur = spoolfile.getUsableSpace();\r
-               if (cur >= start) {\r
-                       return;\r
-               }\r
-               Vector<DelItem> cv = new Vector<DelItem>();\r
-               for (String sdir: dqs.keySet()) {\r
-                       for (String meta: (new File(sdir)).list()) {\r
-                               if (!meta.endsWith(".M") || meta.charAt(0) == '.') {\r
-                                       continue;\r
-                               }\r
-                               cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));\r
-                       }\r
-               }\r
-               DelItem[] items = cv.toArray(new DelItem[cv.size()]);\r
-               Arrays.sort(items);\r
-               logger.info("NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + " total=" + tspace);\r
-               for (DelItem item: items) {\r
-                       long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());\r
-                       logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");\r
-                       if (amount > 0) {\r
-                               cur += amount;\r
-                               if (cur >= stop) {\r
-                                       cur = spoolfile.getUsableSpace();\r
-                               }\r
-                               if (cur >= stop) {\r
-                                       logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
-                                       return;\r
-                               }\r
-                       }\r
-               }\r
-               cur = spoolfile.getUsableSpace();\r
-               if (cur >= stop) {\r
-                       logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
-                       return;\r
-               }\r
-               logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
-       }\r
-       private void cleardirs() {\r
-               String basedir = config.getSpoolBase();\r
-               String nbase = basedir + "/n";\r
-               for (String nodedir: (new File(nbase)).list()) {\r
-                       if (!nodedir.startsWith(".")) {\r
-                               cleardir(nbase + "/" + nodedir);\r
-                       }\r
-               }\r
-               String sxbase = basedir + "/s";\r
-               for (String sxdir: (new File(sxbase)).list()) {\r
-                       if (sxdir.startsWith(".")) {\r
-                               continue;\r
-                       }\r
-                       File sxf = new File(sxbase + "/" + sxdir);\r
-                       for (String sdir: sxf.list()) {\r
-                               if (!sdir.startsWith(".")) {\r
-                                       cleardir(sxbase + "/" + sxdir + "/" + sdir);\r
-                               }\r
-                       }\r
-                       sxf.delete();  // won't if anything still in it\r
-               }\r
-       }\r
-       private synchronized void checkconfig() {\r
-               if (!config.isConfigured()) {\r
-                       return;\r
-               }\r
-               fdstart = config.getFreeDiskStart();\r
-               fdstop = config.getFreeDiskStop();\r
-               threads = config.getDeliveryThreads();\r
-               if (threads < 1) {\r
-                       threads = 1;\r
-               }\r
-               DestInfo[] alldis = config.getAllDests();\r
-               DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];\r
-               qpos = 0;\r
-               Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();\r
-               for (DestInfo di: alldis) {\r
-                       String spl = di.getSpool();\r
-                       DeliveryQueue dq = dqs.get(spl);\r
-                       if (dq == null) {\r
-                               dq = new DeliveryQueue(config, di);\r
-                       } else {\r
-                               dq.config(di);\r
-                       }\r
-                       ndqs.put(spl, dq);\r
-                       nqs[qpos++] = dq;\r
-               }\r
-               queues = nqs;\r
-               dqs = ndqs;\r
-               cleardirs();\r
-               while (curthreads < threads) {\r
-                       curthreads++;\r
-                       (new Thread() {\r
-                               {\r
-                                       setName("Delivery Thread");\r
-                               }\r
-                               public void run() {\r
-                                       dodelivery();\r
-                               }\r
-                       }).start();\r
-               }\r
-               nextcheck = 0;\r
-               notify();\r
-       }\r
-       private void dodelivery() {\r
-               DeliveryQueue dq;\r
-               while ((dq = getNextQueue()) != null) {\r
-                       dq.run();\r
-               }\r
-       }\r
-       private synchronized DeliveryQueue getNextQueue() {\r
-               while (true) {\r
-                       if (curthreads > threads) {\r
-                               curthreads--;\r
-                               return(null);\r
-                       }\r
-                       if (qpos < queues.length) {\r
-                               DeliveryQueue dq = queues[qpos++];\r
-                               if (dq.isSkipSet()) {\r
-                                       continue;\r
-                               }\r
-                               nextcheck = 0;\r
-                               notify();\r
-                               return(dq);\r
-                       }\r
-                       long now = System.currentTimeMillis();\r
-                       if (now < nextcheck) {\r
-                               try {\r
-                                       wait(nextcheck + 500 - now);\r
-                               } catch (Exception e) {\r
-                               }\r
-                               now = System.currentTimeMillis();\r
-                       }\r
-                       if (now >= nextcheck) {\r
-                               nextcheck = now + 5000;\r
-                               qpos = 0;\r
-                               freeDiskCheck();\r
-                       }\r
-               }\r
-       }\r
-       /**\r
-        *      Reset the retry timer for a delivery queue\r
-        */\r
-       public synchronized void resetQueue(String spool) {\r
-               if (spool != null) {\r
-                       DeliveryQueue dq = dqs.get(spool);\r
-                       if (dq != null) {\r
-                               dq.resetQueue();\r
-                       }\r
-               }\r
-       }\r
-}\r
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ *  * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+package org.onap.dmaap.datarouter.node;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.Objects;
+
+/**
+ * Main control point for delivering files to destinations.
+ *
+ * <p>The Delivery class manages assignment of delivery threads to delivery queues and creation and destruction of
+ * delivery queues as configuration changes. DeliveryQueues are assigned threads based on a modified round-robin
+ * approach giving priority to queues with more work as measured by both bytes to deliver and files to deliver and lower
+ * priority to queues that already have delivery threads working. A delivery thread continues to work for a delivery
+ * queue as long as that queue has more files to deliver.
+ */
+public class Delivery {
+
+    private static final String TOTAL = " total=";
+    private static final String YELLOW = " yellow=";
+    private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
+    private double fdstart;
+    private double fdstop;
+    private int threads;
+    private int curthreads;
+    private NodeConfigManager config;
+    private Hashtable<String, DeliveryQueue> dqs = new Hashtable<>();
+    private DeliveryQueue[] queues = new DeliveryQueue[0];
+    private int qpos = 0;
+    private long nextcheck;
+
+    /**
+     * Constructs a new Delivery system using the specified configuration manager.
+     *
+     * @param config The configuration manager for this delivery system.
+     */
+    public Delivery(NodeConfigManager config) {
+        this.config = config;
+        Runnable cmon = this::checkconfig;
+        config.registerConfigTask(cmon);
+        checkconfig();
+    }
+
+    /**
+     * Reset the retry timer for a delivery queue.
+     */
+    public synchronized void resetQueue(String spool) {
+        if (spool != null) {
+            DeliveryQueue dq = dqs.get(spool);
+            if (dq != null) {
+                dq.resetQueue();
+            }
+        }
+    }
+
+    /**
+     * Mark the task in spool a success.
+     */
+    public synchronized boolean markTaskSuccess(String spool, String pubId) {
+        boolean succeeded = false;
+        if (spool != null) {
+            DeliveryQueue dq = dqs.get(spool);
+            if (dq != null) {
+                succeeded = dq.markTaskSuccess(pubId);
+            }
+        }
+        return succeeded;
+    }
+
+    private void cleardir(String dir) {
+        if (dqs.get(dir) != null) {
+            return;
+        }
+        File fdir = new File(dir);
+        for (File junk : fdir.listFiles()) {
+            if (junk.isFile()) {
+                junk.delete();
+            }
+        }
+        fdir.delete();
+    }
+
+    private void freeDiskCheck() {
+        File spoolfile = new File(config.getSpoolBase());
+        long tspace = spoolfile.getTotalSpace();
+        long start = (long) (tspace * fdstart);
+        long cur = spoolfile.getUsableSpace();
+        if (cur >= start) {
+            return;
+        }
+        ArrayList<DelItem> cv = new ArrayList<>();
+        for (String sdir : dqs.keySet()) {
+            for (String meta : (new File(sdir)).list()) {
+                if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
+                    continue;
+                }
+                cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));
+            }
+        }
+        DelItem[] items = cv.toArray(new DelItem[cv.size()]);
+        Arrays.sort(items);
+        long stop = (long) (tspace * fdstop);
+        logger.info(
+                "NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + TOTAL + tspace);
+        if (determineFreeDiskSpace(spoolfile, tspace, stop, cur, items)) {
+            return;
+        }
+        cur = spoolfile.getUsableSpace();
+        if (cur >= stop) {
+            logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + YELLOW + stop
+                    + TOTAL + tspace);
+            return;
+        }
+        logger.warn(
+                "NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + YELLOW
+                        + stop + TOTAL + tspace);
+    }
+
+    private void cleardirs() {
+        String basedir = config.getSpoolBase();
+        String nbase = basedir + "/n";
+        for (String nodedir : (new File(nbase)).list()) {
+            if (!nodedir.startsWith(".")) {
+                cleardir(nbase + "/" + nodedir);
+            }
+        }
+        String sxbase = basedir + "/s";
+        for (String sxdir : (new File(sxbase)).list()) {
+            if (sxdir.startsWith(".")) {
+                continue;
+            }
+            File sxf = new File(sxbase + "/" + sxdir);
+            for (String sdir : sxf.list()) {
+                if (!sdir.startsWith(".")) {
+                    cleardir(sxbase + "/" + sxdir + "/" + sdir);
+                }
+            }
+            sxf.delete();  // won't if anything still in it
+        }
+    }
+
+    private synchronized void checkconfig() {
+        if (!config.isConfigured()) {
+            return;
+        }
+        fdstart = config.getFreeDiskStart();
+        fdstop = config.getFreeDiskStop();
+        threads = config.getDeliveryThreads();
+        if (threads < 1) {
+            threads = 1;
+        }
+        DestInfo[] alldis = config.getAllDests();
+        DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
+        qpos = 0;
+        Hashtable<String, DeliveryQueue> ndqs = new Hashtable<>();
+        for (DestInfo di : alldis) {
+            String spl = di.getSpool();
+            DeliveryQueue dq = dqs.get(spl);
+            if (dq == null) {
+                dq = new DeliveryQueue(config, di);
+            } else {
+                dq.config(di);
+            }
+            ndqs.put(spl, dq);
+            nqs[qpos++] = dq;
+        }
+        queues = nqs;
+        dqs = ndqs;
+        cleardirs();
+        while (curthreads < threads) {
+            curthreads++;
+            (new Thread("Delivery Thread") {
+                @Override
+                public void run() {
+                    dodelivery();
+                }
+            }).start();
+        }
+        nextcheck = 0;
+        notify();
+    }
+
+    private void dodelivery() {
+        DeliveryQueue dq;
+        while ((dq = getNextQueue()) != null) {
+            dq.run();
+        }
+    }
+
+    private synchronized DeliveryQueue getNextQueue() {
+        while (true) {
+            if (curthreads > threads) {
+                curthreads--;
+                return (null);
+            }
+            if (qpos < queues.length) {
+                DeliveryQueue dq = queues[qpos++];
+                if (dq.isSkipSet()) {
+                    continue;
+                }
+                nextcheck = 0;
+                notify();
+                return (dq);
+            }
+            long now = System.currentTimeMillis();
+            if (now < nextcheck) {
+                try {
+                    wait(nextcheck + 500 - now);
+                } catch (Exception e) {
+                    logger.error("InterruptedException", e);
+                }
+                now = System.currentTimeMillis();
+            }
+            if (now >= nextcheck) {
+                nextcheck = now + 5000;
+                qpos = 0;
+                freeDiskCheck();
+            }
+        }
+    }
+
+    private boolean determineFreeDiskSpace(File spoolfile, long tspace, long stop, long cur, DelItem[] items) {
+        for (DelItem item : items) {
+            long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
+            logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId()
+                    + " to free up disk");
+            if (amount > 0) {
+                cur += amount;
+                if (cur >= stop) {
+                    cur = spoolfile.getUsableSpace();
+                }
+                if (cur >= stop) {
+                    logger.info(
+                            "NODE0503 Free disk space at or above yellow threshold.  current=" + cur + YELLOW + stop
+                                    + TOTAL + tspace);
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private static class DelItem implements Comparable<DelItem> {
+
+        private String pubid;
+        private String spool;
+
+        public DelItem(String pubid, String spool) {
+            this.pubid = pubid;
+            this.spool = spool;
+        }
+
+        public int compareTo(DelItem other) {
+            int diff = pubid.compareTo(other.pubid);
+            if (diff == 0) {
+                diff = spool.compareTo(other.spool);
+            }
+            return (diff);
+        }
+
+        public String getPublishId() {
+            return (pubid);
+        }
+
+        public String getSpool() {
+            return (spool);
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (this == object) {
+                return true;
+            }
+            if (object == null || getClass() != object.getClass()) {
+                return false;
+            }
+            DelItem delItem = (DelItem) object;
+            return Objects.equals(pubid, delItem.pubid)
+                    && Objects.equals(getSpool(), delItem.getSpool());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(pubid, getSpool());
+        }
+    }
+}