X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=datarouter-node%2Fsrc%2Fmain%2Fjava%2Fcom%2Fatt%2Fresearch%2Fdatarouter%2Fnode%2FDelivery.java;fp=datarouter-node%2Fsrc%2Fmain%2Fjava%2Fcom%2Fatt%2Fresearch%2Fdatarouter%2Fnode%2FDelivery.java;h=0000000000000000000000000000000000000000;hb=e4b20cc6f7c31f48ddd0de5bcd054b09a35cd510;hp=d0e88ec95eb9ff7689030aa6b0392fb25bc29926;hpb=315f5603d8e42969a13d985ca8bb50146e82325a;p=dmaap%2Fdatarouter.git

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java
deleted file mode 100644
index d0e88ec9..00000000
--- a/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START==================================================
- * * org.onap.dmaap
- * * ===========================================================================
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * * ===========================================================================
- * * Licensed under the Apache License, Version 2.0 (the "License");
- * * you may not use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * *      http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- * * ============LICENSE_END====================================================
- * *
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * *
- ******************************************************************************/
-
-package com.att.research.datarouter.node;
-
-import java.util.*;
-import java.io.*;
-import org.apache.log4j.Logger;
-
-/**
- * Main control point for delivering files to destinations.
- * <p>
- * The Delivery class manages assignment of delivery threads to delivery
- * queues and creation and destruction of delivery queues as
- * configuration changes. DeliveryQueues are assigned threads based on a
- * modified round-robin approach giving priority to queues with more work
- * as measured by both bytes to deliver and files to deliver and lower
- * priority to queues that already have delivery threads working.
- * A delivery thread continues to work for a delivery queue as long as
- * that queue has more files to deliver.
- */
-public class Delivery {
-    private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.Delivery");
-    private static class DelItem implements Comparable<DelItem> {
-        private String pubid;
-        private String spool;
-        public int compareTo(DelItem x) {
-            int i = pubid.compareTo(x.pubid);
-            if (i == 0) {
-                i = spool.compareTo(x.spool);
-            }
-            return(i);
-        }
-        public String getPublishId() {
-            return(pubid);
-        }
-        public String getSpool() {
-            return(spool);
-        }
-        public DelItem(String pubid, String spool) {
-            this.pubid = pubid;
-            this.spool = spool;
-        }
-    }
-    private double fdstart;
-    private double fdstop;
-    private int threads;
-    private int curthreads;
-    private NodeConfigManager config;
-    private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>();
-    private DeliveryQueue[] queues = new DeliveryQueue[0];
-    private int qpos = 0;
-    private long nextcheck;
-    private Runnable cmon = new Runnable() {
-        public void run() {
-            checkconfig();
-        }
-    };
-    /**
-     * Constructs a new Delivery system using the specified configuration manager.
-     * @param config The configuration manager for this delivery system.
-     */
-    public Delivery(NodeConfigManager config) {
-        this.config = config;
-        config.registerConfigTask(cmon);
-        checkconfig();
-    }
-    private void cleardir(String dir) {
-        if (dqs.get(dir) != null) {
-            return;
-        }
-        File fdir = new File(dir);
-        for (File junk: fdir.listFiles()) {
-            if (junk.isFile()) {
-                junk.delete();
-            }
-        }
-        fdir.delete();
-    }
-    private void freeDiskCheck() {
-        File spoolfile = new File(config.getSpoolBase());
-        long tspace = spoolfile.getTotalSpace();
-        long start = (long)(tspace * fdstart);
-        long stop = (long)(tspace * fdstop);
-        long cur = spoolfile.getUsableSpace();
-        if (cur >= start) {
-            return;
-        }
-        Vector<DelItem> cv = new Vector<DelItem>();
-        for (String sdir: dqs.keySet()) {
-            for (String meta: (new File(sdir)).list()) {
-                if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
-                    continue;
-                }
-                cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));
-            }
-        }
-        DelItem[] items = cv.toArray(new DelItem[cv.size()]);
-        Arrays.sort(items);
-        logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace);
-        for (DelItem item: items) {
-            long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
-            logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");
-            if (amount > 0) {
-                cur += amount;
-                if (cur >= stop) {
-                    cur = spoolfile.getUsableSpace();
-                }
-                if (cur >= stop) {
-                    logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
-                    return;
-                }
-            }
-        }
-        cur = spoolfile.getUsableSpace();
-        if (cur >= stop) {
-            logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
-            return;
-        }
-        logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + " yellow=" + stop + " total=" + tspace);
-    }
-    private void cleardirs() {
-        String basedir = config.getSpoolBase();
-        String nbase = basedir + "/n";
-        for (String nodedir: (new File(nbase)).list()) {
-            if (!nodedir.startsWith(".")) {
-                cleardir(nbase + "/" + nodedir);
-            }
-        }
-        String sxbase = basedir + "/s";
-        for (String sxdir: (new File(sxbase)).list()) {
-            if (sxdir.startsWith(".")) {
-                continue;
-            }
-            File sxf = new File(sxbase + "/" + sxdir);
-            for (String sdir: sxf.list()) {
-                if (!sdir.startsWith(".")) {
-                    cleardir(sxbase + "/" + sxdir + "/" + sdir);
-                }
-            }
-            sxf.delete(); // won't if anything still in it
-        }
-    }
-    private synchronized void checkconfig() {
-        if (!config.isConfigured()) {
-            return;
-        }
-        fdstart = config.getFreeDiskStart();
-        fdstop = config.getFreeDiskStop();
-        threads = config.getDeliveryThreads();
-        if (threads < 1) {
-            threads = 1;
-        }
-        DestInfo[] alldis = config.getAllDests();
-        DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
-        qpos = 0;
-        Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();
-        for (DestInfo di: alldis) {
-            String spl = di.getSpool();
-            DeliveryQueue dq = dqs.get(spl);
-            if (dq == null) {
-                dq = new DeliveryQueue(config, di);
-            } else {
-                dq.config(di);
-            }
-            ndqs.put(spl, dq);
-            nqs[qpos++] = dq;
-        }
-        queues = nqs;
-        dqs = ndqs;
-        cleardirs();
-        while (curthreads < threads) {
-            curthreads++;
-            (new Thread() {
-                {
-                    setName("Delivery Thread");
-                }
-                public void run() {
-                    dodelivery();
-                }
-            }).start();
-        }
-        nextcheck = 0;
-        notify();
-    }
-    private void dodelivery() {
-        DeliveryQueue dq;
-        while ((dq = getNextQueue()) != null) {
-            dq.run();
-        }
-    }
-    private synchronized DeliveryQueue getNextQueue() {
-        while (true) {
-            if (curthreads > threads) {
-                curthreads--;
-                return(null);
-            }
-            if (qpos < queues.length) {
-                DeliveryQueue dq = queues[qpos++];
-                if (dq.isSkipSet()) {
-                    continue;
-                }
-                nextcheck = 0;
-                notify();
-                return(dq);
-            }
-            long now = System.currentTimeMillis();
-            if (now < nextcheck) {
-                try {
-                    wait(nextcheck + 500 - now);
-                } catch (Exception e) {
-                }
-                now = System.currentTimeMillis();
-            }
-            if (now >= nextcheck) {
-                nextcheck = now + 5000;
-                qpos = 0;
-                freeDiskCheck();
-            }
-        }
-    }
-    /**
-     * Reset the retry timer for a delivery queue
-     */
-    public synchronized void resetQueue(String spool) {
-        if (spool != null) {
-            DeliveryQueue dq = dqs.get(spool);
-            if (dq != null) {
-                dq.resetQueue();
-            }
-        }
-    }
-}