1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
24 package org.onap.dmaap.datarouter.node.delivery;
26 import com.att.eelf.configuration.EELFLogger;
27 import com.att.eelf.configuration.EELFManager;
29 import java.io.IOException;
30 import java.nio.file.Files;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.HashMap;
34 import java.util.Objects;
35 import org.onap.dmaap.datarouter.node.DestInfo;
36 import org.onap.dmaap.datarouter.node.NodeConfigManager;
/**
 * Main control point for delivering files to destinations.
 *
 * <p>The Delivery class manages assignment of delivery threads to delivery queues and creation and destruction of
 * delivery queues as configuration changes. DeliveryQueues are assigned threads based on a modified round-robin
 * approach giving priority to queues with more work as measured by both bytes to deliver and files to deliver and lower
 * priority to queues that already have delivery threads working. A delivery thread continues to work for a delivery
 * queue as long as that queue has more files to deliver.
 */
47 public class Delivery {
49 private static final String TOTAL = " total=";
50 private static final String YELLOW = " yellow=";
51 private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
52 private double fdstart;
53 private double fdstop;
55 private int curthreads;
56 private NodeConfigManager config;
57 private HashMap<String, DeliveryQueue> dqs = new HashMap<>();
58 private DeliveryQueue[] queues = new DeliveryQueue[0];
60 private long nextcheck;
63 * Constructs a new Delivery system using the specified configuration manager.
65 * @param config The configuration manager for this delivery system.
67 public Delivery(NodeConfigManager config) {
69 Runnable cmon = this::checkconfig;
70 config.registerConfigTask(cmon);
74 * Reset the retry timer for a delivery queue.
76 public synchronized void resetQueue(String spool) {
78 DeliveryQueue dq = dqs.get(spool);
86 * Mark the task in spool a success.
88 public synchronized boolean markTaskSuccess(String spool, String pubId) {
89 boolean succeeded = false;
91 DeliveryQueue dq = dqs.get(spool);
93 succeeded = dq.markTaskSuccess(pubId);
99 private void cleardir(String dir) {
100 if (dqs.get(dir) != null) {
103 File fdir = new File(dir);
105 for (File junk : fdir.listFiles()) {
107 Files.delete(fdir.toPath());
110 Files.delete(fdir.toPath());
111 } catch (IOException e) {
112 logger.error("Failed to delete file: " + fdir.getPath(), e);
// Checks free space on the file system holding the spool base directory and, when needed, gathers
// candidate items from every delivery-queue spool directory and passes them to determineFreeDiskSpace.
// NOTE(review): several statements of this method are not visible in this excerpt (guards, loop
// terminators, the heads of two logging calls); the comments below describe only what is shown.
116 private void freeDiskCheck() {
117 File spoolfile = new File(config.getSpoolBase());
118 long tspace = spoolfile.getTotalSpace();
// "Red" threshold in bytes: total space scaled by fdstart (reported as red= in NODE0501).
119 long start = (long) (tspace * fdstart);
120 long cur = spoolfile.getUsableSpace();
// Candidate list: one DelItem per qualifying metadata file found in the queue spool directories.
124 ArrayList<DelItem> cv = new ArrayList<>();
125 for (String sdir : dqs.keySet()) {
// NOTE(review): File.list() returns null for a missing or unreadable directory, which would throw here.
126 for (String meta : (new File(sdir)).list()) {
// Tests for names that do not end in ".M" or that start with '.'; the body of this test is not visible.
127 if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
// The item's publish ID is the file name with its two-character ".M" suffix removed.
130 cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));
133 DelItem[] items = cv.toArray(new DelItem[cv.size()]);
// "Yellow" threshold in bytes: total space scaled by fdstop.
135 long stop = (long) (tspace * fdstop);
137 "NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + TOTAL + tspace);
138 if (determineFreeDiskSpace(spoolfile, tspace, stop, cur, items)) {
// Re-read the real usable space before reporting the outcome.
141 cur = spoolfile.getUsableSpace();
143 logger.warn("NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop
148 "NODE0504 Unable to recover sufficient disk space to reach green status. current=" + cur + YELLOW
149 + stop + TOTAL + tspace);
152 private void cleardirs() {
153 String basedir = config.getSpoolBase();
154 String nbase = basedir + "/n";
155 for (String nodedir : (new File(nbase)).list()) {
156 if (!nodedir.startsWith(".")) {
157 cleardir(nbase + "/" + nodedir);
160 String sxbase = basedir + "/s";
161 for (String sxdir : (new File(sxbase)).list()) {
162 if (sxdir.startsWith(".")) {
165 File sxf = new File(sxbase + File.separator + sxdir);
166 for (String sdir : sxf.list()) {
167 if (!sdir.startsWith(".")) {
168 cleardir(sxbase + "/" + sxdir + "/" + sdir);
172 if (sxf.list().length == 0) {
173 Files.delete(sxf.toPath()); // won't if anything still in it
175 } catch (IOException e) {
176 logger.error("Failed to delete file: " + sxf.getPath(), e);
// Configuration task registered with the configuration manager by the constructor.
// Reads the free-disk fractions and the delivery thread count, walks every configured destination to
// look up or construct its DeliveryQueue, and starts delivery threads while curthreads < threads.
// NOTE(review): a large part of this method is not visible in this excerpt (the body of the guard
// below, the bookkeeping for nqs/ndqs, and the body of the anonymous Thread); confirm against the
// full file before changing anything here.
181 private synchronized void checkconfig() {
// Guard on the configuration manager reporting that it is not configured yet.
182 if (!config.isConfigured()) {
185 fdstart = config.getFreeDiskStart();
186 fdstop = config.getFreeDiskStop();
187 threads = config.getDeliveryThreads();
191 DestInfo[] alldis = config.getAllDests();
// New queue array sized to the number of destinations, and a fresh spool -> queue map.
192 DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
194 HashMap<String, DeliveryQueue> ndqs = new HashMap<>();
195 for (DestInfo di : alldis) {
196 String spl = di.getSpool();
// Look for a queue already registered for this spool in the current map.
197 DeliveryQueue dq = dqs.get(spl);
199 dq = new DeliveryQueue(config, di);
209 while (curthreads < threads) {
// Threads are named after the current value of curthreads.
211 (new Thread("del-thread-" + curthreads) {
// Delivery loop: repeatedly asks getNextQueue() for a queue and stops when it returns null.
// NOTE(review): the declaration of dq and the loop body are not visible in this excerpt.
222 private void dodelivery() {
224 while ((dq = getNextQueue()) != null) {
// Selects the delivery queue to hand out next. Holds this object's monitor for the whole call
// (synchronized), which is also the monitor used by wait() below.
// NOTE(review): the enclosing loop, the return statements and the bodies of several branches are
// not visible in this excerpt; the comments describe only the statements shown.
229 private synchronized DeliveryQueue getNextQueue() {
// Branch taken when the thread counter exceeds the configured thread count.
231 if (curthreads > threads) {
// Queues remain in the current pass: take the one at the cursor and advance the cursor.
235 if (qpos < queues.length) {
236 DeliveryQueue dq = queues[qpos++];
237 if (dq.isSkipSet()) {
244 long now = System.currentTimeMillis();
245 if (now < nextcheck) {
// Waits until 500 ms past nextcheck, or until notified on this object.
247 wait(nextcheck + 500 - now);
// Catches Exception broadly and only logs it; the interrupt status is not restored in the code shown.
248 } catch (Exception e) {
249 logger.error("InterruptedException", e);
251 now = System.currentTimeMillis();
253 if (now >= nextcheck) {
// Schedule the next check 5000 ms from now.
254 nextcheck = now + 5000;
// Walks the candidate items in the order given and cancels each one on the delivery queue registered
// for its spool, logging NODE0502 for every attempt.
// Parameters: spoolfile - spool base directory, used to re-read usable space; tspace - total space,
// used in the log message; stop - the value reported as the yellow threshold; cur - current free
// space as seen by the caller; items - candidates to cancel.
// NOTE(review): the conditions around the statements below and all return statements are not
// visible in this excerpt, so the exact meaning of the boolean result cannot be stated from here.
261 private boolean determineFreeDiskSpace(File spoolfile, long tspace, long stop, long cur, DelItem[] items) {
262 for (DelItem item : items) {
// NOTE(review): dqs.get(...) is dereferenced without a null check - confirm every item's spool is a key of dqs.
263 long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
264 logger.debug("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId()
265 + " to free up disk");
// Re-read the real usable space from the file system.
269 cur = spoolfile.getUsableSpace();
273 "NODE0503 Free disk space at or above yellow threshold. current=" + cur + YELLOW + stop
282 public static class DelItem implements Comparable<DelItem> {
284 private String pubid;
285 private String spool;
287 public DelItem(String pubid, String spool) {
292 public int compareTo(DelItem other) {
293 int diff = pubid.compareTo(other.pubid);
295 diff = spool.compareTo(other.spool);
300 public String getPublishId() {
304 public String getSpool() {
309 public boolean equals(Object object) {
310 if (this == object) {
313 if (object == null || getClass() != object.getClass()) {
316 DelItem delItem = (DelItem) object;
317 return Objects.equals(pubid, delItem.pubid)
318 && Objects.equals(getSpool(), delItem.getSpool());
322 public int hashCode() {
323 return Objects.hash(pubid, getSpool());