Logging changes and unit tests
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / Delivery.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24 package org.onap.dmaap.datarouter.node;
25
26 import com.att.eelf.configuration.EELFLogger;
27 import com.att.eelf.configuration.EELFManager;
28 import java.io.File;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.HashMap;
32 import java.util.Objects;
33
34 /**
35  * Main control point for delivering files to destinations.
36  *
37  * <p>The Delivery class manages assignment of delivery threads to delivery queues and creation and destruction of
38  * delivery queues as configuration changes. DeliveryQueues are assigned threads based on a modified round-robin
39  * approach giving priority to queues with more work as measured by both bytes to deliver and files to deliver and lower
40  * priority to queues that already have delivery threads working. A delivery thread continues to work for a delivery
41  * queue as long as that queue has more files to deliver.
42  */
43 public class Delivery {
44
45     private static final String TOTAL = " total=";
46     private static final String YELLOW = " yellow=";
47     private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
48     private double fdstart;
49     private double fdstop;
50     private int threads;
51     private int curthreads;
52     private NodeConfigManager config;
53     private HashMap<String, DeliveryQueue> dqs = new HashMap<>();
54     private DeliveryQueue[] queues = new DeliveryQueue[0];
55     private int qpos = 0;
56     private long nextcheck;
57
58     /**
59      * Constructs a new Delivery system using the specified configuration manager.
60      *
61      * @param config The configuration manager for this delivery system.
62      */
63     public Delivery(NodeConfigManager config) {
64         this.config = config;
65         Runnable cmon = this::checkconfig;
66         config.registerConfigTask(cmon);
67         checkconfig();
68     }
69
70     /**
71      * Reset the retry timer for a delivery queue.
72      */
73     public synchronized void resetQueue(String spool) {
74         if (spool != null) {
75             DeliveryQueue dq = dqs.get(spool);
76             if (dq != null) {
77                 dq.resetQueue();
78             }
79         }
80     }
81
82     /**
83      * Mark the task in spool a success.
84      */
85     public synchronized boolean markTaskSuccess(String spool, String pubId) {
86         boolean succeeded = false;
87         if (spool != null) {
88             DeliveryQueue dq = dqs.get(spool);
89             if (dq != null) {
90                 succeeded = dq.markTaskSuccess(pubId);
91             }
92         }
93         return succeeded;
94     }
95
96     private void cleardir(String dir) {
97         if (dqs.get(dir) != null) {
98             return;
99         }
100         File fdir = new File(dir);
101         for (File junk : fdir.listFiles()) {
102             if (junk.isFile()) {
103                 junk.delete();
104             }
105         }
106         fdir.delete();
107     }
108
109     private void freeDiskCheck() {
110         File spoolfile = new File(config.getSpoolBase());
111         long tspace = spoolfile.getTotalSpace();
112         long start = (long) (tspace * fdstart);
113         long cur = spoolfile.getUsableSpace();
114         if (cur >= start) {
115             return;
116         }
117         ArrayList<DelItem> cv = new ArrayList<>();
118         for (String sdir : dqs.keySet()) {
119             for (String meta : (new File(sdir)).list()) {
120                 if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
121                     continue;
122                 }
123                 cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));
124             }
125         }
126         DelItem[] items = cv.toArray(new DelItem[cv.size()]);
127         Arrays.sort(items);
128         long stop = (long) (tspace * fdstop);
129         logger.warn(
130                 "NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + TOTAL + tspace);
131         if (determineFreeDiskSpace(spoolfile, tspace, stop, cur, items)) {
132             return;
133         }
134         cur = spoolfile.getUsableSpace();
135         if (cur >= stop) {
136             logger.warn("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + YELLOW + stop
137                                 + TOTAL + tspace);
138             return;
139         }
140         logger.warn(
141                 "NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + YELLOW
142                         + stop + TOTAL + tspace);
143     }
144
145     private void cleardirs() {
146         String basedir = config.getSpoolBase();
147         String nbase = basedir + "/n";
148         for (String nodedir : (new File(nbase)).list()) {
149             if (!nodedir.startsWith(".")) {
150                 cleardir(nbase + "/" + nodedir);
151             }
152         }
153         String sxbase = basedir + "/s";
154         for (String sxdir : (new File(sxbase)).list()) {
155             if (sxdir.startsWith(".")) {
156                 continue;
157             }
158             File sxf = new File(sxbase + "/" + sxdir);
159             for (String sdir : sxf.list()) {
160                 if (!sdir.startsWith(".")) {
161                     cleardir(sxbase + "/" + sxdir + "/" + sdir);
162                 }
163             }
164             sxf.delete();  // won't if anything still in it
165         }
166     }
167
168     private synchronized void checkconfig() {
169         if (!config.isConfigured()) {
170             return;
171         }
172         fdstart = config.getFreeDiskStart();
173         fdstop = config.getFreeDiskStop();
174         threads = config.getDeliveryThreads();
175         if (threads < 1) {
176             threads = 1;
177         }
178         DestInfo[] alldis = config.getAllDests();
179         DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
180         qpos = 0;
181         HashMap<String, DeliveryQueue> ndqs = new HashMap<>();
182         for (DestInfo di : alldis) {
183             String spl = di.getSpool();
184             DeliveryQueue dq = dqs.get(spl);
185             if (dq == null) {
186                 dq = new DeliveryQueue(config, di);
187             } else {
188                 dq.config(di);
189             }
190             ndqs.put(spl, dq);
191             nqs[qpos++] = dq;
192         }
193         queues = nqs;
194         dqs = ndqs;
195         cleardirs();
196         while (curthreads < threads) {
197             curthreads++;
198             (new Thread("del-thread-" + curthreads) {
199                 @Override
200                 public void run() {
201                     dodelivery();
202                 }
203             }).start();
204         }
205         nextcheck = 0;
206         notify();
207     }
208
209     private void dodelivery() {
210         DeliveryQueue dq;
211         while ((dq = getNextQueue()) != null) {
212             dq.run();
213         }
214     }
215
216     private synchronized DeliveryQueue getNextQueue() {
217         while (true) {
218             if (curthreads > threads) {
219                 curthreads--;
220                 return (null);
221             }
222             if (qpos < queues.length) {
223                 DeliveryQueue dq = queues[qpos++];
224                 if (dq.isSkipSet()) {
225                     continue;
226                 }
227                 nextcheck = 0;
228                 notify();
229                 return (dq);
230             }
231             long now = System.currentTimeMillis();
232             if (now < nextcheck) {
233                 try {
234                     wait(nextcheck + 500 - now);
235                 } catch (Exception e) {
236                     logger.error("InterruptedException", e);
237                 }
238                 now = System.currentTimeMillis();
239             }
240             if (now >= nextcheck) {
241                 nextcheck = now + 5000;
242                 qpos = 0;
243                 freeDiskCheck();
244             }
245         }
246     }
247
248     private boolean determineFreeDiskSpace(File spoolfile, long tspace, long stop, long cur, DelItem[] items) {
249         for (DelItem item : items) {
250             long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
251             logger.debug("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId()
252                                 + " to free up disk");
253             if (amount > 0) {
254                 cur += amount;
255                 if (cur >= stop) {
256                     cur = spoolfile.getUsableSpace();
257                 }
258                 if (cur >= stop) {
259                     logger.warn(
260                             "NODE0503 Free disk space at or above yellow threshold.  current=" + cur + YELLOW + stop
261                                     + TOTAL + tspace);
262                     return true;
263                 }
264             }
265         }
266         return false;
267     }
268
269     static class DelItem implements Comparable<DelItem> {
270
271         private String pubid;
272         private String spool;
273
274         public DelItem(String pubid, String spool) {
275             this.pubid = pubid;
276             this.spool = spool;
277         }
278
279         public int compareTo(DelItem other) {
280             int diff = pubid.compareTo(other.pubid);
281             if (diff == 0) {
282                 diff = spool.compareTo(other.spool);
283             }
284             return (diff);
285         }
286
287         public String getPublishId() {
288             return (pubid);
289         }
290
291         public String getSpool() {
292             return (spool);
293         }
294
295         @Override
296         public boolean equals(Object object) {
297             if (this == object) {
298                 return true;
299             }
300             if (object == null || getClass() != object.getClass()) {
301                 return false;
302             }
303             DelItem delItem = (DelItem) object;
304             return Objects.equals(pubid, delItem.pubid)
305                            && Objects.equals(getSpool(), delItem.getSpool());
306         }
307
308         @Override
309         public int hashCode() {
310             return Objects.hash(pubid, getSpool());
311         }
312     }
313 }