4c21b34236d7329d598a34805118b08451dbc6e9
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / Delivery.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24 package org.onap.dmaap.datarouter.node;
25
26 import java.util.*;
27 import java.io.*;
28
29 import com.att.eelf.configuration.EELFLogger;
30 import com.att.eelf.configuration.EELFManager;
31
32 /**
33  * Main control point for delivering files to destinations.
34  * <p>
35  * The Delivery class manages assignment of delivery threads to delivery
36  * queues and creation and destruction of delivery queues as
37  * configuration changes.  DeliveryQueues are assigned threads based on a
38  * modified round-robin approach giving priority to queues with more work
39  * as measured by both bytes to deliver and files to deliver and lower
40  * priority to queues that already have delivery threads working.
41  * A delivery thread continues to work for a delivery queue as long as
42  * that queue has more files to deliver.
43  */
44 public class Delivery {
45     private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
46
47     private static class DelItem implements Comparable<DelItem> {
48         private String pubid;
49         private String spool;
50
51         public int compareTo(DelItem x) {
52             int i = pubid.compareTo(x.pubid);
53             if (i == 0) {
54                 i = spool.compareTo(x.spool);
55             }
56             return (i);
57         }
58
59         public String getPublishId() {
60             return (pubid);
61         }
62
63         public String getSpool() {
64             return (spool);
65         }
66
67         public DelItem(String pubid, String spool) {
68             this.pubid = pubid;
69             this.spool = spool;
70         }
71
72         @Override
73         public boolean equals(Object o) {
74             if (this == o) {
75                 return true;
76             }
77             if (o == null || getClass() != o.getClass()) {
78                 return false;
79             }
80             DelItem delItem = (DelItem) o;
81             return Objects.equals(pubid, delItem.pubid) &&
82                     Objects.equals(getSpool(), delItem.getSpool());
83         }
84
85         @Override
86         public int hashCode() {
87             return Objects.hash(pubid, getSpool());
88         }
89     }
90
91     private double fdstart;
92     private double fdstop;
93     private int threads;
94     private int curthreads;
95     private NodeConfigManager config;
96     private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>();
97     private DeliveryQueue[] queues = new DeliveryQueue[0];
98     private int qpos = 0;
99     private long nextcheck;
100     private Runnable cmon = new Runnable() {
101         public void run() {
102             checkconfig();
103         }
104     };
105
106     /**
107      * Constructs a new Delivery system using the specified configuration manager.
108      *
109      * @param config The configuration manager for this delivery system.
110      */
111     public Delivery(NodeConfigManager config) {
112         this.config = config;
113         config.registerConfigTask(cmon);
114         checkconfig();
115     }
116
117     private void cleardir(String dir) {
118         if (dqs.get(dir) != null) {
119             return;
120         }
121         File fdir = new File(dir);
122         for (File junk : fdir.listFiles()) {
123             if (junk.isFile()) {
124                 junk.delete();
125             }
126         }
127         fdir.delete();
128     }
129
130     private void freeDiskCheck() {
131         File spoolfile = new File(config.getSpoolBase());
132         long tspace = spoolfile.getTotalSpace();
133         long start = (long) (tspace * fdstart);
134         long stop = (long) (tspace * fdstop);
135         long cur = spoolfile.getUsableSpace();
136         if (cur >= start) {
137             return;
138         }
139         Vector<DelItem> cv = new Vector<DelItem>();
140         for (String sdir : dqs.keySet()) {
141             for (String meta : (new File(sdir)).list()) {
142                 if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
143                     continue;
144                 }
145                 cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));
146             }
147         }
148         DelItem[] items = cv.toArray(new DelItem[cv.size()]);
149         Arrays.sort(items);
150         logger.info("NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + " total=" + tspace);
151         for (DelItem item : items) {
152             long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
153             logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");
154             if (amount > 0) {
155                 cur += amount;
156                 if (cur >= stop) {
157                     cur = spoolfile.getUsableSpace();
158                 }
159                 if (cur >= stop) {
160                     logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);
161                     return;
162                 }
163             }
164         }
165         cur = spoolfile.getUsableSpace();
166         if (cur >= stop) {
167             logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);
168             return;
169         }
170         logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + " yellow=" + stop + " total=" + tspace);
171     }
172
173     private void cleardirs() {
174         String basedir = config.getSpoolBase();
175         String nbase = basedir + "/n";
176         for (String nodedir : (new File(nbase)).list()) {
177             if (!nodedir.startsWith(".")) {
178                 cleardir(nbase + "/" + nodedir);
179             }
180         }
181         String sxbase = basedir + "/s";
182         for (String sxdir : (new File(sxbase)).list()) {
183             if (sxdir.startsWith(".")) {
184                 continue;
185             }
186             File sxf = new File(sxbase + "/" + sxdir);
187             for (String sdir : sxf.list()) {
188                 if (!sdir.startsWith(".")) {
189                     cleardir(sxbase + "/" + sxdir + "/" + sdir);
190                 }
191             }
192             sxf.delete();  // won't if anything still in it
193         }
194     }
195
196     private synchronized void checkconfig() {
197         if (!config.isConfigured()) {
198             return;
199         }
200         fdstart = config.getFreeDiskStart();
201         fdstop = config.getFreeDiskStop();
202         threads = config.getDeliveryThreads();
203         if (threads < 1) {
204             threads = 1;
205         }
206         DestInfo[] alldis = config.getAllDests();
207         DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
208         qpos = 0;
209         Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();
210         for (DestInfo di : alldis) {
211             String spl = di.getSpool();
212             DeliveryQueue dq = dqs.get(spl);
213             if (dq == null) {
214                 dq = new DeliveryQueue(config, di);
215             } else {
216                 dq.config(di);
217             }
218             ndqs.put(spl, dq);
219             nqs[qpos++] = dq;
220         }
221         queues = nqs;
222         dqs = ndqs;
223         cleardirs();
224         while (curthreads < threads) {
225             curthreads++;
226             (new Thread() {
227                 {
228                     setName("Delivery Thread");
229                 }
230
231                 public void run() {
232                     dodelivery();
233                 }
234             }).start();
235         }
236         nextcheck = 0;
237         notify();
238     }
239
240     private void dodelivery() {
241         DeliveryQueue dq;
242         while ((dq = getNextQueue()) != null) {
243             dq.run();
244         }
245     }
246
247     private synchronized DeliveryQueue getNextQueue() {
248         while (true) {
249             if (curthreads > threads) {
250                 curthreads--;
251                 return (null);
252             }
253             if (qpos < queues.length) {
254                 DeliveryQueue dq = queues[qpos++];
255                 if (dq.isSkipSet()) {
256                     continue;
257                 }
258                 nextcheck = 0;
259                 notify();
260                 return (dq);
261             }
262             long now = System.currentTimeMillis();
263             if (now < nextcheck) {
264                 try {
265                     wait(nextcheck + 500 - now);
266                 } catch (Exception e) {
267                     logger.error("InterruptedException", e);
268                 }
269                 now = System.currentTimeMillis();
270             }
271             if (now >= nextcheck) {
272                 nextcheck = now + 5000;
273                 qpos = 0;
274                 freeDiskCheck();
275             }
276         }
277     }
278
279     /**
280      * Reset the retry timer for a delivery queue
281      */
282     public synchronized void resetQueue(String spool) {
283         if (spool != null) {
284             DeliveryQueue dq = dqs.get(spool);
285             if (dq != null) {
286                 dq.resetQueue();
287             }
288         }
289     }
290
291     /**
292      * Mark the task in spool a success
293      */
294     public synchronized boolean markTaskSuccess(String spool, String pubId) {
295         boolean succeeded = false;
296         if (spool != null) {
297             DeliveryQueue dq = dqs.get(spool);
298             if (dq != null) {
299                 succeeded = dq.markTaskSuccess(pubId);
300             }
301         }
302         return succeeded;
303     }
304 }