Removing code smells
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryQueue.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.File;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.HashMap;
33 import java.util.List;
34 import org.jetbrains.annotations.Nullable;
35
36 /**
37  * Mechanism for monitoring and controlling delivery of files to a destination.
38  *
39  * <p>The DeliveryQueue class maintains lists of DeliveryTasks for a single
40  * destination (a subscription or another data router node) and assigns
41  * delivery threads to try to deliver them.  It also maintains a delivery
42  * status that causes it to back off on delivery attempts after a failure.
43  *
44  * <p>If the most recent delivery result was a failure, then no more attempts
45  * will be made for a period of time.  Initially, and on the first failure
46  * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).
47  * If, after this delay, additional failures occur, each failure will
48  * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a
49  * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
50  * Note that this behavior applies to the delivery queue as a whole and not
51  * to individual files in the queue.  If multiple files are being
52  * delivered and one fails, the delay will be started.  If a second
53  * delivery fails while the delay was active, it will not change the delay
54  * or change the duration of any subsequent delay.
55  * If, however, it succeeds, it will cancel the delay.
56  *
57  * The queue maintains 3 collections of files to deliver: A todoList of
58  * files that will be attempted, a working set of files that are being
59  * attempted, and a retry set of files that were attempted and failed.
60  * Whenever the todoList is empty and needs to be refilled, a scan of the
61  * spool directory is made and the file names sorted.  Any files in the working set are ignored.
62  * If a DeliveryTask for the file is in the retry set, then that delivery
63  * task is placed on the todoList.  Otherwise, a new DeliveryTask for the
64  * file is created and placed on the todoList.
65  * If, when a DeliveryTask is about to be removed from the todoList, its
66  * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead
67  * marked as expired.
68  *
69  * <p>A delivery queue also maintains a skip flag.  This flag is true if the
70  * failure timer is active or if no files are found in a directory scan.
71  */
72 public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
73     private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
74     private DeliveryQueueHelper deliveryQueueHelper;
75
76     private DestInfo destinationInfo;
77     private HashMap<String, DeliveryTask> working = new HashMap<>();
78     private HashMap<String, DeliveryTask> retry = new HashMap<>();
79     private int todoindex;
80     private boolean failed;
81     private long failduration;
82     private long resumetime;
83     private File dir;
84     private List<DeliveryTask> todoList = new ArrayList<>();
85
86     /**
87      * Try to cancel a delivery task.
88      *
89      * @return The length of the task in bytes or 0 if the task cannot be cancelled.
90      */
91     synchronized long cancelTask(String pubid) {
92         if (working.get(pubid) != null) {
93             return (0);
94         }
95         DeliveryTask dt = retry.get(pubid);
96         if (dt == null) {
97             for (int i = todoindex; i < todoList.size(); i++) {
98                 DeliveryTask xdt = todoList.get(i);
99                 if (xdt.getPublishId().equals(pubid)) {
100                     dt = xdt;
101                     break;
102                 }
103             }
104         }
105         if (dt == null) {
106             dt = new DeliveryTask(this, pubid);
107             if (dt.getFileId() == null) {
108                 return (0);
109             }
110         }
111         if (dt.isCleaned()) {
112             return (0);
113         }
114         StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(),
115                 dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());
116         dt.clean();
117         return (dt.getLength());
118     }
119
120     /**
121      * Mark that a delivery task has succeeded.
122      */
123     private synchronized void markSuccess(DeliveryTask task) {
124         working.remove(task.getPublishId());
125         logger.info(task.getPublishId() + " marked as success.");
126         task.clean();
127         failed = false;
128         failduration = 0;
129     }
130
131     /**
132      * Mark that a delivery task has expired.
133      */
134     private synchronized void markExpired(DeliveryTask task) {
135         logger.info(task.getPublishId() + " marked as expired.");
136         task.clean();
137     }
138
139     /**
140      * Mark that a delivery task has failed permanently.
141      */
142     private synchronized void markFailNoRetry(DeliveryTask task) {
143         working.remove(task.getPublishId());
144         logger.info(task.getPublishId() + " marked as failed permanently");
145         task.clean();
146         failed = false;
147         failduration = 0;
148     }
149
150     private void fdupdate() {
151         if (!failed) {
152             failed = true;
153             if (failduration == 0) {
154                 if (destinationInfo.isPrivilegedSubscriber()) {
155                     failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
156                 } else {
157                     failduration = deliveryQueueHelper.getInitFailureTimer();
158                 }
159             }
160             resumetime = System.currentTimeMillis() + failduration;
161             long maxdur = deliveryQueueHelper.getMaxFailureTimer();
162             failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
163             if (failduration > maxdur) {
164                 failduration = maxdur;
165             }
166         }
167     }
168
169     /**
170      * Mark that a delivery task has been redirected.
171      */
172     private synchronized void markRedirect(DeliveryTask task) {
173         working.remove(task.getPublishId());
174         logger.info(task.getPublishId() + " marked as redirected.");
175         retry.put(task.getPublishId(), task);
176     }
177
178     /**
179      * Mark that a delivery task has temporarily failed.
180      */
181     private synchronized void markFailWithRetry(DeliveryTask task) {
182         working.remove(task.getPublishId());
183         logger.info(task.getPublishId() + " marked as temporarily failed.");
184         retry.put(task.getPublishId(), task);
185         fdupdate();
186     }
187
188     /**
189      * Get the next task.
190      */
191     synchronized DeliveryTask getNext() {
192         DeliveryTask ret = peekNext();
193         if (ret != null) {
194             todoindex++;
195             working.put(ret.getPublishId(), ret);
196         }
197         return (ret);
198     }
199
200     /**
201      * Peek at the next task.
202      */
203     synchronized DeliveryTask peekNext() {
204         long now = System.currentTimeMillis();
205         long mindate = now - deliveryQueueHelper.getExpirationTimer();
206         if (failed) {
207             if (now > resumetime) {
208                 failed = false;
209             } else {
210                 return (null);
211             }
212         }
213         while (true) {
214             if (todoindex >= todoList.size()) {
215                 todoindex = 0;
216                 todoList = new ArrayList<>();
217                 String[] files = dir.list();
218                 Arrays.sort(files);
219                 scanForNextTask(files);
220                 retry = new HashMap<>();
221             }
222             DeliveryTask dt = getDeliveryTask(mindate);
223             if (dt != null) {
224                 return dt;
225             }
226             return null;
227
228         }
229     }
230
231     /**
232      * Create a delivery queue for a given destination info.
233      */
234     DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
235         this.deliveryQueueHelper = deliveryQueueHelper;
236         this.destinationInfo = destinationInfo;
237         dir = new File(destinationInfo.getSpool());
238         dir.mkdirs();
239     }
240
241     /**
242      * Update the destination info for this delivery queue.
243      */
244     public void config(DestInfo destinationInfo) {
245         this.destinationInfo = destinationInfo;
246     }
247
248     /**
249      * Get the dest info.
250      */
251     public DestInfo getDestinationInfo() {
252         return (destinationInfo);
253     }
254
255     /**
256      * Get the config manager.
257      */
258     public DeliveryQueueHelper getConfig() {
259         return (deliveryQueueHelper);
260     }
261
262     /**
263      * Exceptional condition occurred during delivery.
264      */
265     public void reportDeliveryExtra(DeliveryTask task, long sent) {
266         StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);
267     }
268
269     /**
270      * Message too old to deliver.
271      */
272     void reportExpiry(DeliveryTask task) {
273         StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
274                 task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
275         markExpired(task);
276     }
277
278     /**
279      * Completed a delivery attempt.
280      */
281     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
282         if (status < 300) {
283             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
284                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
285             if (destinationInfo.isPrivilegedSubscriber()) {
286                 task.setResumeTime(System.currentTimeMillis()
287                                            + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
288                 markFailWithRetry(task);
289             } else {
290                 markSuccess(task);
291             }
292         } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
293             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
294                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
295             if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
296                 markRedirect(task);
297             } else {
298                 StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(),
299                         task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
300                 markFailNoRetry(task);
301             }
302         } else if (status < 500 && status != 429) {
303             // Status 429 is the standard response for Too Many Requests and indicates
304             // that a file needs to be delivered again at a later time.
305             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
306                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
307             StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
308                     task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
309             markFailNoRetry(task);
310         } else {
311             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
312                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
313             markFailWithRetry(task);
314         }
315     }
316
317     /**
318      * Delivery failed by reason of an exception.
319      */
320     public void reportException(DeliveryTask task, Exception exception) {
321         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
322                 task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
323         deliveryQueueHelper.handleUnreachable(destinationInfo);
324         markFailWithRetry(task);
325     }
326
327     /**
328      * Get the feed ID for a subscription.
329      *
330      * @param subid The subscription ID
331      * @return The feed ID
332      */
333     public String getFeedId(String subid) {
334         return (deliveryQueueHelper.getFeedId(subid));
335     }
336
337     /**
338      * Get the URL to deliver a message to given the file ID.
339      */
340     public String getDestURL(String fileid) {
341         return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
342     }
343
344     /**
345      * Deliver files until there's a failure or there are no more.
346      * files to deliver
347      */
348     public void run() {
349         DeliveryTask task;
350         long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
351         int filestogo = deliveryQueueHelper.getFairFileLimit();
352         while ((task = getNext()) != null) {
353             logger.info("Processing file: " + task.getPublishId());
354             task.run();
355             if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
356                 break;
357             }
358         }
359     }
360
361     /**
362      * Is there no work to do for this queue right now?.
363      */
364     synchronized boolean isSkipSet() {
365         return (peekNext() == null);
366     }
367
368     /**
369      * Reset the retry timer.
370      */
371     void resetQueue() {
372         resumetime = System.currentTimeMillis();
373     }
374
375     /**
376      * Get task if in queue and mark as success.
377      */
378     boolean markTaskSuccess(String pubId) {
379         DeliveryTask task = working.get(pubId);
380         if (task != null) {
381             markSuccess(task);
382             return true;
383         }
384         task = retry.get(pubId);
385         if (task != null) {
386             retry.remove(pubId);
387             task.clean();
388             resetQueue();
389             failduration = 0;
390             return true;
391         }
392         return false;
393     }
394
395     private void scanForNextTask(String[] files) {
396         for (String fname : files) {
397             String pubId = getPubId(fname);
398             if (pubId == null) {
399                 continue;
400             }
401             DeliveryTask dt = retry.get(pubId);
402             if (dt == null) {
403                 dt = new DeliveryTask(this, pubId);
404             }
405             todoList.add(dt);
406         }
407     }
408
409     @Nullable
410     private DeliveryTask getDeliveryTask(long mindate) {
411         if (todoindex < todoList.size()) {
412             DeliveryTask dt = todoList.get(todoindex);
413             if (dt.isCleaned()) {
414                 todoindex++;
415             }
416             if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
417                 retry.put(dt.getPublishId(), dt);
418                 todoindex++;
419             }
420             if (dt.getDate() >= mindate) {
421                 return (dt);
422             }
423             todoindex++;
424             reportExpiry(dt);
425         }
426         return null;
427     }
428
429     @Nullable
430     private String getPubId(String fname) {
431         if (!fname.endsWith(".M")) {
432             return null;
433         }
434         String fname2 = fname.substring(0, fname.length() - 2);
435         long pidtime = 0;
436         int dot = fname2.indexOf('.');
437         if (dot < 1) {
438             return null;
439         }
440         try {
441             pidtime = Long.parseLong(fname2.substring(0, dot));
442         } catch (Exception e) {
443             logger.error("Exception", e);
444         }
445         if (pidtime < 1000000000000L) {
446             return null;
447         }
448         if (working.get(fname2) != null) {
449             return null;
450         }
451         return fname2;
452     }
453 }