1 /*******************************************************************************
\r
2 * ============LICENSE_START==================================================
\r
4 * * ===========================================================================
\r
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * * ===========================================================================
\r
7 * * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * * you may not use this file except in compliance with the License.
\r
9 * * You may obtain a copy of the License at
\r
11 * * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * * Unless required by applicable law or agreed to in writing, software
\r
14 * * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * * See the License for the specific language governing permissions and
\r
17 * * limitations under the License.
\r
18 * * ============LICENSE_END====================================================
\r
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
22 ******************************************************************************/
\r
24 package com.att.research.datarouter.node;
\r
28 import org.apache.log4j.Logger;
\r
31 * Main control point for delivering files to destinations.
\r
33 * The Delivery class manages assignment of delivery threads to delivery
\r
34 * queues and creation and destruction of delivery queues as
\r
35 * configuration changes. DeliveryQueues are assigned threads based on a
\r
36 * modified round-robin approach giving priority to queues with more work
\r
37 * as measured by both bytes to deliver and files to deliver and lower
\r
38 * priority to queues that already have delivery threads working.
\r
39 * A delivery thread continues to work for a delivery queue as long as
\r
40 * that queue has more files to deliver.
\r
42 public class Delivery {
\r
43 private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.Delivery");
\r
/**
 * A (publish ID, spool directory) pair identifying one queued file.
 * Implements Comparable so that arrays of items can be sorted;
 * freeDiskCheck() builds these from the ".M" entries of each spool
 * directory and sorts them with Arrays.sort.
 * NOTE(review): this listing omits several original lines (the numbering
 * jumps 48 -> 50 -> 54 -> 57 -> 60), so the bodies below are incomplete
 * as shown.
 */
44 private static class DelItem implements Comparable<DelItem> {
\r
45 private String pubid;
\r
46 private String spool;
\r
47 public int compareTo(DelItem x) {
\r
// Primary ordering: publish ID.
48 int i = pubid.compareTo(x.pubid);
\r
// The spool is compared as well; presumably only as a tie-break when the
// publish IDs are equal -- the guarding line is not visible here, confirm.
50 i = spool.compareTo(x.spool);
\r
54 public String getPublishId() {
\r
57 public String getSpool() {
\r
60 public DelItem(String pubid, String spool) {
\r
// Multiplied by the spool's total space in freeDiskCheck() to get the
// threshold that the log messages call "red". Loaded from
// config.getFreeDiskStart() in checkconfig().
65 private double fdstart;
\r
// Multiplied by the spool's total space in freeDiskCheck() to get the
// threshold that the log messages call "yellow". Loaded from
// config.getFreeDiskStop() in checkconfig().
66 private double fdstop;
\r
// Configured thread count, loaded from config.getDeliveryThreads().
67 private int threads;
\r
// Compared against 'threads' in checkconfig() and getNextQueue();
// presumably the number of delivery threads currently running -- confirm.
68 private int curthreads;
\r
// Configuration manager supplied to the constructor.
69 private NodeConfigManager config;
\r
// Delivery queues keyed by spool directory path (looked up with
// DestInfo.getSpool() and iterated as directory names in freeDiskCheck()).
70 private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>();
\r
// Queue array walked by getNextQueue() using qpos as the cursor.
71 private DeliveryQueue[] queues = new DeliveryQueue[0];
\r
// Index of the next entry of 'queues' that getNextQueue() will examine.
72 private int qpos = 0;
\r
// Millisecond timestamp (System.currentTimeMillis() scale); getNextQueue()
// advances it to now + 5000 once it has been reached.
73 private long nextcheck;
\r
// Task registered with the configuration manager by the constructor.
// Its body is not visible in this listing.
74 private Runnable cmon = new Runnable() {
\r
80 * Constructs a new Delivery system using the specified configuration manager.
\r
81 * @param config The configuration manager for this delivery system.
\r
83 public Delivery(NodeConfigManager config) {
\r
84 this.config = config;
\r
// Hand the cmon task to the configuration manager; presumably it is run
// whenever the configuration changes -- confirm in NodeConfigManager.
85 config.registerConfigTask(cmon);
\r
/**
 * Examines the regular files directly inside a spool directory.
 * Called by cleardirs() for each directory found under the spool base.
 * NOTE(review): original lines 90-91 and 95 onward are missing from this
 * listing. Presumably the method returns early when the directory still
 * has a delivery queue and deletes each regular file otherwise -- confirm
 * against the full source.
 *
 * @param dir path of the directory to examine
 */
88 private void cleardir(String dir) {
\r
// A non-null entry means a delivery queue is registered for this directory.
89 if (dqs.get(dir) != null) {
\r
92 File fdir = new File(dir);
\r
// NOTE(review): File.listFiles() returns null when the path is not a
// directory or an I/O error occurs; that case is not checked here and
// would throw a NullPointerException.
93 for (File junk: fdir.listFiles()) {
\r
// Only regular files are considered; subdirectories are skipped.
94 if (junk.isFile()) {
\r
/**
 * Checks free space on the spool file system and, when it is short,
 * cancels queued deliveries to recover space.
 * The "red" threshold is total space * fdstart and the "yellow" threshold
 * is total space * fdstop; both names come from the log messages below.
 * NOTE(review): many original lines are missing from this listing (for
 * example 107-108, 113-114, 124-126 and 128-139), so the exact control
 * flow -- early returns, loop exits, and how 'amount' is used -- cannot be
 * confirmed from here.
 */
100 private void freeDiskCheck() {
\r
101 File spoolfile = new File(config.getSpoolBase());
\r
102 long tspace = spoolfile.getTotalSpace();
\r
// Thresholds in bytes, derived from the configured fractions.
103 long start = (long)(tspace * fdstart);
\r
104 long stop = (long)(tspace * fdstop);
\r
105 long cur = spoolfile.getUsableSpace();
\r
// Usable space at or above the red threshold; presumably nothing needs to
// be done and the method returns -- the branch body is not visible.
106 if (cur >= start) {
\r
// Collect one DelItem per metadata file across all known spool directories.
109 Vector<DelItem> cv = new Vector<DelItem>();
\r
110 for (String sdir: dqs.keySet()) {
\r
// NOTE(review): File.list() returns null when the path is not a directory
// or an I/O error occurs; that case is not checked here.
111 for (String meta: (new File(sdir)).list()) {
\r
// Entries that do not end in ".M" or that start with '.' match this
// condition; presumably they are skipped -- the branch body is not visible.
112 if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
\r
// Strip the two-character ".M" suffix to obtain the publish ID.
115 cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));
\r
// Sort using DelItem.compareTo (publish ID first).
118 DelItem[] items = cv.toArray(new DelItem[cv.size()]);
\r
119 Arrays.sort(items);
\r
120 logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace);
\r
121 for (DelItem item: items) {
\r
// Ask the owning queue to cancel the task for this publish ID.
// What the returned amount represents is not visible here.
122 long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
\r
123 logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");
\r
// Re-read the usable space after discarding.
127 cur = spoolfile.getUsableSpace();
\r
130 logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
\r
135 cur = spoolfile.getUsableSpace();
\r
137 logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
\r
// Logged when the yellow threshold could not be reached.
140 logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + " yellow=" + stop + " total=" + tspace);
\r
/**
 * Walks the spool tree and calls cleardir() on each directory found.
 * Two subtrees of the spool base are visited: "n", whose entries are
 * passed to cleardir() directly, and "s", which has one extra level of
 * directories above the ones passed to cleardir(). Names beginning with
 * "." are not passed to cleardir().
 * NOTE(review): File.list() returns null when the path is not a directory
 * or an I/O error occurs; none of the three list() calls below check for
 * that. Some original lines (for example 148-149 and 153-154) are missing
 * from this listing.
 */
142 private void cleardirs() {
\r
143 String basedir = config.getSpoolBase();
\r
144 String nbase = basedir + "/n";
\r
145 for (String nodedir: (new File(nbase)).list()) {
\r
146 if (!nodedir.startsWith(".")) {
\r
147 cleardir(nbase + "/" + nodedir);
\r
150 String sxbase = basedir + "/s";
\r
151 for (String sxdir: (new File(sxbase)).list()) {
\r
// Dot-prefixed entries; presumably skipped -- the branch body is not visible.
152 if (sxdir.startsWith(".")) {
\r
155 File sxf = new File(sxbase + "/" + sxdir);
\r
156 for (String sdir: sxf.list()) {
\r
157 if (!sdir.startsWith(".")) {
\r
158 cleardir(sxbase + "/" + sxdir + "/" + sdir);
\r
161 sxf.delete(); // File.delete() removes a directory only when it is empty, so this does nothing while entries remain
\r
/**
 * Reloads settings from the configuration manager and reconciles the
 * delivery queues with the configured destinations.
 * Synchronized on this Delivery instance.
 * NOTE(review): a large part of the original body (for example lines
 * 166-167, 171-173, 181, 183-191, 193-195 and 197 onward) is missing from
 * this listing, so how the new queue table replaces the old one and how
 * threads are started can only be inferred.
 */
164 private synchronized void checkconfig() {
\r
// Nothing can be loaded until the manager reports it is configured;
// presumably the method returns here -- the branch body is not visible.
165 if (!config.isConfigured()) {
\r
// Refresh the disk thresholds and the thread limit.
168 fdstart = config.getFreeDiskStart();
\r
169 fdstop = config.getFreeDiskStop();
\r
170 threads = config.getDeliveryThreads();
\r
// Build a fresh queue array and lookup table sized to the destinations.
174 DestInfo[] alldis = config.getAllDests();
\r
175 DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
\r
177 Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();
\r
178 for (DestInfo di: alldis) {
\r
// Look for an existing queue for this destination's spool directory.
179 String spl = di.getSpool();
\r
180 DeliveryQueue dq = dqs.get(spl);
\r
// Presumably reached only when no existing queue was found -- the
// guarding condition is not visible here.
182 dq = new DeliveryQueue(config, di);
\r
// Runs while fewer threads exist than configured; presumably each pass
// starts one more delivery thread -- confirm against the full source.
192 while (curthreads < threads) {
\r
196 setName("Delivery Thread");
\r
198 public void run() {
\r
/**
 * Delivery loop: repeatedly obtains a queue from getNextQueue() and stops
 * when that method returns null.
 * NOTE(review): the declaration of dq and the loop body are missing from
 * this listing, so what is done with each queue is not visible here.
 */
206 private void dodelivery() {
\r
208 while ((dq = getNextQueue()) != null) {
\r
/**
 * Selects the next delivery queue for a delivery thread to work on.
 * Synchronized on this Delivery instance, which is also the monitor used
 * by the wait() call below.
 * NOTE(review): many original lines are missing from this listing (for
 * example 213, 215-217, 221-226, 229, 232, 234 and 237 onward), including
 * the enclosing loop, the return statements and the try that opens before
 * the wait(). The comments below cover only the visible statements.
 *
 * @return a delivery queue, or presumably null when the calling thread
 *         should stop (dodelivery() ends its loop on null) -- confirm
 */
212 private synchronized DeliveryQueue getNextQueue() {
\r
// More threads exist than the configuration allows; presumably this thread
// is retired here -- the branch body is not visible.
214 if (curthreads > threads) {
\r
// Walk the queue array from the saved cursor position.
218 if (qpos < queues.length) {
\r
219 DeliveryQueue dq = queues[qpos++];
\r
// The queue's skip flag is set; what happens next is not visible here.
220 if (dq.isSkipSet()) {
\r
227 long now = System.currentTimeMillis();
\r
// Before the next scheduled check: wait until 500 ms after nextcheck
// (the timeout is nextcheck + 500 - now milliseconds).
228 if (now < nextcheck) {
\r
230 wait(nextcheck + 500 - now);
\r
// NOTE(review): this catches every Exception, not just the
// InterruptedException that wait() declares; the handler body is not visible.
231 } catch (Exception e) {
\r
233 now = System.currentTimeMillis();
\r
// The check time has arrived: schedule the next one 5000 ms from now.
235 if (now >= nextcheck) {
\r
236 nextcheck = now + 5000;
\r
243 * Reset the retry timer for a delivery queue
\r
245 public synchronized void resetQueue(String spool) {
\r
246 if (spool != null) {
\r
247 DeliveryQueue dq = dqs.get(spool);
\r