[DMAAP-48] Initial code import
[dmaap/datarouter.git] / datarouter-node / src / main / java / com / att / research / datarouter / node / DeliveryQueue.java
1 /*******************************************************************************\r
2  * ============LICENSE_START==================================================\r
3  * * org.onap.dmaap\r
4  * * ===========================================================================\r
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * * ===========================================================================\r
7  * * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * * you may not use this file except in compliance with the License.\r
9  * * You may obtain a copy of the License at\r
10  * * \r
11  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * * \r
13  *  * Unless required by applicable law or agreed to in writing, software\r
14  * * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * * See the License for the specific language governing permissions and\r
17  * * limitations under the License.\r
18  * * ============LICENSE_END====================================================\r
19  * *\r
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
21  * *\r
22  ******************************************************************************/\r
23 \r
24 \r
25 package com.att.research.datarouter.node;\r
26 \r
27 import java.io.*;\r
28 import java.util.*;\r
29 \r
30 /**\r
31  *      Mechanism for monitoring and controlling delivery of files to a destination.\r
32  *      <p>\r
33  *      The DeliveryQueue class maintains lists of DeliveryTasks for a single\r
34  *      destination (a subscription or another data router node) and assigns\r
35  *      delivery threads to try to deliver them.  It also maintains a delivery\r
36  *      status that causes it to back off on delivery attempts after a failure.\r
37  *      <p>\r
38  *      If the most recent delivery result was a failure, then no more attempts\r
39  *      will be made for a period of time.  Initially, and on the first failure\r
40  *      following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).\r
41  *      If, after this delay, additional failures occur, each failure will\r
42  *      multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a\r
43  *      maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().\r
44  *      Note that this behavior applies to the delivery queue as a whole and not\r
45  *      to individual files in the queue.  If multiple files are being\r
46  *      delivered and one fails, the delay will be started.  If a second\r
47  *      delivery fails while the delay was active, it will not change the delay\r
48  *      or change the duration of any subsequent delay.\r
49  *      If, however, it succeeds, it will cancel the delay.\r
50  *      <p>\r
51  *      The queue maintains 3 collections of files to deliver: A todo list of\r
52  *      files that will be attempted, a working set of files that are being\r
53  *      attempted, and a retry set of files that were attempted and failed.\r
54  *      Whenever the todo list is empty and needs to be refilled, a scan of the\r
55  *      spool directory is made and the file names sorted.  Any files in the working set are ignored.\r
56  *      If a DeliveryTask for the file is in the retry set, then that delivery\r
57  *      task is placed on the todo list.  Otherwise, a new DeliveryTask for the\r
58  *      file is created and placed on the todo list.\r
59  *      If, when a DeliveryTask is about to be removed from the todo list, its\r
60  *      age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead\r
61  *      marked as expired.\r
62  *      <p>\r
63  *      A delivery queue also maintains a skip flag.  This flag is true if the\r
64  *      failure timer is active or if no files are found in a directory scan.\r
65  */\r
66 public class DeliveryQueue implements Runnable, DeliveryTaskHelper      {\r
67         private DeliveryQueueHelper     dqh;\r
68         private DestInfo        di;\r
69         private Hashtable<String, DeliveryTask> working = new Hashtable<String, DeliveryTask>();\r
70         private Hashtable<String, DeliveryTask> retry = new Hashtable<String, DeliveryTask>();\r
71         private int     todoindex;\r
72         private boolean failed;\r
73         private long    failduration;\r
74         private long    resumetime;\r
75         File    dir;\r
76         private Vector<DeliveryTask> todo = new Vector<DeliveryTask>();\r
77         /**\r
78          *      Try to cancel a delivery task.\r
79          *      @return The length of the task in bytes or 0 if the task cannot be cancelled.\r
80          */\r
81         public synchronized long cancelTask(String pubid) {\r
82                 if (working.get(pubid) != null) {\r
83                         return(0);\r
84                 }\r
85                 DeliveryTask dt = retry.get(pubid);\r
86                 if (dt == null) {\r
87                         for (int i = todoindex; i < todo.size(); i++) {\r
88                                 DeliveryTask xdt = todo.get(i);\r
89                                 if (xdt.getPublishId().equals(pubid)) {\r
90                                         dt = xdt;\r
91                                         break;\r
92                                 }\r
93                         }\r
94                 }\r
95                 if (dt == null) {\r
96                         dt = new DeliveryTask(this, pubid);\r
97                         if (dt.getFileId() == null) {\r
98                                 return(0);\r
99                         }\r
100                 }\r
101                 if (dt.isCleaned()) {\r
102                         return(0);\r
103                 }\r
104                 StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());\r
105                 dt.clean();\r
106                 return(dt.getLength());\r
107         }\r
108         /**\r
109          *      Mark that a delivery task has succeeded.\r
110          */\r
111         public synchronized void markSuccess(DeliveryTask task) {\r
112                 working.remove(task.getPublishId());\r
113                 task.clean();\r
114                 failed = false;\r
115                 failduration = 0;\r
116         }\r
117         /**\r
118          *      Mark that a delivery task has expired.\r
119          */\r
120         public synchronized void markExpired(DeliveryTask task) {\r
121                 task.clean();\r
122         }\r
123         /**\r
124          *      Mark that a delivery task has failed permanently.\r
125          */\r
126         public synchronized void markFailNoRetry(DeliveryTask task) {\r
127                 working.remove(task.getPublishId());\r
128                 task.clean();\r
129                 failed = false;\r
130                 failduration = 0;\r
131         }\r
132         private void fdupdate() {\r
133                 if (!failed) {\r
134                         failed = true;\r
135                         if (failduration == 0) {\r
136                                 failduration = dqh.getInitFailureTimer();\r
137                         }\r
138                         resumetime = System.currentTimeMillis() + failduration;\r
139                         long maxdur = dqh.getMaxFailureTimer();\r
140                         failduration = (long)(failduration * dqh.getFailureBackoff());\r
141                         if (failduration > maxdur) {\r
142                                 failduration = maxdur;\r
143                         }\r
144                 }\r
145         }\r
146         /**\r
147          *      Mark that a delivery task has been redirected.\r
148          */\r
149         public synchronized void markRedirect(DeliveryTask task) {\r
150                 working.remove(task.getPublishId());\r
151                 retry.put(task.getPublishId(), task);\r
152         }\r
153         /**\r
154          *      Mark that a delivery task has temporarily failed.\r
155          */\r
156         public synchronized void markFailWithRetry(DeliveryTask task) {\r
157                 working.remove(task.getPublishId());\r
158                 retry.put(task.getPublishId(), task);\r
159                 fdupdate();\r
160         }\r
161         /**\r
162          *      Get the next task.\r
163          */\r
164         public synchronized DeliveryTask getNext() {\r
165                 DeliveryTask ret = peekNext();\r
166                 if (ret != null) {\r
167                         todoindex++;\r
168                         working.put(ret.getPublishId(), ret);\r
169                 }\r
170                 return(ret);\r
171         }\r
172         /**\r
173          *      Peek at the next task.\r
174          */\r
175         public synchronized DeliveryTask peekNext() {\r
176                 long now = System.currentTimeMillis();\r
177                 long mindate = now - dqh.getExpirationTimer();\r
178                 if (failed) {\r
179                         if (now > resumetime) {\r
180                                 failed = false;\r
181                         } else {\r
182                                 return(null);\r
183                         }\r
184                 }\r
185                 while (true) {\r
186                         if (todoindex >= todo.size()) {\r
187                                 todoindex = 0;\r
188                                 todo = new Vector<DeliveryTask>();\r
189                                 String[] files = dir.list();\r
190                                 Arrays.sort(files);\r
191                                 for (String fname: files) {\r
192                                         if (!fname.endsWith(".M")) {\r
193                                                 continue;\r
194                                         }\r
195                                         String fname2 = fname.substring(0, fname.length() - 2);\r
196                                         long pidtime = 0;\r
197                                         int dot = fname2.indexOf('.');\r
198                                         if (dot < 1) {\r
199                                                 continue;\r
200                                         }\r
201                                         try {\r
202                                                 pidtime = Long.parseLong(fname2.substring(0, dot));\r
203                                         } catch (Exception e) {\r
204                                         }\r
205                                         if (pidtime < 1000000000000L) {\r
206                                                 continue;\r
207                                         }\r
208                                         if (working.get(fname2) != null) {\r
209                                                 continue;\r
210                                         }\r
211                                         DeliveryTask dt = retry.get(fname2);\r
212                                         if (dt == null) {\r
213                                                 dt = new DeliveryTask(this, fname2);\r
214                                         }\r
215                                         todo.add(dt);\r
216                                 }\r
217                                 retry = new Hashtable<String, DeliveryTask>();\r
218                         }\r
219                         if (todoindex < todo.size()) {\r
220                                 DeliveryTask dt = todo.get(todoindex);\r
221                                 if (dt.isCleaned()) {\r
222                                         todoindex++;\r
223                                         continue;\r
224                                 }\r
225                                 if (dt.getDate() >= mindate) {\r
226                                         return(dt);\r
227                                 }\r
228                                 todoindex++;\r
229                                 reportExpiry(dt);\r
230                                 continue;\r
231                         }\r
232                         return(null);\r
233                 }\r
234         }\r
235         /**\r
236          *      Create a delivery queue for a given destination info\r
237          */\r
238         public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) {\r
239                 this.dqh = dqh;\r
240                 this.di = di;\r
241                 dir = new File(di.getSpool());\r
242                 dir.mkdirs();\r
243         }\r
244         /**\r
245          *      Update the destination info for this delivery queue\r
246          */\r
247         public void config(DestInfo di) {\r
248                 this.di = di;\r
249         }\r
250         /**\r
251          *      Get the dest info\r
252          */\r
253         public DestInfo getDestInfo() {\r
254                 return(di);\r
255         }\r
256         /**\r
257          *      Get the config manager\r
258          */\r
259         public DeliveryQueueHelper getConfig() {\r
260                 return(dqh);\r
261         }\r
262         /**\r
263          *      Exceptional condition occurred during delivery\r
264          */\r
265         public void reportDeliveryExtra(DeliveryTask task, long sent) {\r
266                 StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);\r
267         }\r
268         /**\r
269          *      Message too old to deliver\r
270          */\r
271         public void reportExpiry(DeliveryTask task) {\r
272                 StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());\r
273                 markExpired(task);\r
274         }\r
275         /**\r
276          *      Completed a delivery attempt\r
277          */\r
278         public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {\r
279                 if (status < 300) {\r
280                         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid);\r
281                         markSuccess(task);\r
282                 } else if (status < 400 && dqh.isFollowRedirects()) {\r
283                         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);\r
284                         if (dqh.handleRedirection(di, location, task.getFileId())) {\r
285                                 markRedirect(task);\r
286                         } else {\r
287                                 StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());\r
288                                 markFailNoRetry(task);\r
289                         }\r
290                 } else if (status < 500) {\r
291                         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);\r
292                         StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());\r
293                         markFailNoRetry(task);\r
294                 } else {\r
295                         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);\r
296                         markFailWithRetry(task);\r
297                 }\r
298         }\r
299         /**\r
300          *      Delivery failed by reason of an exception\r
301          */\r
302         public void reportException(DeliveryTask task, Exception exception) {\r
303                 StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString());\r
304                 dqh.handleUnreachable(di);\r
305                 markFailWithRetry(task);\r
306         }\r
307         /**\r
308          *      Get the feed ID for a subscription\r
309          *      @param subid    The subscription ID\r
310          *      @return The feed ID\r
311          */\r
312         public String getFeedId(String subid) {\r
313                 return(dqh.getFeedId(subid));\r
314         }\r
315         /**\r
316          *      Get the URL to deliver a message to given the file ID\r
317          */\r
318         public String getDestURL(String fileid) {\r
319                 return(dqh.getDestURL(di, fileid));\r
320         }\r
321         /**\r
322          *      Deliver files until there's a failure or there are no more\r
323          *      files to deliver\r
324          */\r
325         public void run() {\r
326                 DeliveryTask t;\r
327                 long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit();\r
328                 int filestogo = dqh.getFairFileLimit();\r
329                 while ((t = getNext()) != null) {\r
330                         t.run();\r
331                         if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {\r
332                                 break;\r
333                         }\r
334                 }\r
335         }\r
336         /**\r
337          *      Is there no work to do for this queue right now?\r
338          */\r
339         public synchronized boolean isSkipSet() {\r
340                 return(peekNext() == null);\r
341         }\r
342         /**\r
343          *      Reset the retry timer\r
344          */\r
345         public void resetQueue() {\r
346                 resumetime = System.currentTimeMillis();\r
347         }\r
348 }\r