501e489c43440e61fe0d2fb4e3059ebe589ef1fd
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / Delivery.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24 package org.onap.dmaap.datarouter.node;
25
26 import java.util.*;
27 import java.io.*;
28
29 import com.att.eelf.configuration.EELFLogger;
30 import com.att.eelf.configuration.EELFManager;
31
32 /**
33  * Main control point for delivering files to destinations.
34  * <p>
35  * The Delivery class manages assignment of delivery threads to delivery
36  * queues and creation and destruction of delivery queues as
37  * configuration changes.  DeliveryQueues are assigned threads based on a
38  * modified round-robin approach giving priority to queues with more work
39  * as measured by both bytes to deliver and files to deliver and lower
40  * priority to queues that already have delivery threads working.
41  * A delivery thread continues to work for a delivery queue as long as
42  * that queue has more files to deliver.
43  */
44 public class Delivery {
45     private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
46
47     private static class DelItem implements Comparable<DelItem> {
48         private String pubid;
49         private String spool;
50
51         public int compareTo(DelItem x) {
52             int i = pubid.compareTo(x.pubid);
53             if (i == 0) {
54                 i = spool.compareTo(x.spool);
55             }
56             return (i);
57         }
58
59         public String getPublishId() {
60             return (pubid);
61         }
62
63         public String getSpool() {
64             return (spool);
65         }
66
67         public DelItem(String pubid, String spool) {
68             this.pubid = pubid;
69             this.spool = spool;
70         }
71     }
72
73     private double fdstart;
74     private double fdstop;
75     private int threads;
76     private int curthreads;
77     private NodeConfigManager config;
78     private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>();
79     private DeliveryQueue[] queues = new DeliveryQueue[0];
80     private int qpos = 0;
81     private long nextcheck;
82     private Runnable cmon = new Runnable() {
83         public void run() {
84             checkconfig();
85         }
86     };
87
88     /**
89      * Constructs a new Delivery system using the specified configuration manager.
90      *
91      * @param config The configuration manager for this delivery system.
92      */
93     public Delivery(NodeConfigManager config) {
94         this.config = config;
95         config.registerConfigTask(cmon);
96         checkconfig();
97     }
98
99     private void cleardir(String dir) {
100         if (dqs.get(dir) != null) {
101             return;
102         }
103         File fdir = new File(dir);
104         for (File junk : fdir.listFiles()) {
105             if (junk.isFile()) {
106                 junk.delete();
107             }
108         }
109         fdir.delete();
110     }
111
112     private void freeDiskCheck() {
113         File spoolfile = new File(config.getSpoolBase());
114         long tspace = spoolfile.getTotalSpace();
115         long start = (long) (tspace * fdstart);
116         long stop = (long) (tspace * fdstop);
117         long cur = spoolfile.getUsableSpace();
118         if (cur >= start) {
119             return;
120         }
121         Vector<DelItem> cv = new Vector<DelItem>();
122         for (String sdir : dqs.keySet()) {
123             for (String meta : (new File(sdir)).list()) {
124                 if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
125                     continue;
126                 }
127                 cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));
128             }
129         }
130         DelItem[] items = cv.toArray(new DelItem[cv.size()]);
131         Arrays.sort(items);
132         logger.info("NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + " total=" + tspace);
133         for (DelItem item : items) {
134             long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
135             logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");
136             if (amount > 0) {
137                 cur += amount;
138                 if (cur >= stop) {
139                     cur = spoolfile.getUsableSpace();
140                 }
141                 if (cur >= stop) {
142                     logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);
143                     return;
144                 }
145             }
146         }
147         cur = spoolfile.getUsableSpace();
148         if (cur >= stop) {
149             logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);
150             return;
151         }
152         logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + " yellow=" + stop + " total=" + tspace);
153     }
154
155     private void cleardirs() {
156         String basedir = config.getSpoolBase();
157         String nbase = basedir + "/n";
158         for (String nodedir : (new File(nbase)).list()) {
159             if (!nodedir.startsWith(".")) {
160                 cleardir(nbase + "/" + nodedir);
161             }
162         }
163         String sxbase = basedir + "/s";
164         for (String sxdir : (new File(sxbase)).list()) {
165             if (sxdir.startsWith(".")) {
166                 continue;
167             }
168             File sxf = new File(sxbase + "/" + sxdir);
169             for (String sdir : sxf.list()) {
170                 if (!sdir.startsWith(".")) {
171                     cleardir(sxbase + "/" + sxdir + "/" + sdir);
172                 }
173             }
174             sxf.delete();  // won't if anything still in it
175         }
176     }
177
178     private synchronized void checkconfig() {
179         if (!config.isConfigured()) {
180             return;
181         }
182         fdstart = config.getFreeDiskStart();
183         fdstop = config.getFreeDiskStop();
184         threads = config.getDeliveryThreads();
185         if (threads < 1) {
186             threads = 1;
187         }
188         DestInfo[] alldis = config.getAllDests();
189         DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
190         qpos = 0;
191         Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();
192         for (DestInfo di : alldis) {
193             String spl = di.getSpool();
194             DeliveryQueue dq = dqs.get(spl);
195             if (dq == null) {
196                 dq = new DeliveryQueue(config, di);
197             } else {
198                 dq.config(di);
199             }
200             ndqs.put(spl, dq);
201             nqs[qpos++] = dq;
202         }
203         queues = nqs;
204         dqs = ndqs;
205         cleardirs();
206         while (curthreads < threads) {
207             curthreads++;
208             (new Thread() {
209                 {
210                     setName("Delivery Thread");
211                 }
212
213                 public void run() {
214                     dodelivery();
215                 }
216             }).start();
217         }
218         nextcheck = 0;
219         notify();
220     }
221
222     private void dodelivery() {
223         DeliveryQueue dq;
224         while ((dq = getNextQueue()) != null) {
225             dq.run();
226         }
227     }
228
229     private synchronized DeliveryQueue getNextQueue() {
230         while (true) {
231             if (curthreads > threads) {
232                 curthreads--;
233                 return (null);
234             }
235             if (qpos < queues.length) {
236                 DeliveryQueue dq = queues[qpos++];
237                 if (dq.isSkipSet()) {
238                     continue;
239                 }
240                 nextcheck = 0;
241                 notify();
242                 return (dq);
243             }
244             long now = System.currentTimeMillis();
245             if (now < nextcheck) {
246                 try {
247                     wait(nextcheck + 500 - now);
248                 } catch (Exception e) {
249                 }
250                 now = System.currentTimeMillis();
251             }
252             if (now >= nextcheck) {
253                 nextcheck = now + 5000;
254                 qpos = 0;
255                 freeDiskCheck();
256             }
257         }
258     }
259
260     /**
261      * Reset the retry timer for a delivery queue
262      */
263     public synchronized void resetQueue(String spool) {
264         if (spool != null) {
265             DeliveryQueue dq = dqs.get(spool);
266             if (dq != null) {
267                 dq.resetQueue();
268             }
269         }
270     }
271
272     /**
273      * Mark the task in spool a success
274      */
275     public synchronized boolean markTaskSuccess(String spool, String pubId) {
276         boolean succeeded = false;
277         if (spool != null) {
278             DeliveryQueue dq = dqs.get(spool);
279             if (dq != null) {
280                 succeeded = dq.markTaskSuccess(pubId);
281             }
282         }
283         return succeeded;
284     }
285 }