Add optional API for PM Mapper
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / Delivery.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24 package org.onap.dmaap.datarouter.node;
25
26 import java.util.*;
27 import java.io.*;
28
29 import org.apache.log4j.Logger;
30
31 /**
32  * Main control point for delivering files to destinations.
33  * <p>
34  * The Delivery class manages assignment of delivery threads to delivery
35  * queues and creation and destruction of delivery queues as
36  * configuration changes.  DeliveryQueues are assigned threads based on a
37  * modified round-robin approach giving priority to queues with more work
38  * as measured by both bytes to deliver and files to deliver and lower
39  * priority to queues that already have delivery threads working.
40  * A delivery thread continues to work for a delivery queue as long as
41  * that queue has more files to deliver.
42  */
43 public class Delivery {
44     private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.Delivery");
45
46     private static class DelItem implements Comparable<DelItem> {
47         private String pubid;
48         private String spool;
49
50         public int compareTo(DelItem x) {
51             int i = pubid.compareTo(x.pubid);
52             if (i == 0) {
53                 i = spool.compareTo(x.spool);
54             }
55             return (i);
56         }
57
58         public String getPublishId() {
59             return (pubid);
60         }
61
62         public String getSpool() {
63             return (spool);
64         }
65
66         public DelItem(String pubid, String spool) {
67             this.pubid = pubid;
68             this.spool = spool;
69         }
70     }
71
72     private double fdstart;
73     private double fdstop;
74     private int threads;
75     private int curthreads;
76     private NodeConfigManager config;
77     private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>();
78     private DeliveryQueue[] queues = new DeliveryQueue[0];
79     private int qpos = 0;
80     private long nextcheck;
81     private Runnable cmon = new Runnable() {
82         public void run() {
83             checkconfig();
84         }
85     };
86
87     /**
88      * Constructs a new Delivery system using the specified configuration manager.
89      *
90      * @param config The configuration manager for this delivery system.
91      */
92     public Delivery(NodeConfigManager config) {
93         this.config = config;
94         config.registerConfigTask(cmon);
95         checkconfig();
96     }
97
98     private void cleardir(String dir) {
99         if (dqs.get(dir) != null) {
100             return;
101         }
102         File fdir = new File(dir);
103         for (File junk : fdir.listFiles()) {
104             if (junk.isFile()) {
105                 junk.delete();
106             }
107         }
108         fdir.delete();
109     }
110
111     private void freeDiskCheck() {
112         File spoolfile = new File(config.getSpoolBase());
113         long tspace = spoolfile.getTotalSpace();
114         long start = (long) (tspace * fdstart);
115         long stop = (long) (tspace * fdstop);
116         long cur = spoolfile.getUsableSpace();
117         if (cur >= start) {
118             return;
119         }
120         Vector<DelItem> cv = new Vector<DelItem>();
121         for (String sdir : dqs.keySet()) {
122             for (String meta : (new File(sdir)).list()) {
123                 if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
124                     continue;
125                 }
126                 cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));
127             }
128         }
129         DelItem[] items = cv.toArray(new DelItem[cv.size()]);
130         Arrays.sort(items);
131         logger.info("NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + " total=" + tspace);
132         for (DelItem item : items) {
133             long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
134             logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");
135             if (amount > 0) {
136                 cur += amount;
137                 if (cur >= stop) {
138                     cur = spoolfile.getUsableSpace();
139                 }
140                 if (cur >= stop) {
141                     logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);
142                     return;
143                 }
144             }
145         }
146         cur = spoolfile.getUsableSpace();
147         if (cur >= stop) {
148             logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);
149             return;
150         }
151         logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + " yellow=" + stop + " total=" + tspace);
152     }
153
154     private void cleardirs() {
155         String basedir = config.getSpoolBase();
156         String nbase = basedir + "/n";
157         for (String nodedir : (new File(nbase)).list()) {
158             if (!nodedir.startsWith(".")) {
159                 cleardir(nbase + "/" + nodedir);
160             }
161         }
162         String sxbase = basedir + "/s";
163         for (String sxdir : (new File(sxbase)).list()) {
164             if (sxdir.startsWith(".")) {
165                 continue;
166             }
167             File sxf = new File(sxbase + "/" + sxdir);
168             for (String sdir : sxf.list()) {
169                 if (!sdir.startsWith(".")) {
170                     cleardir(sxbase + "/" + sxdir + "/" + sdir);
171                 }
172             }
173             sxf.delete();  // won't if anything still in it
174         }
175     }
176
177     private synchronized void checkconfig() {
178         if (!config.isConfigured()) {
179             return;
180         }
181         fdstart = config.getFreeDiskStart();
182         fdstop = config.getFreeDiskStop();
183         threads = config.getDeliveryThreads();
184         if (threads < 1) {
185             threads = 1;
186         }
187         DestInfo[] alldis = config.getAllDests();
188         DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
189         qpos = 0;
190         Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();
191         for (DestInfo di : alldis) {
192             String spl = di.getSpool();
193             DeliveryQueue dq = dqs.get(spl);
194             if (dq == null) {
195                 dq = new DeliveryQueue(config, di);
196             } else {
197                 dq.config(di);
198             }
199             ndqs.put(spl, dq);
200             nqs[qpos++] = dq;
201         }
202         queues = nqs;
203         dqs = ndqs;
204         cleardirs();
205         while (curthreads < threads) {
206             curthreads++;
207             (new Thread() {
208                 {
209                     setName("Delivery Thread");
210                 }
211
212                 public void run() {
213                     dodelivery();
214                 }
215             }).start();
216         }
217         nextcheck = 0;
218         notify();
219     }
220
221     private void dodelivery() {
222         DeliveryQueue dq;
223         while ((dq = getNextQueue()) != null) {
224             dq.run();
225         }
226     }
227
228     private synchronized DeliveryQueue getNextQueue() {
229         while (true) {
230             if (curthreads > threads) {
231                 curthreads--;
232                 return (null);
233             }
234             if (qpos < queues.length) {
235                 DeliveryQueue dq = queues[qpos++];
236                 if (dq.isSkipSet()) {
237                     continue;
238                 }
239                 nextcheck = 0;
240                 notify();
241                 return (dq);
242             }
243             long now = System.currentTimeMillis();
244             if (now < nextcheck) {
245                 try {
246                     wait(nextcheck + 500 - now);
247                 } catch (Exception e) {
248                 }
249                 now = System.currentTimeMillis();
250             }
251             if (now >= nextcheck) {
252                 nextcheck = now + 5000;
253                 qpos = 0;
254                 freeDiskCheck();
255             }
256         }
257     }
258
259     /**
260      * Reset the retry timer for a delivery queue
261      */
262     public synchronized void resetQueue(String spool) {
263         if (spool != null) {
264             DeliveryQueue dq = dqs.get(spool);
265             if (dq != null) {
266                 dq.resetQueue();
267             }
268         }
269     }
270
271     /**
272      * Mark the task in spool a success
273      */
274     public synchronized boolean markTaskSuccess(String spool, String pubId) {
275         boolean succeeded = false;
276         if (spool != null) {
277             DeliveryQueue dq = dqs.get(spool);
278             if (dq != null) {
279                 succeeded = dq.markTaskSuccess(pubId);
280             }
281         }
282         return succeeded;
283     }
284 }