1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
24 package org.onap.dmaap.datarouter.node;
29 import com.att.eelf.configuration.EELFLogger;
30 import com.att.eelf.configuration.EELFManager;
33 * Main control point for delivering files to destinations.
35 * The Delivery class manages assignment of delivery threads to delivery
36 * queues and creation and destruction of delivery queues as
37 * configuration changes. DeliveryQueues are assigned threads based on a
38 * modified round-robin approach giving priority to queues with more work
39 * as measured by both bytes to deliver and files to deliver and lower
40 * priority to queues that already have delivery threads working.
41 * A delivery thread continues to work for a delivery queue as long as
42 * that queue has more files to deliver.
44 public class Delivery {
45 private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
// A (publish id, spool) pair. Ordering, equality and hashing are all derived
// from those same two values.
47 private static class DelItem implements Comparable<DelItem> {
// Orders by pubid, with spool as the follow-up comparison.
// NOTE(review): presumably spool is only consulted when the pubids tie — confirm.
51 public int compareTo(DelItem x) {
52 int i = pubid.compareTo(x.pubid);
54 i = spool.compareTo(x.spool);
59 public String getPublishId() {
63 public String getSpool() {
67 public DelItem(String pubid, String spool) {
// Equal only to another DelItem of the exact same class whose pubid and spool
// both match (null-safe via Objects.equals).
73 public boolean equals(Object o) {
// Rejects null and any object of a different runtime class before the cast below.
77 if (o == null || getClass() != o.getClass()) {
80 DelItem delItem = (DelItem) o;
81 return Objects.equals(pubid, delItem.pubid) &&
82 Objects.equals(getSpool(), delItem.getSpool());
// Hashes the same two values that equals() compares.
86 public int hashCode() {
87 return Objects.hash(pubid, getSpool());
// Multiplier applied to the spool filesystem's total space to get the threshold
// that freeDiskCheck() logs as "red"; loaded from config.getFreeDiskStart().
91 private double fdstart;
// Multiplier applied to the total space to get the threshold that
// freeDiskCheck() logs as "yellow"; loaded from config.getFreeDiskStop().
92 private double fdstop;
// Compared against the configured thread count in checkconfig() and getNextQueue().
// NOTE(review): presumably the number of delivery threads currently running — confirm.
94 private int curthreads;
// Configuration manager supplied to the constructor.
95 private NodeConfigManager config;
// Delivery queues keyed by the spool string of their destination (DestInfo.getSpool()).
// freeDiskCheck() also uses the keys as directory paths.
96 private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>();
// Array of queues that getNextQueue() steps through by index.
97 private DeliveryQueue[] queues = new DeliveryQueue[0];
// System.currentTimeMillis() value before which getNextQueue() waits; it is
// moved 5000 ms ahead once reached.
99 private long nextcheck;
100 private Runnable cmon = new Runnable() {
107 * Constructs a new Delivery system using the specified configuration manager.
109 * @param config The configuration manager for this delivery system.
111 public Delivery(NodeConfigManager config) {
112 this.config = config;
// Hands the cmon Runnable to the configuration manager as a config task.
// NOTE(review): presumably invoked by the manager when configuration changes — confirm.
113 config.registerConfigTask(cmon);
// Processes the entries of one spool directory, identified by its path string.
117 private void cleardir(String dir) {
// First checks whether a delivery queue is registered under this path.
// NOTE(review): presumably a directory with a live queue is left untouched — confirm.
118 if (dqs.get(dir) != null) {
121 File fdir = new File(dir);
// NOTE(review): File.listFiles() returns null when the path is not a directory
// or an I/O error occurs; no null guard is visible before this loop.
122 for (File junk : fdir.listFiles()) {
// Compares usable space on the spool filesystem against two thresholds derived
// from fdstart/fdstop and, when space is short, cancels queued deliveries
// (logging NODE0501..NODE0504 along the way).
130 private void freeDiskCheck() {
131 File spoolfile = new File(config.getSpoolBase());
132 long tspace = spoolfile.getTotalSpace();
// Thresholds in bytes: fdstart and fdstop are multipliers of the total space.
// The log messages below label start as "red" and stop as "yellow".
133 long start = (long) (tspace * fdstart);
134 long stop = (long) (tspace * fdstop);
135 long cur = spoolfile.getUsableSpace();
// Build one DelItem per candidate file across every queue directory.
139 Vector<DelItem> cv = new Vector<DelItem>();
140 for (String sdir : dqs.keySet()) {
// NOTE(review): File.list() returns null if sdir is not a directory or cannot
// be read; no null guard is visible before this loop.
141 for (String meta : (new File(sdir)).list()) {
// Filters on names that do not end in ".M" or that begin with '.'.
// NOTE(review): presumably such entries are skipped — confirm.
142 if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
// The publish id is the file name with its last two characters (".M") removed.
145 cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));
148 DelItem[] items = cv.toArray(new DelItem[cv.size()]);
150 logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace);
151 for (DelItem item : items) {
// cancelTask() is called on the queue owning the item's spool; its long result
// is kept in amount. Note the "Attempting" message is logged after the call.
152 long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
153 logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");
// Re-read usable space from the filesystem.
157 cur = spoolfile.getUsableSpace();
160 logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
// Usable space is read once more before the final NODE0503 / NODE0504 reports.
165 cur = spoolfile.getUsableSpace();
167 logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace);
170 logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + " yellow=" + stop + " total=" + tspace);
// Walks the "<spoolBase>/n" and "<spoolBase>/s" trees and passes each
// non-hidden spool directory to cleardir().
173 private void cleardirs() {
174 String basedir = config.getSpoolBase();
// "<base>/n": one level deep; every entry not starting with "." goes to cleardir().
// NOTE(review): File.list() returns null for a missing or unreadable directory;
// no null guard is visible on any of the three list() calls in this method.
175 String nbase = basedir + "/n";
176 for (String nodedir : (new File(nbase)).list()) {
177 if (!nodedir.startsWith(".")) {
178 cleardir(nbase + "/" + nodedir);
// "<base>/s": two levels deep, "<base>/s/<sxdir>/<sdir>".
181 String sxbase = basedir + "/s";
182 for (String sxdir : (new File(sxbase)).list()) {
// NOTE(review): presumably dot-prefixed entries are skipped here — confirm.
183 if (sxdir.startsWith(".")) {
186 File sxf = new File(sxbase + "/" + sxdir);
187 for (String sdir : sxf.list()) {
188 if (!sdir.startsWith(".")) {
189 cleardir(sxbase + "/" + sxdir + "/" + sdir);
// Best-effort removal of the intermediate directory; the result is ignored.
192 sxf.delete(); // won't if anything still in it
// Reloads delivery settings from the config manager and rebuilds the queue
// tables from the current destination list. Synchronized on this instance.
196 private synchronized void checkconfig() {
// NOTE(review): presumably returns without doing anything until the manager
// reports it is configured — confirm.
197 if (!config.isConfigured()) {
// Refresh the free-disk multipliers and the configured thread count.
200 fdstart = config.getFreeDiskStart();
201 fdstop = config.getFreeDiskStop();
202 threads = config.getDeliveryThreads();
// New queue array sized to one slot per destination, plus a fresh
// spool -> queue table.
206 DestInfo[] alldis = config.getAllDests();
207 DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
209 Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();
210 for (DestInfo di : alldis) {
// Look for a queue already registered under this destination's spool.
211 String spl = di.getSpool();
212 DeliveryQueue dq = dqs.get(spl);
// NOTE(review): presumably a new queue is only built when none was found — confirm.
214 dq = new DeliveryQueue(config, di);
// Loops while curthreads is below the configured count; the name
// "Delivery Thread" is set inside the loop.
// NOTE(review): presumably one delivery thread is started per iteration — confirm.
224 while (curthreads < threads) {
228 setName("Delivery Thread");
// Repeatedly obtains a queue from getNextQueue() and stops once it returns null.
240 private void dodelivery() {
242 while ((dq = getNextQueue()) != null) {
// Hands the calling thread its next delivery queue; dodelivery() treats a null
// result as the signal to stop. Synchronized on this instance.
247 private synchronized DeliveryQueue getNextQueue() {
// NOTE(review): presumably the calling thread is retired here when more threads
// are running than configured — confirm.
249 if (curthreads > threads) {
// Step through the queues array using qpos, which is advanced on every pick.
253 if (qpos < queues.length) {
254 DeliveryQueue dq = queues[qpos++];
// NOTE(review): presumably a queue with its skip flag set is passed over — confirm.
255 if (dq.isSkipSet()) {
262 long now = System.currentTimeMillis();
263 if (now < nextcheck) {
// Waits on this object's monitor until 500 ms past nextcheck; wait() releases
// the monitor for the duration, so other synchronized methods can run.
265 wait(nextcheck + 500 - now);
// NOTE(review): this catches every Exception but logs it as
// "InterruptedException", and no re-interrupt of the thread is visible.
266 } catch (Exception e) {
267 logger.error("InterruptedException", e);
269 now = System.currentTimeMillis();
// Once nextcheck has been reached, schedule the following check 5000 ms out.
271 if (now >= nextcheck) {
272 nextcheck = now + 5000;
280 * Reset the retry timer for the delivery queue associated with the given spool.
// Synchronized on this instance, like the other methods that read dqs.
282 public synchronized void resetQueue(String spool) {
// dqs.get() yields null when no queue is registered under this spool.
284 DeliveryQueue dq = dqs.get(spool);
292 * Mark the task with the given publish ID in the given spool as a success.
294 public synchronized boolean markTaskSuccess(String spool, String pubId) {
295 boolean succeeded = false;
297 DeliveryQueue dq = dqs.get(spool);
299 succeeded = dq.markTaskSuccess(pubId);