Update project structure to org.onap
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / Delivery.java
1 /*******************************************************************************\r
2  * ============LICENSE_START==================================================\r
3  * * org.onap.dmaap\r
4  * * ===========================================================================\r
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * * ===========================================================================\r
7  * * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * * you may not use this file except in compliance with the License.\r
9  * * You may obtain a copy of the License at\r
10  * * \r
11  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * * \r
13  *  * Unless required by applicable law or agreed to in writing, software\r
14  * * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * * See the License for the specific language governing permissions and\r
17  * * limitations under the License.\r
18  * * ============LICENSE_END====================================================\r
19  * *\r
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
21  * *\r
22  ******************************************************************************/\r
23 \r
24 package org.onap.dmaap.datarouter.node;\r
25 \r
26 import java.util.*;\r
27 import java.io.*;\r
28 import org.apache.log4j.Logger;\r
29 \r
30 /**\r
31  *      Main control point for delivering files to destinations.\r
32  *      <p>\r
33  *      The Delivery class manages assignment of delivery threads to delivery\r
34  *      queues and creation and destruction of delivery queues as\r
35  *      configuration changes.  DeliveryQueues are assigned threads based on a\r
36  *      modified round-robin approach giving priority to queues with more work\r
37  *      as measured by both bytes to deliver and files to deliver and lower\r
38  *      priority to queues that already have delivery threads working.\r
39  *      A delivery thread continues to work for a delivery queue as long as\r
40  *      that queue has more files to deliver.\r
41  */\r
42 public class Delivery {\r
43         private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.Delivery");\r
44         private static class DelItem implements Comparable<DelItem>     {\r
45                 private String pubid;\r
46                 private String spool;\r
47                 public int compareTo(DelItem x) {\r
48                         int i = pubid.compareTo(x.pubid);\r
49                         if (i == 0) {\r
50                                 i = spool.compareTo(x.spool);\r
51                         }\r
52                         return(i);\r
53                 }\r
54                 public String getPublishId() {\r
55                         return(pubid);\r
56                 }\r
57                 public String getSpool() {\r
58                         return(spool);\r
59                 }\r
60                 public DelItem(String pubid, String spool) {\r
61                         this.pubid = pubid;\r
62                         this.spool = spool;\r
63                 }\r
64         }\r
65         private double  fdstart;\r
66         private double  fdstop;\r
67         private int     threads;\r
68         private int     curthreads;\r
69         private NodeConfigManager       config;\r
70         private Hashtable<String, DeliveryQueue>        dqs = new Hashtable<String, DeliveryQueue>();\r
71         private DeliveryQueue[] queues = new DeliveryQueue[0];\r
72         private int     qpos = 0;\r
73         private long    nextcheck;\r
74         private Runnable        cmon = new Runnable() {\r
75                 public void run() {\r
76                         checkconfig();\r
77                 }\r
78         };\r
79         /**\r
80          *      Constructs a new Delivery system using the specified configuration manager.\r
81          *      @param config   The configuration manager for this delivery system.\r
82          */\r
83         public Delivery(NodeConfigManager config) {\r
84                 this.config = config;\r
85                 config.registerConfigTask(cmon);\r
86                 checkconfig();\r
87         }\r
88         private void cleardir(String dir) {\r
89                 if (dqs.get(dir) != null) {\r
90                         return;\r
91                 }\r
92                 File fdir = new File(dir);\r
93                 for (File junk: fdir.listFiles()) {\r
94                         if (junk.isFile()) {\r
95                                 junk.delete();\r
96                         }\r
97                 }\r
98                 fdir.delete();\r
99         }\r
100         private void freeDiskCheck() {\r
101                 File spoolfile = new File(config.getSpoolBase());\r
102                 long tspace = spoolfile.getTotalSpace();\r
103                 long start = (long)(tspace * fdstart);\r
104                 long stop = (long)(tspace * fdstop);\r
105                 long cur = spoolfile.getUsableSpace();\r
106                 if (cur >= start) {\r
107                         return;\r
108                 }\r
109                 Vector<DelItem> cv = new Vector<DelItem>();\r
110                 for (String sdir: dqs.keySet()) {\r
111                         for (String meta: (new File(sdir)).list()) {\r
112                                 if (!meta.endsWith(".M") || meta.charAt(0) == '.') {\r
113                                         continue;\r
114                                 }\r
115                                 cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));\r
116                         }\r
117                 }\r
118                 DelItem[] items = cv.toArray(new DelItem[cv.size()]);\r
119                 Arrays.sort(items);\r
120                 logger.info("NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + " total=" + tspace);\r
121                 for (DelItem item: items) {\r
122                         long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());\r
123                         logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");\r
124                         if (amount > 0) {\r
125                                 cur += amount;\r
126                                 if (cur >= stop) {\r
127                                         cur = spoolfile.getUsableSpace();\r
128                                 }\r
129                                 if (cur >= stop) {\r
130                                         logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
131                                         return;\r
132                                 }\r
133                         }\r
134                 }\r
135                 cur = spoolfile.getUsableSpace();\r
136                 if (cur >= stop) {\r
137                         logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
138                         return;\r
139                 }\r
140                 logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
141         }\r
142         private void cleardirs() {\r
143                 String basedir = config.getSpoolBase();\r
144                 String nbase = basedir + "/n";\r
145                 for (String nodedir: (new File(nbase)).list()) {\r
146                         if (!nodedir.startsWith(".")) {\r
147                                 cleardir(nbase + "/" + nodedir);\r
148                         }\r
149                 }\r
150                 String sxbase = basedir + "/s";\r
151                 for (String sxdir: (new File(sxbase)).list()) {\r
152                         if (sxdir.startsWith(".")) {\r
153                                 continue;\r
154                         }\r
155                         File sxf = new File(sxbase + "/" + sxdir);\r
156                         for (String sdir: sxf.list()) {\r
157                                 if (!sdir.startsWith(".")) {\r
158                                         cleardir(sxbase + "/" + sxdir + "/" + sdir);\r
159                                 }\r
160                         }\r
161                         sxf.delete();  // won't if anything still in it\r
162                 }\r
163         }\r
164         private synchronized void checkconfig() {\r
165                 if (!config.isConfigured()) {\r
166                         return;\r
167                 }\r
168                 fdstart = config.getFreeDiskStart();\r
169                 fdstop = config.getFreeDiskStop();\r
170                 threads = config.getDeliveryThreads();\r
171                 if (threads < 1) {\r
172                         threads = 1;\r
173                 }\r
174                 DestInfo[] alldis = config.getAllDests();\r
175                 DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];\r
176                 qpos = 0;\r
177                 Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();\r
178                 for (DestInfo di: alldis) {\r
179                         String spl = di.getSpool();\r
180                         DeliveryQueue dq = dqs.get(spl);\r
181                         if (dq == null) {\r
182                                 dq = new DeliveryQueue(config, di);\r
183                         } else {\r
184                                 dq.config(di);\r
185                         }\r
186                         ndqs.put(spl, dq);\r
187                         nqs[qpos++] = dq;\r
188                 }\r
189                 queues = nqs;\r
190                 dqs = ndqs;\r
191                 cleardirs();\r
192                 while (curthreads < threads) {\r
193                         curthreads++;\r
194                         (new Thread() {\r
195                                 {\r
196                                         setName("Delivery Thread");\r
197                                 }\r
198                                 public void run() {\r
199                                         dodelivery();\r
200                                 }\r
201                         }).start();\r
202                 }\r
203                 nextcheck = 0;\r
204                 notify();\r
205         }\r
206         private void dodelivery() {\r
207                 DeliveryQueue dq;\r
208                 while ((dq = getNextQueue()) != null) {\r
209                         dq.run();\r
210                 }\r
211         }\r
212         private synchronized DeliveryQueue getNextQueue() {\r
213                 while (true) {\r
214                         if (curthreads > threads) {\r
215                                 curthreads--;\r
216                                 return(null);\r
217                         }\r
218                         if (qpos < queues.length) {\r
219                                 DeliveryQueue dq = queues[qpos++];\r
220                                 if (dq.isSkipSet()) {\r
221                                         continue;\r
222                                 }\r
223                                 nextcheck = 0;\r
224                                 notify();\r
225                                 return(dq);\r
226                         }\r
227                         long now = System.currentTimeMillis();\r
228                         if (now < nextcheck) {\r
229                                 try {\r
230                                         wait(nextcheck + 500 - now);\r
231                                 } catch (Exception e) {\r
232                                 }\r
233                                 now = System.currentTimeMillis();\r
234                         }\r
235                         if (now >= nextcheck) {\r
236                                 nextcheck = now + 5000;\r
237                                 qpos = 0;\r
238                                 freeDiskCheck();\r
239                         }\r
240                 }\r
241         }\r
242         /**\r
243          *      Reset the retry timer for a delivery queue\r
244          */\r
245         public synchronized void resetQueue(String spool) {\r
246                 if (spool != null) {\r
247                         DeliveryQueue dq = dqs.get(spool);\r
248                         if (dq != null) {\r
249                                 dq.resetQueue();\r
250                         }\r
251                 }\r
252         }\r
253 }\r