b7699e53b4393e9dcb24dd59953566b6f928e10f
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryQueue.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.File;
30 import java.util.Arrays;
31 import java.util.Hashtable;
32 import java.util.Vector;
33 import org.jetbrains.annotations.Nullable;
34
35 /**
36  * Mechanism for monitoring and controlling delivery of files to a destination.
37  *
38  * <p>The DeliveryQueue class maintains lists of DeliveryTasks for a single
39  * destination (a subscription or another data router node) and assigns
40  * delivery threads to try to deliver them.  It also maintains a delivery
41  * status that causes it to back off on delivery attempts after a failure.
42  *
43  * <p>If the most recent delivery result was a failure, then no more attempts
44  * will be made for a period of time.  Initially, and on the first failure
45  * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).
46  * If, after this delay, additional failures occur, each failure will
47  * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a
48  * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
49  * Note that this behavior applies to the delivery queue as a whole and not
50  * to individual files in the queue.  If multiple files are being
51  * delivered and one fails, the delay will be started.  If a second
52  * delivery fails while the delay was active, it will not change the delay
53  * or change the duration of any subsequent delay.
54  * If, however, it succeeds, it will cancel the delay.
55  *
56  * <p>The queue maintains 3 collections of files to deliver: A todo list of
57  * files that will be attempted, a working set of files that are being
58  * attempted, and a retry set of files that were attempted and failed.
59  * Whenever the todo list is empty and needs to be refilled, a scan of the
60  * spool directory is made and the file names sorted.  Any files in the working set are ignored.
61  * If a DeliveryTask for the file is in the retry set, then that delivery
62  * task is placed on the todo list.  Otherwise, a new DeliveryTask for the
63  * file is created and placed on the todo list.
64  * If, when a DeliveryTask is about to be removed from the todo list, its
65  * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead
66  * marked as expired.
67  *
68  * <p>A delivery queue also maintains a skip flag.  This flag is true if the
69  * failure timer is active or if no files are found in a directory scan.
70  */
71 public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
72     private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
73     private DeliveryQueueHelper deliveryQueueHelper;
74
75     private DestInfo destinationInfo;
76     private Hashtable<String, DeliveryTask> working = new Hashtable<>();
77     private Hashtable<String, DeliveryTask> retry = new Hashtable<>();
78     private int todoindex;
79     private boolean failed;
80     private long failduration;
81     private long resumetime;
82     private File dir;
83     private Vector<DeliveryTask> todo = new Vector<>();
84
85     /**
86      * Try to cancel a delivery task.
87      *
88      * @return The length of the task in bytes or 0 if the task cannot be cancelled.
89      */
90     synchronized long cancelTask(String pubid) {
91         if (working.get(pubid) != null) {
92             return (0);
93         }
94         DeliveryTask dt = retry.get(pubid);
95         if (dt == null) {
96             for (int i = todoindex; i < todo.size(); i++) {
97                 DeliveryTask xdt = todo.get(i);
98                 if (xdt.getPublishId().equals(pubid)) {
99                     dt = xdt;
100                     break;
101                 }
102             }
103         }
104         if (dt == null) {
105             dt = new DeliveryTask(this, pubid);
106             if (dt.getFileId() == null) {
107                 return (0);
108             }
109         }
110         if (dt.isCleaned()) {
111             return (0);
112         }
113         StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(),
114                 dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());
115         dt.clean();
116         return (dt.getLength());
117     }
118
119     /**
120      * Mark that a delivery task has succeeded.
121      */
122     private synchronized void markSuccess(DeliveryTask task) {
123         working.remove(task.getPublishId());
124         logger.info(task.getPublishId() + " marked as success.");
125         task.clean();
126         failed = false;
127         failduration = 0;
128     }
129
130     /**
131      * Mark that a delivery task has expired.
132      */
133     private synchronized void markExpired(DeliveryTask task) {
134         logger.info(task.getPublishId() + " marked as expired.");
135         task.clean();
136     }
137
138     /**
139      * Mark that a delivery task has failed permanently.
140      */
141     private synchronized void markFailNoRetry(DeliveryTask task) {
142         working.remove(task.getPublishId());
143         logger.info(task.getPublishId() + " marked as failed permanently");
144         task.clean();
145         failed = false;
146         failduration = 0;
147     }
148
149     private void fdupdate() {
150         if (!failed) {
151             failed = true;
152             if (failduration == 0) {
153                 if (destinationInfo.isPrivilegedSubscriber()) {
154                     failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
155                 } else {
156                     failduration = deliveryQueueHelper.getInitFailureTimer();
157                 }
158             }
159             resumetime = System.currentTimeMillis() + failduration;
160             long maxdur = deliveryQueueHelper.getMaxFailureTimer();
161             failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
162             if (failduration > maxdur) {
163                 failduration = maxdur;
164             }
165         }
166     }
167
168     /**
169      * Mark that a delivery task has been redirected.
170      */
171     private synchronized void markRedirect(DeliveryTask task) {
172         working.remove(task.getPublishId());
173         logger.info(task.getPublishId() + " marked as redirected.");
174         retry.put(task.getPublishId(), task);
175     }
176
177     /**
178      * Mark that a delivery task has temporarily failed.
179      */
180     private synchronized void markFailWithRetry(DeliveryTask task) {
181         working.remove(task.getPublishId());
182         logger.info(task.getPublishId() + " marked as temporarily failed.");
183         retry.put(task.getPublishId(), task);
184         fdupdate();
185     }
186
187     /**
188      * Get the next task.
189      */
190     synchronized DeliveryTask getNext() {
191         DeliveryTask ret = peekNext();
192         if (ret != null) {
193             todoindex++;
194             working.put(ret.getPublishId(), ret);
195         }
196         return (ret);
197     }
198
199     /**
200      * Peek at the next task.
201      */
202     synchronized DeliveryTask peekNext() {
203         long now = System.currentTimeMillis();
204         long mindate = now - deliveryQueueHelper.getExpirationTimer();
205         if (failed) {
206             if (now > resumetime) {
207                 failed = false;
208             } else {
209                 return (null);
210             }
211         }
212         while (true) {
213             if (todoindex >= todo.size()) {
214                 todoindex = 0;
215                 todo = new Vector<>();
216                 String[] files = dir.list();
217                 Arrays.sort(files);
218                 scanForNextTask(files);
219                 retry = new Hashtable<>();
220             }
221             DeliveryTask dt = getDeliveryTask(mindate);
222             if (dt != null) {
223                 return dt;
224             }
225             return null;
226
227         }
228     }
229
230     /**
231      * Create a delivery queue for a given destination info.
232      */
233     DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
234         this.deliveryQueueHelper = deliveryQueueHelper;
235         this.destinationInfo = destinationInfo;
236         dir = new File(destinationInfo.getSpool());
237         dir.mkdirs();
238     }
239
240     /**
241      * Update the destination info for this delivery queue.
242      */
243     public void config(DestInfo destinationInfo) {
244         this.destinationInfo = destinationInfo;
245     }
246
247     /**
248      * Get the dest info.
249      */
250     public DestInfo getDestinationInfo() {
251         return (destinationInfo);
252     }
253
254     /**
255      * Get the config manager.
256      */
257     public DeliveryQueueHelper getConfig() {
258         return (deliveryQueueHelper);
259     }
260
261     /**
262      * Exceptional condition occurred during delivery.
263      */
264     public void reportDeliveryExtra(DeliveryTask task, long sent) {
265         StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);
266     }
267
268     /**
269      * Message too old to deliver.
270      */
271     void reportExpiry(DeliveryTask task) {
272         StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
273                 task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
274         markExpired(task);
275     }
276
277     /**
278      * Completed a delivery attempt.
279      */
280     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
281         if (status < 300) {
282             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
283                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
284             if (destinationInfo.isPrivilegedSubscriber()) {
285                 task.setResumeTime(System.currentTimeMillis()
286                                            + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
287                 markFailWithRetry(task);
288             } else {
289                 markSuccess(task);
290             }
291         } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
292             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
293                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
294             if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
295                 markRedirect(task);
296             } else {
297                 StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(),
298                         task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
299                 markFailNoRetry(task);
300             }
301         } else if (status < 500 && status != 429) {
302             // Status 429 is the standard response for Too Many Requests and indicates
303             // that a file needs to be delivered again at a later time.
304             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
305                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
306             StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
307                     task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
308             markFailNoRetry(task);
309         } else {
310             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
311                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
312             markFailWithRetry(task);
313         }
314     }
315
316     /**
317      * Delivery failed by reason of an exception.
318      */
319     public void reportException(DeliveryTask task, Exception exception) {
320         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
321                 task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
322         deliveryQueueHelper.handleUnreachable(destinationInfo);
323         markFailWithRetry(task);
324     }
325
326     /**
327      * Get the feed ID for a subscription.
328      *
329      * @param subid The subscription ID
330      * @return The feed ID
331      */
332     public String getFeedId(String subid) {
333         return (deliveryQueueHelper.getFeedId(subid));
334     }
335
336     /**
337      * Get the URL to deliver a message to given the file ID.
338      */
339     public String getDestURL(String fileid) {
340         return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
341     }
342
343     /**
344      * Deliver files until there's a failure or there are no more.
345      * files to deliver
346      */
347     public void run() {
348         DeliveryTask task;
349         long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
350         int filestogo = deliveryQueueHelper.getFairFileLimit();
351         while ((task = getNext()) != null) {
352             logger.info("Processing file: " + task.getPublishId());
353             task.run();
354             if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
355                 break;
356             }
357         }
358     }
359
360     /**
361      * Is there no work to do for this queue right now?.
362      */
363     synchronized boolean isSkipSet() {
364         return (peekNext() == null);
365     }
366
367     /**
368      * Reset the retry timer.
369      */
370     void resetQueue() {
371         resumetime = System.currentTimeMillis();
372     }
373
374     /**
375      * Get task if in queue and mark as success.
376      */
377     boolean markTaskSuccess(String pubId) {
378         DeliveryTask task = working.get(pubId);
379         if (task != null) {
380             markSuccess(task);
381             return true;
382         }
383         task = retry.get(pubId);
384         if (task != null) {
385             retry.remove(pubId);
386             task.clean();
387             resetQueue();
388             failduration = 0;
389             return true;
390         }
391         return false;
392     }
393
394     private void scanForNextTask(String[] files) {
395         for (String fname : files) {
396             String pubId = getPubId(fname);
397             if (pubId == null) {
398                 continue;
399             }
400             DeliveryTask dt = retry.get(pubId);
401             if (dt == null) {
402                 dt = new DeliveryTask(this, pubId);
403             }
404             todo.add(dt);
405         }
406     }
407
408     @Nullable
409     private DeliveryTask getDeliveryTask(long mindate) {
410         if (todoindex < todo.size()) {
411             DeliveryTask dt = todo.get(todoindex);
412             if (dt.isCleaned()) {
413                 todoindex++;
414             }
415             if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
416                 retry.put(dt.getPublishId(), dt);
417                 todoindex++;
418             }
419             if (dt.getDate() >= mindate) {
420                 return (dt);
421             }
422             todoindex++;
423             reportExpiry(dt);
424         }
425         return null;
426     }
427
428     @Nullable
429     private String getPubId(String fname) {
430         if (!fname.endsWith(".M")) {
431             return null;
432         }
433         String fname2 = fname.substring(0, fname.length() - 2);
434         long pidtime = 0;
435         int dot = fname2.indexOf('.');
436         if (dot < 1) {
437             return null;
438         }
439         try {
440             pidtime = Long.parseLong(fname2.substring(0, dot));
441         } catch (Exception e) {
442             logger.error("Exception", e);
443         }
444         if (pidtime < 1000000000000L) {
445             return null;
446         }
447         if (working.get(fname2) != null) {
448             return null;
449         }
450         return fname2;
451     }
452 }