More bug fix and refactoring
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryQueue.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.File;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.HashMap;
33 import java.util.List;
34 import org.jetbrains.annotations.Nullable;
35
36 /**
37  * Mechanism for monitoring and controlling delivery of files to a destination.
38  *
39  * <p>The DeliveryQueue class maintains lists of DeliveryTasks for a single
40  * destination (a subscription or another data router node) and assigns
41  * delivery threads to try to deliver them.  It also maintains a delivery
42  * status that causes it to back off on delivery attempts after a failure.
43  *
44  * <p>If the most recent delivery result was a failure, then no more attempts
45  * will be made for a period of time.  Initially, and on the first failure
46  * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).
47  * If, after this delay, additional failures occur, each failure will
48  * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a
49  * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
50  * Note that this behavior applies to the delivery queue as a whole and not
51  * to individual files in the queue.  If multiple files are being
52  * delivered and one fails, the delay will be started.  If a second
53  * delivery fails while the delay was active, it will not change the delay
54  * or change the duration of any subsequent delay.
55  * If, however, it succeeds, it will cancel the delay.
56  * The queue maintains 3 collections of files to deliver: A todoList of
57  * files that will be attempted, a working set of files that are being
58  * attempted, and a retry set of files that were attempted and failed.
59  * Whenever the todoList is empty and needs to be refilled, a scan of the
60  * spool directory is made and the file names sorted.  Any files in the working set are ignored.
61  * If a DeliveryTask for the file is in the retry set, then that delivery
62  * task is placed on the todoList.  Otherwise, a new DeliveryTask for the
63  * file is created and placed on the todoList.
64  * If, when a DeliveryTask is about to be removed from the todoList, its
65  * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead
66  * marked as expired.
67  *
68  * <p>A delivery queue also maintains a skip flag.  This flag is true if the
69  * failure timer is active or if no files are found in a directory scan.
70  */
71 public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
72     private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
73     private DeliveryQueueHelper deliveryQueueHelper;
74
75     private DestInfo destinationInfo;
76     private HashMap<String, DeliveryTask> working = new HashMap<>();
77     private HashMap<String, DeliveryTask> retry = new HashMap<>();
78     private int todoindex;
79     private boolean failed;
80     private long failduration;
81     private long resumetime;
82     private File dir;
83     private List<DeliveryTask> todoList = new ArrayList<>();
84
85     /**
86      * Create a delivery queue for a given destination info.
87      */
88     DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
89         this.deliveryQueueHelper = deliveryQueueHelper;
90         this.destinationInfo = destinationInfo;
91         dir = new File(destinationInfo.getSpool());
92         dir.mkdirs();
93     }
94
95     /**
96      * Try to cancel a delivery task.
97      *
98      * @return The length of the task in bytes or 0 if the task cannot be cancelled.
99      */
100     synchronized long cancelTask(String pubid) {
101         if (working.get(pubid) != null) {
102             return (0);
103         }
104         DeliveryTask dt = retry.get(pubid);
105         if (dt == null) {
106             for (int i = todoindex; i < todoList.size(); i++) {
107                 DeliveryTask xdt = todoList.get(i);
108                 if (xdt.getPublishId().equals(pubid)) {
109                     dt = xdt;
110                     break;
111                 }
112             }
113         }
114         if (dt == null) {
115             dt = new DeliveryTask(this, pubid);
116             if (dt.getFileId() == null) {
117                 return (0);
118             }
119         }
120         if (dt.isCleaned()) {
121             return (0);
122         }
123         StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(),
124                 dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());
125         dt.clean();
126         return (dt.getLength());
127     }
128
129     /**
130      * Mark that a delivery task has succeeded.
131      */
132     private synchronized void markSuccess(DeliveryTask task) {
133         working.remove(task.getPublishId());
134         logger.info(task.getPublishId() + " marked as success.");
135         task.clean();
136         failed = false;
137         failduration = 0;
138     }
139
140     /**
141      * Mark that a delivery task has expired.
142      */
143     private synchronized void markExpired(DeliveryTask task) {
144         logger.info(task.getPublishId() + " marked as expired.");
145         task.clean();
146     }
147
148     /**
149      * Mark that a delivery task has failed permanently.
150      */
151     private synchronized void markFailNoRetry(DeliveryTask task) {
152         working.remove(task.getPublishId());
153         logger.info(task.getPublishId() + " marked as failed permanently");
154         task.clean();
155         failed = false;
156         failduration = 0;
157     }
158
159     private void fdupdate() {
160         if (!failed) {
161             failed = true;
162             if (failduration == 0) {
163                 if (destinationInfo.isPrivilegedSubscriber()) {
164                     failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
165                 } else {
166                     failduration = deliveryQueueHelper.getInitFailureTimer();
167                 }
168             }
169             resumetime = System.currentTimeMillis() + failduration;
170             long maxdur = deliveryQueueHelper.getMaxFailureTimer();
171             failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
172             if (failduration > maxdur) {
173                 failduration = maxdur;
174             }
175         }
176     }
177
178     /**
179      * Mark that a delivery task has been redirected.
180      */
181     private synchronized void markRedirect(DeliveryTask task) {
182         working.remove(task.getPublishId());
183         logger.info(task.getPublishId() + " marked as redirected.");
184         retry.put(task.getPublishId(), task);
185     }
186
187     /**
188      * Mark that a delivery task has temporarily failed.
189      */
190     private synchronized void markFailWithRetry(DeliveryTask task) {
191         working.remove(task.getPublishId());
192         logger.info(task.getPublishId() + " marked as temporarily failed.");
193         retry.put(task.getPublishId(), task);
194         fdupdate();
195     }
196
197     /**
198      * Get the next task.
199      */
200     synchronized DeliveryTask getNext() {
201         DeliveryTask ret = peekNext();
202         if (ret != null) {
203             todoindex++;
204             working.put(ret.getPublishId(), ret);
205         }
206         return (ret);
207     }
208
209     /**
210      * Peek at the next task.
211      */
212     synchronized DeliveryTask peekNext() {
213         long now = System.currentTimeMillis();
214         long mindate = now - deliveryQueueHelper.getExpirationTimer();
215         if (failed) {
216             if (now > resumetime) {
217                 failed = false;
218             } else {
219                 return (null);
220             }
221         }
222         while (true) {
223             if (todoindex >= todoList.size()) {
224                 todoindex = 0;
225                 todoList = new ArrayList<>();
226                 String[] files = dir.list();
227                 if (files != null) {
228                     Arrays.sort(files);
229                     scanForNextTask(files);
230                 }
231                 retry = new HashMap<>();
232             }
233             return getDeliveryTask(mindate);
234         }
235     }
236
237     /**
238      * Update the destination info for this delivery queue.
239      */
240     public void config(DestInfo destinationInfo) {
241         this.destinationInfo = destinationInfo;
242     }
243
244     /**
245      * Get the dest info.
246      */
247     public DestInfo getDestinationInfo() {
248         return (destinationInfo);
249     }
250
251     /**
252      * Get the config manager.
253      */
254     public DeliveryQueueHelper getConfig() {
255         return (deliveryQueueHelper);
256     }
257
258     /**
259      * Exceptional condition occurred during delivery.
260      */
261     public void reportDeliveryExtra(DeliveryTask task, long sent) {
262         StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);
263     }
264
265     /**
266      * Message too old to deliver.
267      */
268     void reportExpiry(DeliveryTask task) {
269         StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
270                 task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
271         markExpired(task);
272     }
273
274     /**
275      * Completed a delivery attempt.
276      */
277     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
278         if (status < 300) {
279             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
280                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
281             if (destinationInfo.isPrivilegedSubscriber()) {
282                 task.setResumeTime(System.currentTimeMillis()
283                                            + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
284                 markFailWithRetry(task);
285             } else {
286                 markSuccess(task);
287             }
288         } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
289             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
290                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
291             if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
292                 markRedirect(task);
293             } else {
294                 StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(),
295                         task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
296                 markFailNoRetry(task);
297             }
298         } else if (status < 500 && status != 429) {
299             // Status 429 is the standard response for Too Many Requests and indicates
300             // that a file needs to be delivered again at a later time.
301             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
302                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
303             StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
304                     task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
305             markFailNoRetry(task);
306         } else {
307             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
308                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
309             markFailWithRetry(task);
310         }
311     }
312
313     /**
314      * Delivery failed by reason of an exception.
315      */
316     public void reportException(DeliveryTask task, Exception exception) {
317         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
318                 task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
319         deliveryQueueHelper.handleUnreachable(destinationInfo);
320         markFailWithRetry(task);
321     }
322
323     /**
324      * Get the feed ID for a subscription.
325      *
326      * @param subid The subscription ID
327      * @return The feed ID
328      */
329     public String getFeedId(String subid) {
330         return (deliveryQueueHelper.getFeedId(subid));
331     }
332
333     /**
334      * Get the URL to deliver a message to given the file ID.
335      */
336     public String getDestURL(String fileid) {
337         return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
338     }
339
340     /**
341      * Deliver files until there's a failure or there are no more.
342      * files to deliver
343      */
344     public void run() {
345         DeliveryTask task;
346         long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
347         int filestogo = deliveryQueueHelper.getFairFileLimit();
348         while ((task = getNext()) != null) {
349             logger.info("Processing file: " + task.getPublishId());
350             task.run();
351             if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
352                 break;
353             }
354         }
355     }
356
357     /**
358      * Is there no work to do for this queue right now?.
359      */
360     synchronized boolean isSkipSet() {
361         return (peekNext() == null);
362     }
363
364     /**
365      * Reset the retry timer.
366      */
367     void resetQueue() {
368         resumetime = System.currentTimeMillis();
369     }
370
371     /**
372      * Get task if in queue and mark as success.
373      */
374     boolean markTaskSuccess(String pubId) {
375         DeliveryTask task = working.get(pubId);
376         if (task != null) {
377             markSuccess(task);
378             return true;
379         }
380         task = retry.get(pubId);
381         if (task != null) {
382             retry.remove(pubId);
383             task.clean();
384             resetQueue();
385             failduration = 0;
386             return true;
387         }
388         return false;
389     }
390
391     private void scanForNextTask(String[] files) {
392         for (String fname : files) {
393             String pubId = getPubId(fname);
394             if (pubId == null) {
395                 continue;
396             }
397             DeliveryTask dt = retry.get(pubId);
398             if (dt == null) {
399                 dt = new DeliveryTask(this, pubId);
400             }
401             todoList.add(dt);
402         }
403     }
404
405     @Nullable
406     private DeliveryTask getDeliveryTask(long mindate) {
407         if (todoindex < todoList.size()) {
408             DeliveryTask dt = todoList.get(todoindex);
409             if (dt.isCleaned()) {
410                 todoindex++;
411             }
412             if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
413                 retry.put(dt.getPublishId(), dt);
414                 todoindex++;
415             }
416             if (dt.getDate() >= mindate) {
417                 return (dt);
418             }
419             todoindex++;
420             reportExpiry(dt);
421         }
422         return null;
423     }
424
425     @Nullable
426     private String getPubId(String fname) {
427         if (!fname.endsWith(".M")) {
428             return null;
429         }
430         String fname2 = fname.substring(0, fname.length() - 2);
431         long pidtime = 0;
432         int dot = fname2.indexOf('.');
433         if (dot < 1) {
434             return null;
435         }
436         try {
437             pidtime = Long.parseLong(fname2.substring(0, dot));
438         } catch (Exception e) {
439             logger.error("Exception", e);
440         }
441         if (pidtime < 1000000000000L) {
442             return null;
443         }
444         if (working.get(fname2) != null) {
445             return null;
446         }
447         return fname2;
448     }
449 }