Remove datarouter-node critical code smells
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryQueue.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.*;
30 import java.util.*;
31
32 /**
33  * Mechanism for monitoring and controlling delivery of files to a destination.
34  * <p>
35  * The DeliveryQueue class maintains lists of DeliveryTasks for a single
36  * destination (a subscription or another data router node) and assigns
37  * delivery threads to try to deliver them.  It also maintains a delivery
38  * status that causes it to back off on delivery attempts after a failure.
39  * <p>
40  * If the most recent delivery result was a failure, then no more attempts
41  * will be made for a period of time.  Initially, and on the first failure
42  * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).
43  * If, after this delay, additional failures occur, each failure will
44  * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a
45  * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
46  * Note that this behavior applies to the delivery queue as a whole and not
47  * to individual files in the queue.  If multiple files are being
48  * delivered and one fails, the delay will be started.  If a second
49  * delivery fails while the delay was active, it will not change the delay
50  * or change the duration of any subsequent delay.
51  * If, however, it succeeds, it will cancel the delay.
52  * <p>
53  * The queue maintains 3 collections of files to deliver: A todo list of
54  * files that will be attempted, a working set of files that are being
55  * attempted, and a retry set of files that were attempted and failed.
56  * Whenever the todo list is empty and needs to be refilled, a scan of the
57  * spool directory is made and the file names sorted.  Any files in the working set are ignored.
58  * If a DeliveryTask for the file is in the retry set, then that delivery
59  * task is placed on the todo list.  Otherwise, a new DeliveryTask for the
60  * file is created and placed on the todo list.
61  * If, when a DeliveryTask is about to be removed from the todo list, its
62  * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead
63  * marked as expired.
64  * <p>
65  * A delivery queue also maintains a skip flag.  This flag is true if the
66  * failure timer is active or if no files are found in a directory scan.
67  */
68 public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
69     private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
70     private DeliveryQueueHelper deliveryQueueHelper;
71     private DestInfo destinationInfo;
72     private Hashtable<String, DeliveryTask> working = new Hashtable<>();
73     private Hashtable<String, DeliveryTask> retry = new Hashtable<>();
74     private int todoindex;
75     private boolean failed;
76     private long failduration;
77     private long resumetime;
78     private File dir;
79     private Vector<DeliveryTask> todo = new Vector<>();
80
81     /**
82      * Try to cancel a delivery task.
83      *
84      * @return The length of the task in bytes or 0 if the task cannot be cancelled.
85      */
86     synchronized long cancelTask(String pubid) {
87         if (working.get(pubid) != null) {
88             return (0);
89         }
90         DeliveryTask dt = retry.get(pubid);
91         if (dt == null) {
92             for (int i = todoindex; i < todo.size(); i++) {
93                 DeliveryTask xdt = todo.get(i);
94                 if (xdt.getPublishId().equals(pubid)) {
95                     dt = xdt;
96                     break;
97                 }
98             }
99         }
100         if (dt == null) {
101             dt = new DeliveryTask(this, pubid);
102             if (dt.getFileId() == null) {
103                 return (0);
104             }
105         }
106         if (dt.isCleaned()) {
107             return (0);
108         }
109         StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());
110         dt.clean();
111         return (dt.getLength());
112     }
113
114     /**
115      * Mark that a delivery task has succeeded.
116      */
117     private synchronized void markSuccess(DeliveryTask task) {
118         working.remove(task.getPublishId());
119         task.clean();
120         failed = false;
121         failduration = 0;
122     }
123
124     /**
125      * Mark that a delivery task has expired.
126      */
127     private synchronized void markExpired(DeliveryTask task) {
128         task.clean();
129     }
130
131     /**
132      * Mark that a delivery task has failed permanently.
133      */
134     private synchronized void markFailNoRetry(DeliveryTask task) {
135         working.remove(task.getPublishId());
136         task.clean();
137         failed = false;
138         failduration = 0;
139     }
140
141     private void fdupdate() {
142         if (!failed) {
143             failed = true;
144             if (failduration == 0) {
145                 if (destinationInfo.isPrivilegedSubscriber()) {
146                     failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
147                 } else{
148                     failduration = deliveryQueueHelper.getInitFailureTimer();
149                 }
150             }
151             resumetime = System.currentTimeMillis() + failduration;
152             long maxdur = deliveryQueueHelper.getMaxFailureTimer();
153             failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
154             if (failduration > maxdur) {
155                 failduration = maxdur;
156             }
157         }
158     }
159
160     /**
161      * Mark that a delivery task has been redirected.
162      */
163     private synchronized void markRedirect(DeliveryTask task) {
164         working.remove(task.getPublishId());
165         retry.put(task.getPublishId(), task);
166     }
167
168     /**
169      * Mark that a delivery task has temporarily failed.
170      */
171     private synchronized void markFailWithRetry(DeliveryTask task) {
172         working.remove(task.getPublishId());
173         retry.put(task.getPublishId(), task);
174         fdupdate();
175     }
176
177     /**
178      * Get the next task.
179      */
180     synchronized DeliveryTask getNext() {
181         DeliveryTask ret = peekNext();
182         if (ret != null) {
183             todoindex++;
184             working.put(ret.getPublishId(), ret);
185         }
186         return (ret);
187     }
188
189     /**
190      * Peek at the next task.
191      */
192     synchronized DeliveryTask peekNext() {
193         long now = System.currentTimeMillis();
194         long mindate = now - deliveryQueueHelper.getExpirationTimer();
195         if (failed) {
196             if (now > resumetime) {
197                 failed = false;
198             } else {
199                 return (null);
200             }
201         }
202         while (true) {
203             if (todoindex >= todo.size()) {
204                 todoindex = 0;
205                 todo = new Vector<>();
206                 String[] files = dir.list();
207                 Arrays.sort(files);
208                 for (String fname : files) {
209                     if (!fname.endsWith(".M")) {
210                         continue;
211                     }
212                     String fname2 = fname.substring(0, fname.length() - 2);
213                     long pidtime = 0;
214                     int dot = fname2.indexOf('.');
215                     if (dot < 1) {
216                         continue;
217                     }
218                     try {
219                         pidtime = Long.parseLong(fname2.substring(0, dot));
220                     } catch (Exception e) {
221                         logger.error("Exception", e);
222                     }
223                     if (pidtime < 1000000000000L) {
224                         continue;
225                     }
226                     if (working.get(fname2) != null) {
227                         continue;
228                     }
229                     DeliveryTask dt = retry.get(fname2);
230                     if (dt == null) {
231                         dt = new DeliveryTask(this, fname2);
232                     }
233                     todo.add(dt);
234                 }
235                 retry = new Hashtable<>();
236             }
237             if (todoindex < todo.size()) {
238                 DeliveryTask dt = todo.get(todoindex);
239                 if (dt.isCleaned()) {
240                     todoindex++;
241                     continue;
242                 }
243                 if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
244                     retry.put(dt.getPublishId(), dt);
245                     todoindex++;
246                     continue;
247                 }
248                 if (dt.getDate() >= mindate) {
249                     return (dt);
250                 }
251                 todoindex++;
252                 reportExpiry(dt);
253                 continue;
254             }
255             return (null);
256         }
257     }
258
259     /**
260      * Create a delivery queue for a given destination info
261      */
262     DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
263         this.deliveryQueueHelper = deliveryQueueHelper;
264         this.destinationInfo = destinationInfo;
265         dir = new File(destinationInfo.getSpool());
266         dir.mkdirs();
267     }
268
269     /**
270      * Update the destination info for this delivery queue
271      */
272     public void config(DestInfo destinationInfo) {
273         this.destinationInfo = destinationInfo;
274     }
275
276     /**
277      * Get the dest info
278      */
279     public DestInfo getDestinationInfo() {
280         return (destinationInfo);
281     }
282
283     /**
284      * Get the config manager
285      */
286     public DeliveryQueueHelper getConfig() {
287         return (deliveryQueueHelper);
288     }
289
290     /**
291      * Exceptional condition occurred during delivery
292      */
293     public void reportDeliveryExtra(DeliveryTask task, long sent) {
294         StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);
295     }
296
297     /**
298      * Message too old to deliver
299      */
300     void reportExpiry(DeliveryTask task) {
301         StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
302         markExpired(task);
303     }
304
305     /**
306      * Completed a delivery attempt
307      */
308     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
309         if (status < 300) {
310             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
311             if (destinationInfo.isPrivilegedSubscriber()) {
312                 task.setResumeTime(System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
313                 markFailWithRetry(task);
314             } else {
315                 markSuccess(task);
316             }
317         } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
318             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
319             if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
320                 markRedirect(task);
321             } else {
322                 StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
323                 markFailNoRetry(task);
324             }
325         } else if (status < 500 && status != 429) {         // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time.
326             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
327             StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
328             markFailNoRetry(task);
329         } else {
330             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
331             markFailWithRetry(task);
332         }
333     }
334
335     /**
336      * Delivery failed by reason of an exception
337      */
338     public void reportException(DeliveryTask task, Exception exception) {
339         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
340         deliveryQueueHelper.handleUnreachable(destinationInfo);
341         markFailWithRetry(task);
342     }
343
344     /**
345      * Get the feed ID for a subscription
346      *
347      * @param subid The subscription ID
348      * @return The feed ID
349      */
350     public String getFeedId(String subid) {
351         return (deliveryQueueHelper.getFeedId(subid));
352     }
353
354     /**
355      * Get the URL to deliver a message to given the file ID
356      */
357     public String getDestURL(String fileid) {
358         return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
359     }
360
361     /**
362      * Deliver files until there's a failure or there are no more
363      * files to deliver
364      */
365     public void run() {
366         DeliveryTask t;
367         long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
368         int filestogo = deliveryQueueHelper.getFairFileLimit();
369         while ((t = getNext()) != null) {
370             t.run();
371             if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
372                 break;
373             }
374         }
375     }
376
377     /**
378      * Is there no work to do for this queue right now?
379      */
380     synchronized boolean isSkipSet() {
381         return (peekNext() == null);
382     }
383
384     /**
385      * Reset the retry timer
386      */
387     void resetQueue() {
388         resumetime = System.currentTimeMillis();
389     }
390
391     /**
392      * Get task if in queue and mark as success
393      */
394     boolean markTaskSuccess(String pubId) {
395         DeliveryTask task = working.get(pubId);
396         if (task != null) {
397             markSuccess(task);
398             return true;
399         }
400         task = retry.get(pubId);
401         if (task != null) {
402             retry.remove(pubId);
403             task.clean();
404             resetQueue();
405             failduration = 0;
406             return true;
407         }
408         return false;
409     }
410 }