83d5186a63050b17e28b32c4193174488822c95b
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / Delivery.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24 package org.onap.dmaap.datarouter.node;
25
26 import com.att.eelf.configuration.EELFLogger;
27 import com.att.eelf.configuration.EELFManager;
28 import java.io.File;
29 import java.io.IOException;
30 import java.nio.file.Files;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.HashMap;
34 import java.util.Objects;
35
36 /**
37  * Main control point for delivering files to destinations.
38  *
39  * <p>The Delivery class manages assignment of delivery threads to delivery queues and creation and destruction of
40  * delivery queues as configuration changes. DeliveryQueues are assigned threads based on a modified round-robin
41  * approach giving priority to queues with more work as measured by both bytes to deliver and files to deliver and lower
42  * priority to queues that already have delivery threads working. A delivery thread continues to work for a delivery
43  * queue as long as that queue has more files to deliver.
44  */
45 public class Delivery {
46
47     private static final String TOTAL = " total=";
48     private static final String YELLOW = " yellow=";
49     private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
50     private double fdstart;
51     private double fdstop;
52     private int threads;
53     private int curthreads;
54     private NodeConfigManager config;
55     private HashMap<String, DeliveryQueue> dqs = new HashMap<>();
56     private DeliveryQueue[] queues = new DeliveryQueue[0];
57     private int qpos = 0;
58     private long nextcheck;
59
60     /**
61      * Constructs a new Delivery system using the specified configuration manager.
62      *
63      * @param config The configuration manager for this delivery system.
64      */
65     public Delivery(NodeConfigManager config) {
66         this.config = config;
67         Runnable cmon = this::checkconfig;
68         config.registerConfigTask(cmon);
69         checkconfig();
70     }
71
72     /**
73      * Reset the retry timer for a delivery queue.
74      */
75     public synchronized void resetQueue(String spool) {
76         if (spool != null) {
77             DeliveryQueue dq = dqs.get(spool);
78             if (dq != null) {
79                 dq.resetQueue();
80             }
81         }
82     }
83
84     /**
85      * Mark the task in spool a success.
86      */
87     public synchronized boolean markTaskSuccess(String spool, String pubId) {
88         boolean succeeded = false;
89         if (spool != null) {
90             DeliveryQueue dq = dqs.get(spool);
91             if (dq != null) {
92                 succeeded = dq.markTaskSuccess(pubId);
93             }
94         }
95         return succeeded;
96     }
97
98     private void cleardir(String dir) {
99         if (dqs.get(dir) != null) {
100             return;
101         }
102         File fdir = new File(dir);
103         try {
104             for (File junk : fdir.listFiles()) {
105                 if (junk.isFile()) {
106                     Files.delete(fdir.toPath());
107                 }
108             }
109             Files.delete(fdir.toPath());
110         } catch (IOException e) {
111             logger.error("Failed to delete file: " + fdir.getPath(), e);
112         }
113     }
114
115     private void freeDiskCheck() {
116         File spoolfile = new File(config.getSpoolBase());
117         long tspace = spoolfile.getTotalSpace();
118         long start = (long) (tspace * fdstart);
119         long cur = spoolfile.getUsableSpace();
120         if (cur >= start) {
121             return;
122         }
123         ArrayList<DelItem> cv = new ArrayList<>();
124         for (String sdir : dqs.keySet()) {
125             for (String meta : (new File(sdir)).list()) {
126                 if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
127                     continue;
128                 }
129                 cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));
130             }
131         }
132         DelItem[] items = cv.toArray(new DelItem[cv.size()]);
133         Arrays.sort(items);
134         long stop = (long) (tspace * fdstop);
135         logger.warn(
136             "NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + TOTAL + tspace);
137         if (determineFreeDiskSpace(spoolfile, tspace, stop, cur, items)) {
138             return;
139         }
140         cur = spoolfile.getUsableSpace();
141         if (cur >= stop) {
142             logger.warn("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + YELLOW + stop
143                 + TOTAL + tspace);
144             return;
145         }
146         logger.warn(
147             "NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + YELLOW
148                 + stop + TOTAL + tspace);
149     }
150
151     private void cleardirs() {
152         String basedir = config.getSpoolBase();
153         String nbase = basedir + "/n";
154         for (String nodedir : (new File(nbase)).list()) {
155             if (!nodedir.startsWith(".")) {
156                 cleardir(nbase + "/" + nodedir);
157             }
158         }
159         String sxbase = basedir + "/s";
160         for (String sxdir : (new File(sxbase)).list()) {
161             if (sxdir.startsWith(".")) {
162                 continue;
163             }
164             File sxf = new File(sxbase + "/" + sxdir);
165             for (String sdir : sxf.list()) {
166                 if (!sdir.startsWith(".")) {
167                     cleardir(sxbase + "/" + sxdir + "/" + sdir);
168                 }
169             }
170             try {
171                 if (sxf.list().length == 0) {
172                     Files.delete(sxf.toPath());  // won't if anything still in it
173                 }
174             } catch (IOException e) {
175                 logger.error("Failed to delete file: " + sxf.getPath(), e);
176             }
177         }
178     }
179
180     private synchronized void checkconfig() {
181         if (!config.isConfigured()) {
182             return;
183         }
184         fdstart = config.getFreeDiskStart();
185         fdstop = config.getFreeDiskStop();
186         threads = config.getDeliveryThreads();
187         if (threads < 1) {
188             threads = 1;
189         }
190         DestInfo[] alldis = config.getAllDests();
191         DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
192         qpos = 0;
193         HashMap<String, DeliveryQueue> ndqs = new HashMap<>();
194         for (DestInfo di : alldis) {
195             String spl = di.getSpool();
196             DeliveryQueue dq = dqs.get(spl);
197             if (dq == null) {
198                 dq = new DeliveryQueue(config, di);
199             } else {
200                 dq.config(di);
201             }
202             ndqs.put(spl, dq);
203             nqs[qpos++] = dq;
204         }
205         queues = nqs;
206         dqs = ndqs;
207         cleardirs();
208         while (curthreads < threads) {
209             curthreads++;
210             (new Thread("del-thread-" + curthreads) {
211                 @Override
212                 public void run() {
213                     dodelivery();
214                 }
215             }).start();
216         }
217         nextcheck = 0;
218         notifyAll();
219     }
220
221     private void dodelivery() {
222         DeliveryQueue dq;
223         while ((dq = getNextQueue()) != null) {
224             dq.run();
225         }
226     }
227
228     private synchronized DeliveryQueue getNextQueue() {
229         while (true) {
230             if (curthreads > threads) {
231                 curthreads--;
232                 return (null);
233             }
234             if (qpos < queues.length) {
235                 DeliveryQueue dq = queues[qpos++];
236                 if (dq.isSkipSet()) {
237                     continue;
238                 }
239                 nextcheck = 0;
240                 notifyAll();
241                 return (dq);
242             }
243             long now = System.currentTimeMillis();
244             if (now < nextcheck) {
245                 try {
246                     wait(nextcheck + 500 - now);
247                 } catch (Exception e) {
248                     logger.error("InterruptedException", e);
249                 }
250                 now = System.currentTimeMillis();
251             }
252             if (now >= nextcheck) {
253                 nextcheck = now + 5000;
254                 qpos = 0;
255                 freeDiskCheck();
256             }
257         }
258     }
259
260     private boolean determineFreeDiskSpace(File spoolfile, long tspace, long stop, long cur, DelItem[] items) {
261         for (DelItem item : items) {
262             long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
263             logger.debug("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId()
264                 + " to free up disk");
265             if (amount > 0) {
266                 cur += amount;
267                 if (cur >= stop) {
268                     cur = spoolfile.getUsableSpace();
269                 }
270                 if (cur >= stop) {
271                     logger.warn(
272                         "NODE0503 Free disk space at or above yellow threshold.  current=" + cur + YELLOW + stop
273                             + TOTAL + tspace);
274                     return true;
275                 }
276             }
277         }
278         return false;
279     }
280
281     static class DelItem implements Comparable<DelItem> {
282
283         private String pubid;
284         private String spool;
285
286         public DelItem(String pubid, String spool) {
287             this.pubid = pubid;
288             this.spool = spool;
289         }
290
291         public int compareTo(DelItem other) {
292             int diff = pubid.compareTo(other.pubid);
293             if (diff == 0) {
294                 diff = spool.compareTo(other.spool);
295             }
296             return (diff);
297         }
298
299         public String getPublishId() {
300             return (pubid);
301         }
302
303         public String getSpool() {
304             return (spool);
305         }
306
307         @Override
308         public boolean equals(Object object) {
309             if (this == object) {
310                 return true;
311             }
312             if (object == null || getClass() != object.getClass()) {
313                 return false;
314             }
315             DelItem delItem = (DelItem) object;
316             return Objects.equals(pubid, delItem.pubid)
317                 && Objects.equals(getSpool(), delItem.getSpool());
318         }
319
320         @Override
321         public int hashCode() {
322             return Objects.hash(pubid, getSpool());
323         }
324     }
325 }